You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/06/03 18:03:14 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-789] Implement
a FileSystem based SpecProducer.
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new b3011c0 [GOBBLIN-789] Implement a FileSystem based SpecProducer.
b3011c0 is described below
commit b3011c060aae679489e3b6704a5027eceda01c87
Author: sv2000 <su...@gmail.com>
AuthorDate: Mon Jun 3 11:03:07 2019 -0700
[GOBBLIN-789] Implement a FileSystem based SpecProducer.
Closes #2655 from sv2000/fsSpecProducer
---
.../FsScheduledJobConfigurationManagerTest.java | 67 +++++-----
.../suite/IntegrationJobRestartViaSpecSuite.java | 81 ++++++------
.../apache/gobblin/runtime/api/FsSpecProducer.java | 137 +++++++++++++++++++++
.../gobblin/runtime/api/FsSpecProducerTest.java | 93 ++++++++++++++
4 files changed, 301 insertions(+), 77 deletions(-)
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/FsScheduledJobConfigurationManagerTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/FsScheduledJobConfigurationManagerTest.java
index 09c0c86..109e058 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/FsScheduledJobConfigurationManagerTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/FsScheduledJobConfigurationManagerTest.java
@@ -17,16 +17,10 @@
package org.apache.gobblin.cluster;
import java.io.IOException;
-import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.HashMap;
-import java.util.Map;
import java.util.concurrent.ExecutionException;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -45,12 +39,13 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.api.FsSpecConsumer;
+import org.apache.gobblin.runtime.api.FsSpecProducer;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.JobSpecNotFoundException;
import org.apache.gobblin.runtime.api.MutableJobCatalog;
import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.runtime.job_catalog.NonObservingFSJobCatalog;
-import org.apache.gobblin.runtime.job_spec.AvroJobSpec;
@Slf4j
public class FsScheduledJobConfigurationManagerTest {
@@ -62,6 +57,7 @@ public class FsScheduledJobConfigurationManagerTest {
private String jobSpecUriString = "testJobSpec";
private FileSystem fs;
+ private SpecProducer _specProducer;
@BeforeClass
public void setUp() throws IOException {
@@ -73,38 +69,43 @@ public class FsScheduledJobConfigurationManagerTest {
EventBus eventBus = new EventBus(FsScheduledJobConfigurationManagerTest.class.getSimpleName());
Config config = ConfigFactory.empty()
- .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, ConfigValueFactory.fromAnyRef(jobConfDir));
- this._jobCatalog = new NonObservingFSJobCatalog(config);
- ((NonObservingFSJobCatalog) this._jobCatalog).startAsync().awaitRunning();
-
- Config jobConfigurationManagerConfig = ConfigFactory.empty()
+ .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, ConfigValueFactory.fromAnyRef(jobConfDir))
.withValue(GobblinClusterConfigurationKeys.SPEC_CONSUMER_CLASS_KEY, ConfigValueFactory.fromAnyRef(FsSpecConsumer.class.getName()))
.withValue(FsSpecConsumer.SPEC_PATH_KEY, ConfigValueFactory.fromAnyRef(fsSpecConsumerPathString));
- jobConfigurationManager = new FsScheduledJobConfigurationManager(eventBus, jobConfigurationManagerConfig, this._jobCatalog);
- }
-
- private void addJobSpec(String jobSpecName, String version, String verb) throws IOException {
- Map<String, String> metadataMap = new HashMap<>();
- metadataMap.put(FsSpecConsumer.VERB_KEY, verb);
- AvroJobSpec jobSpec = AvroJobSpec.newBuilder().
- setUri(Files.getNameWithoutExtension(jobSpecName)).
- setProperties(new HashMap<>()).
- setTemplateUri("FS:///").
- setDescription("test").
- setVersion(version).
- setMetadata(metadataMap).build();
+ this._jobCatalog = new NonObservingFSJobCatalog(config);
+ ((NonObservingFSJobCatalog) this._jobCatalog).startAsync().awaitRunning();
- DatumWriter<AvroJobSpec> datumWriter = new SpecificDatumWriter<>(AvroJobSpec.SCHEMA$);
- DataFileWriter<AvroJobSpec> dataFileWriter = new DataFileWriter<>(datumWriter);
+ jobConfigurationManager = new FsScheduledJobConfigurationManager(eventBus, config, this._jobCatalog);
- Path fsSpecConsumerPath = new Path(fsSpecConsumerPathString, jobSpecName);
- FileSystem fs = fsSpecConsumerPath.getFileSystem(new Configuration());
- OutputStream out = fs.create(fsSpecConsumerPath);
+ _specProducer = new FsSpecProducer(config);
+ }
- dataFileWriter.create(AvroJobSpec.SCHEMA$, out);
- dataFileWriter.append(jobSpec);
- dataFileWriter.close();
+  /**
+   * Builds a minimal test {@link JobSpec} and hands it to the spec producer using the requested verb.
+   *
+   * @param jobSpecName file-style name of the spec; its extension is stripped to form the spec URI
+   * @param version version string stored on the spec
+   * @param verb name of a {@link SpecExecutor.Verb} constant (ADD, DELETE or UPDATE are dispatched)
+   * @throws URISyntaxException if the derived spec name is not a valid URI
+   * @throws IOException if the verb is a valid enum constant that this helper does not dispatch
+   */
+  private void addJobSpec(String jobSpecName, String version, String verb)
+      throws URISyntaxException, IOException {
+    URI specUri = new URI(Files.getNameWithoutExtension(jobSpecName));
+    JobSpec testSpec = JobSpec.builder(specUri)
+        .withConfig(ConfigFactory.empty())
+        .withTemplate(new URI("FS:///"))
+        .withVersion(version)
+        .withDescription("test")
+        .build();
+
+    // valueOf rejects strings that are not enum constant names with an IllegalArgumentException.
+    SpecExecutor.Verb requestedVerb = SpecExecutor.Verb.valueOf(verb);
+
+    if (requestedVerb == SpecExecutor.Verb.ADD) {
+      _specProducer.addSpec(testSpec);
+    } else if (requestedVerb == SpecExecutor.Verb.DELETE) {
+      _specProducer.deleteSpec(testSpec.getUri());
+    } else if (requestedVerb == SpecExecutor.Verb.UPDATE) {
+      _specProducer.updateSpec(testSpec);
+    } else {
+      throw new IOException("Unknown Spec Verb: " + verb);
+    }
}
@Test (expectedExceptions = {JobSpecNotFoundException.class})
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobRestartViaSpecSuite.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobRestartViaSpecSuite.java
index 1486e58..1d2c247 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobRestartViaSpecSuite.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobRestartViaSpecSuite.java
@@ -20,15 +20,10 @@ package org.apache.gobblin.cluster.suite;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
-import java.io.OutputStream;
import java.io.Reader;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
+import java.net.URI;
+import java.net.URISyntaxException;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -47,9 +42,10 @@ import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
import org.apache.gobblin.cluster.SleepingTask;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.api.FsSpecConsumer;
+import org.apache.gobblin.runtime.api.FsSpecProducer;
+import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.SpecExecutor;
-import org.apache.gobblin.runtime.job_spec.AvroJobSpec;
-import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.runtime.api.SpecProducer;
public class IntegrationJobRestartViaSpecSuite extends IntegrationJobCancelSuite {
@@ -59,6 +55,8 @@ public class IntegrationJobRestartViaSpecSuite extends IntegrationJobCancelSuite
public static final String FS_SPEC_CONSUMER_DIR = "/tmp/IntegrationJobCancelViaSpecSuite/jobSpecs";
public static final String TASK_STATE_FILE = "/tmp/IntegrationJobCancelViaSpecSuite/taskState/_RUNNING";
+ private final SpecProducer _specProducer;
+
public IntegrationJobRestartViaSpecSuite() throws IOException {
super();
Path jobCatalogDirPath = new Path(JOB_CATALOG_DIR);
@@ -66,9 +64,10 @@ public class IntegrationJobRestartViaSpecSuite extends IntegrationJobCancelSuite
if (!fs.exists(jobCatalogDirPath)) {
fs.mkdirs(jobCatalogDirPath);
}
+ this._specProducer = new FsSpecProducer(ConfigFactory.empty().withValue(FsSpecConsumer.SPEC_PATH_KEY, ConfigValueFactory.fromAnyRef(FS_SPEC_CONSUMER_DIR)));
}
- private Map<String,String> getJobConfig() throws IOException {
+ private Config getJobConfig() throws IOException {
try (InputStream resourceStream = Resources.getResource(JOB_CONF_NAME).openStream()) {
Reader reader = new InputStreamReader(resourceStream);
Config rawJobConfig =
@@ -84,13 +83,7 @@ public class IntegrationJobRestartViaSpecSuite extends IntegrationJobCancelSuite
newConfig = newConfig.withValue(SleepingTask.TASK_STATE_FILE_KEY, ConfigValueFactory.fromAnyRef(TASK_STATE_FILE));
newConfig = newConfig.withFallback(rawJobConfig);
-
- Properties jobProperties = ConfigUtils.configToProperties(newConfig);
- Map<String, String> jobPropertiesAsMap = new HashMap<>();
- for (String name : jobProperties.stringPropertyNames()) {
- jobPropertiesAsMap.put(name, jobProperties.getProperty(name));
- }
- return jobPropertiesAsMap;
+ return newConfig;
}
}
@@ -107,37 +100,37 @@ public class IntegrationJobRestartViaSpecSuite extends IntegrationJobCancelSuite
return managerConfig;
}
- public void addJobSpec(String jobSpecName, String verb) throws IOException {
- Map<String, String> metadataMap = new HashMap<>();
- metadataMap.put(FsSpecConsumer.VERB_KEY, verb);
-
- Map<String, String> jobProperties = new HashMap<>();
+ /**
+ * Builds a {@link JobSpec} named after {@code jobSpecName} (file extension stripped) and publishes it
+ * through the spec producer using the requested verb.
+ *
+ * <p>The spec config depends on the verb: ADD uses {@link #getJobConfig()}, DELETE carries only
+ * {@code CANCEL_RUNNING_JOB_ON_DELETE=true}, and UPDATE uses the job config plus that flag.</p>
+ *
+ * @param jobSpecName file-style name of the spec; its extension is stripped to form the spec URI
+ * @param verb name of a {@link SpecExecutor.Verb} constant (ADD, DELETE or UPDATE are dispatched)
+ * @throws IOException if the job config cannot be read, or the verb is an enum constant not dispatched here
+ * @throws URISyntaxException if the template URI cannot be parsed
+ */
+ public void addJobSpec(String jobSpecName, String verb) throws IOException, URISyntaxException {
+ Config jobConfig = ConfigFactory.empty();
if (SpecExecutor.Verb.ADD.name().equals(verb)) {
- jobProperties = getJobConfig();
+ jobConfig = getJobConfig();
} else if (SpecExecutor.Verb.DELETE.name().equals(verb)) {
- jobProperties.put(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE, "true");
+ jobConfig = jobConfig.withValue(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE, ConfigValueFactory.fromAnyRef("true"));
} else if (SpecExecutor.Verb.UPDATE.name().equals(verb)) {
- jobProperties = getJobConfig();
- jobProperties.put(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE, "true");
+ jobConfig = getJobConfig().withValue(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE, ConfigValueFactory.fromAnyRef("true"));
}
- AvroJobSpec jobSpec = AvroJobSpec.newBuilder().
- setUri(Files.getNameWithoutExtension(jobSpecName)).
- setProperties(jobProperties).
- setTemplateUri("FS:///").
- setDescription("HelloWorldTestJob").
- setVersion("1").
- setMetadata(metadataMap).build();
-
- DatumWriter<AvroJobSpec> datumWriter = new SpecificDatumWriter<>(AvroJobSpec.SCHEMA$);
- DataFileWriter<AvroJobSpec> dataFileWriter = new DataFileWriter<>(datumWriter);
-
- Path fsSpecConsumerPath = new Path(FS_SPEC_CONSUMER_DIR, jobSpecName);
- FileSystem fs = fsSpecConsumerPath.getFileSystem(new Configuration());
- OutputStream out = fs.create(fsSpecConsumerPath);
-
- dataFileWriter.create(AvroJobSpec.SCHEMA$, out);
- dataFileWriter.append(jobSpec);
- dataFileWriter.close();
+ JobSpec jobSpec = JobSpec.builder(Files.getNameWithoutExtension(jobSpecName))
+ .withConfig(jobConfig)
+ .withTemplate(new URI("FS:///"))
+ .withDescription("HelloWorldTestJob")
+ .withVersion("1")
+ .build();
+
+ // valueOf rejects strings that are not enum constant names with an IllegalArgumentException.
+ SpecExecutor.Verb enumVerb = SpecExecutor.Verb.valueOf(verb);
+
+ switch (enumVerb) {
+ case ADD:
+ _specProducer.addSpec(jobSpec);
+ break;
+ case DELETE:
+ // Forward the DELETE config (the cancel-on-delete flag) as headers; passing only the URI
+ // would discard the config assembled above.
+ _specProducer.deleteSpec(jobSpec.getUri(), jobSpec.getConfigAsProperties());
+ break;
+ case UPDATE:
+ _specProducer.updateSpec(jobSpec);
+ break;
+ default:
+ throw new IOException("Unknown Spec Verb: " + verb);
+ }
}
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecProducer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecProducer.java
new file mode 100644
index 0000000..d55338e
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecProducer.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.job_spec.AvroJobSpec;
+import org.apache.gobblin.util.CompletedFuture;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * An implementation of {@link SpecProducer} that produces {@link JobSpec}s to the {@value FsSpecConsumer#SPEC_PATH_KEY}
+ * for consumption by the {@link FsSpecConsumer}.
+ *
+ * <p>Every spec is serialized as a single {@link AvroJobSpec} record into an Avro data file named after the
+ * spec URI inside the configured directory. The requested {@link SpecExecutor.Verb} is stored in the record
+ * metadata under {@link #VERB_KEY}.</p>
+ */
+@Slf4j
+public class FsSpecProducer implements SpecProducer<Spec> {
+  protected static final String VERB_KEY = "Verb";
+
+  /** Directory into which spec files are written; fixed at construction time. */
+  private final Path specConsumerPath;
+
+  /**
+   * @param config configuration that must define a non-empty {@link FsSpecConsumer#SPEC_PATH_KEY}
+   * @throws IllegalArgumentException if the spec path is missing or empty
+   */
+  public FsSpecProducer(Config config) {
+    String specConsumerDir = ConfigUtils.getString(config, FsSpecConsumer.SPEC_PATH_KEY, "");
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(specConsumerDir), "Missing argument: " + FsSpecConsumer.SPEC_PATH_KEY);
+    this.specConsumerPath = new Path(specConsumerDir);
+  }
+
+  /**
+   * Add a {@link Spec} for execution on {@link org.apache.gobblin.runtime.api.SpecExecutor}.
+   *
+   * @param addedSpec the spec to add; must be a {@link JobSpec}
+   * @return a completed future; it carries the {@link IOException} if the write failed
+   */
+  @Override
+  public Future<?> addSpec(Spec addedSpec) {
+    return writeSpec(addedSpec, SpecExecutor.Verb.ADD);
+  }
+
+  /**
+   * Update a {@link Spec} being executed on {@link org.apache.gobblin.runtime.api.SpecExecutor}.
+   *
+   * @param updatedSpec the spec to update; must be a {@link JobSpec}
+   * @return a completed future; it carries the {@link IOException} if the write failed
+   */
+  @Override
+  public Future<?> updateSpec(Spec updatedSpec) {
+    return writeSpec(updatedSpec, SpecExecutor.Verb.UPDATE);
+  }
+
+  /**
+   * Converts the spec to its Avro form tagged with {@code verb} and writes it to the spec directory.
+   *
+   * @throws RuntimeException if {@code spec} is not a {@link JobSpec}
+   */
+  private Future<?> writeSpec(Spec spec, SpecExecutor.Verb verb) {
+    if (spec instanceof JobSpec) {
+      try {
+        AvroJobSpec avroJobSpec = convertToAvroJobSpec((JobSpec) spec, verb);
+        writeAvroJobSpec(avroJobSpec);
+        return new CompletedFuture<>(Boolean.TRUE, null);
+      } catch (IOException e) {
+        // Pass the exception as the trailing argument so the stack trace is logged, and name the actual verb.
+        log.error("Exception encountered when writing {} Spec {}", verb, spec, e);
+        return new CompletedFuture<>(Boolean.TRUE, e);
+      }
+    } else {
+      throw new RuntimeException("Unsupported spec type " + spec.getClass());
+    }
+  }
+
+  /**
+   * Delete a {@link Spec} being executed on {@link org.apache.gobblin.runtime.api.SpecExecutor}.
+   *
+   * @param deletedSpecURI URI of the spec to delete; also used as the name of the written file
+   * @param headers extra properties stored on the DELETE record; {@code null} is treated as empty
+   * @return a completed future; it carries the {@link IOException} if the write failed
+   */
+  @Override
+  public Future<?> deleteSpec(URI deletedSpecURI, Properties headers) {
+    // Maps.fromProperties dereferences its argument, so guard against callers passing no headers.
+    ImmutableMap<String, String> headerMap =
+        headers == null ? ImmutableMap.<String, String>of() : Maps.fromProperties(headers);
+    AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(deletedSpecURI.toString())
+        .setMetadata(ImmutableMap.of(VERB_KEY, SpecExecutor.Verb.DELETE.name()))
+        .setProperties(headerMap).build();
+    try {
+      writeAvroJobSpec(avroJobSpec);
+      return new CompletedFuture<>(Boolean.TRUE, null);
+    } catch (IOException e) {
+      log.error("Exception encountered when writing DELETE spec", e);
+      return new CompletedFuture<>(Boolean.TRUE, e);
+    }
+  }
+
+  /**
+   * List all {@link Spec} being executed on {@link org.apache.gobblin.runtime.api.SpecExecutor}.
+   *
+   * @throws UnsupportedOperationException always; listing is not implemented by this producer
+   */
+  @Override
+  public Future<? extends List<Spec>> listSpecs() {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Maps a {@link JobSpec} onto an {@link AvroJobSpec} whose metadata records the given verb. */
+  private AvroJobSpec convertToAvroJobSpec(JobSpec jobSpec, SpecExecutor.Verb verb) {
+    // NOTE(review): the template URI is hard-coded rather than taken from the JobSpec - confirm this is intended.
+    return AvroJobSpec.newBuilder().
+        setUri(jobSpec.getUri().toString()).
+        setProperties(Maps.fromProperties(jobSpec.getConfigAsProperties())).
+        setTemplateUri("FS:///").
+        setDescription(jobSpec.getDescription()).
+        setVersion(jobSpec.getVersion()).
+        setMetadata(ImmutableMap.of(VERB_KEY, verb.name())).build();
+  }
+
+  /**
+   * Writes {@code jobSpec} as a one-record Avro data file named after its URI under the spec directory.
+   *
+   * @throws IOException if the file cannot be created or written
+   */
+  private void writeAvroJobSpec(AvroJobSpec jobSpec) throws IOException {
+    DatumWriter<AvroJobSpec> datumWriter = new SpecificDatumWriter<>(AvroJobSpec.SCHEMA$);
+
+    Path jobSpecPath = new Path(this.specConsumerPath, jobSpec.getUri());
+    FileSystem fs = jobSpecPath.getFileSystem(new Configuration());
+
+    // try-with-resources releases the stream even when create/append throws. The writer is declared
+    // last so it is closed (and flushed) first; the later close of the stream is then a no-op.
+    try (OutputStream out = fs.create(jobSpecPath);
+        DataFileWriter<AvroJobSpec> dataFileWriter = new DataFileWriter<>(datumWriter)) {
+      dataFileWriter.create(AvroJobSpec.SCHEMA$, out);
+      dataFileWriter.append(jobSpec);
+    }
+  }
+
+}
\ No newline at end of file
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FsSpecProducerTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FsSpecProducerTest.java
new file mode 100644
index 0000000..89c80fe
--- /dev/null
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FsSpecProducerTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.runtime.api;
+
+import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.io.Files;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+
+/**
+ * Round-trip tests: specs written by {@link FsSpecProducer} must be read back by {@link FsSpecConsumer}
+ * with the expected verb, URI and config.
+ */
+public class FsSpecProducerTest {
+  private FsSpecProducer _fsSpecProducer;
+  private FsSpecConsumer _fsSpecConsumer;
+
+  /** Points a fresh producer/consumer pair at a new temporary directory before every test. */
+  @BeforeMethod
+  public void setUp() {
+    File tmpDir = Files.createTempDir();
+    Config config = ConfigFactory.empty().withValue(FsSpecConsumer.SPEC_PATH_KEY, ConfigValueFactory.fromAnyRef(
+        tmpDir.getAbsolutePath()));
+    this._fsSpecProducer = new FsSpecProducer(config);
+    this._fsSpecConsumer = new FsSpecConsumer(config);
+  }
+
+  /** Creates the "testJob" spec with two config entries used by the add and update tests. */
+  private JobSpec createTestJobSpec() throws URISyntaxException {
+    return JobSpec.builder("testJob").withConfig(ConfigFactory.empty().
+        withValue("key1", ConfigValueFactory.fromAnyRef("val1")).
+        withValue("key2", ConfigValueFactory.fromAnyRef("val2"))).
+        withVersion("1").withDescription("").withTemplate(new URI("FS:///")).build();
+  }
+
+  @Test
+  public void testAddSpec()
+      throws URISyntaxException, ExecutionException, InterruptedException {
+    // Await the returned future so a failed write is reported with its real cause, not as a size mismatch.
+    this._fsSpecProducer.addSpec(createTestJobSpec()).get();
+
+    List<Pair<SpecExecutor.Verb, Spec>> jobSpecs = this._fsSpecConsumer.changedSpecs().get();
+    Assert.assertEquals(jobSpecs.size(), 1);
+    Assert.assertEquals(jobSpecs.get(0).getLeft(), SpecExecutor.Verb.ADD);
+    Assert.assertEquals(jobSpecs.get(0).getRight().getUri().toString(), "testJob");
+    Assert.assertEquals(((JobSpec) jobSpecs.get(0).getRight()).getConfig().getString("key1"), "val1");
+    Assert.assertEquals(((JobSpec) jobSpecs.get(0).getRight()).getConfig().getString("key2"), "val2");
+  }
+
+  @Test (dependsOnMethods = "testAddSpec")
+  public void testUpdateSpec() throws ExecutionException, InterruptedException, URISyntaxException {
+    // Await the returned future so a failed write is reported with its real cause.
+    this._fsSpecProducer.updateSpec(createTestJobSpec()).get();
+
+    List<Pair<SpecExecutor.Verb, Spec>> jobSpecs = this._fsSpecConsumer.changedSpecs().get();
+    Assert.assertEquals(jobSpecs.size(), 1);
+    Assert.assertEquals(jobSpecs.get(0).getLeft(), SpecExecutor.Verb.UPDATE);
+    Assert.assertEquals(jobSpecs.get(0).getRight().getUri().toString(), "testJob");
+    Assert.assertEquals(((JobSpec) jobSpecs.get(0).getRight()).getConfig().getString("key1"), "val1");
+    Assert.assertEquals(((JobSpec) jobSpecs.get(0).getRight()).getConfig().getString("key2"), "val2");
+  }
+
+  @Test (dependsOnMethods = "testUpdateSpec")
+  public void testDeleteSpec() throws URISyntaxException, ExecutionException, InterruptedException {
+    Properties headers = new Properties();
+    headers.put("headerProp1", "headerValue1");
+    // Await the returned future so a failed write is reported with its real cause.
+    this._fsSpecProducer.deleteSpec(new URI("testDeleteJob"), headers).get();
+    List<Pair<SpecExecutor.Verb, Spec>> jobSpecs = this._fsSpecConsumer.changedSpecs().get();
+    Assert.assertEquals(jobSpecs.size(), 1);
+    Assert.assertEquals(jobSpecs.get(0).getLeft(), SpecExecutor.Verb.DELETE);
+    Assert.assertEquals(jobSpecs.get(0).getRight().getUri().toString(), "testDeleteJob");
+  }
+}
\ No newline at end of file