You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/06/03 18:03:14 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-789] Implement a FileSystem based SpecProducer.

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new b3011c0  [GOBBLIN-789] Implement a FileSystem based SpecProducer.
b3011c0 is described below

commit b3011c060aae679489e3b6704a5027eceda01c87
Author: sv2000 <su...@gmail.com>
AuthorDate: Mon Jun 3 11:03:07 2019 -0700

    [GOBBLIN-789] Implement a FileSystem based SpecProducer.
    
    Closes #2655 from sv2000/fsSpecProducer
---
 .../FsScheduledJobConfigurationManagerTest.java    |  67 +++++-----
 .../suite/IntegrationJobRestartViaSpecSuite.java   |  81 ++++++------
 .../apache/gobblin/runtime/api/FsSpecProducer.java | 137 +++++++++++++++++++++
 .../gobblin/runtime/api/FsSpecProducerTest.java    |  93 ++++++++++++++
 4 files changed, 301 insertions(+), 77 deletions(-)

diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/FsScheduledJobConfigurationManagerTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/FsScheduledJobConfigurationManagerTest.java
index 09c0c86..109e058 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/FsScheduledJobConfigurationManagerTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/FsScheduledJobConfigurationManagerTest.java
@@ -17,16 +17,10 @@
 package org.apache.gobblin.cluster;
 
 import java.io.IOException;
-import java.io.OutputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -45,12 +39,13 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.api.FsSpecConsumer;
+import org.apache.gobblin.runtime.api.FsSpecProducer;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.JobSpecNotFoundException;
 import org.apache.gobblin.runtime.api.MutableJobCatalog;
 import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecProducer;
 import org.apache.gobblin.runtime.job_catalog.NonObservingFSJobCatalog;
-import org.apache.gobblin.runtime.job_spec.AvroJobSpec;
 
 @Slf4j
 public class FsScheduledJobConfigurationManagerTest {
@@ -62,6 +57,7 @@ public class FsScheduledJobConfigurationManagerTest {
   private String jobSpecUriString = "testJobSpec";
 
   private FileSystem fs;
+  private SpecProducer _specProducer;
 
   @BeforeClass
   public void setUp() throws IOException {
@@ -73,38 +69,43 @@ public class FsScheduledJobConfigurationManagerTest {
 
     EventBus eventBus = new EventBus(FsScheduledJobConfigurationManagerTest.class.getSimpleName());
     Config config = ConfigFactory.empty()
-        .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, ConfigValueFactory.fromAnyRef(jobConfDir));
-    this._jobCatalog = new NonObservingFSJobCatalog(config);
-    ((NonObservingFSJobCatalog) this._jobCatalog).startAsync().awaitRunning();
-
-    Config jobConfigurationManagerConfig = ConfigFactory.empty()
+        .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, ConfigValueFactory.fromAnyRef(jobConfDir))
         .withValue(GobblinClusterConfigurationKeys.SPEC_CONSUMER_CLASS_KEY, ConfigValueFactory.fromAnyRef(FsSpecConsumer.class.getName()))
         .withValue(FsSpecConsumer.SPEC_PATH_KEY, ConfigValueFactory.fromAnyRef(fsSpecConsumerPathString));
-    jobConfigurationManager = new FsScheduledJobConfigurationManager(eventBus, jobConfigurationManagerConfig, this._jobCatalog);
-  }
-
-  private void addJobSpec(String jobSpecName, String version, String verb) throws IOException {
-    Map<String, String> metadataMap = new HashMap<>();
-    metadataMap.put(FsSpecConsumer.VERB_KEY, verb);
 
-    AvroJobSpec jobSpec = AvroJobSpec.newBuilder().
-        setUri(Files.getNameWithoutExtension(jobSpecName)).
-        setProperties(new HashMap<>()).
-        setTemplateUri("FS:///").
-        setDescription("test").
-        setVersion(version).
-        setMetadata(metadataMap).build();
+    this._jobCatalog = new NonObservingFSJobCatalog(config);
+    ((NonObservingFSJobCatalog) this._jobCatalog).startAsync().awaitRunning();
 
-    DatumWriter<AvroJobSpec> datumWriter = new SpecificDatumWriter<>(AvroJobSpec.SCHEMA$);
-    DataFileWriter<AvroJobSpec> dataFileWriter = new DataFileWriter<>(datumWriter);
+    jobConfigurationManager = new FsScheduledJobConfigurationManager(eventBus, config, this._jobCatalog);
 
-    Path fsSpecConsumerPath = new Path(fsSpecConsumerPathString, jobSpecName);
-    FileSystem fs = fsSpecConsumerPath.getFileSystem(new Configuration());
-    OutputStream out = fs.create(fsSpecConsumerPath);
+    _specProducer = new FsSpecProducer(config);
+  }
 
-    dataFileWriter.create(AvroJobSpec.SCHEMA$, out);
-    dataFileWriter.append(jobSpec);
-    dataFileWriter.close();
+  private void addJobSpec(String jobSpecName, String version, String verb)
+      throws URISyntaxException, IOException {
+    JobSpec jobSpec =
+        JobSpec.builder(new URI(Files.getNameWithoutExtension(jobSpecName)))
+            .withConfig(ConfigFactory.empty())
+            .withTemplate(new URI("FS:///"))
+            .withVersion(version)
+            .withDescription("test")
+            .build();
+
+    SpecExecutor.Verb enumVerb = SpecExecutor.Verb.valueOf(verb);
+
+    switch (enumVerb) {
+      case ADD:
+        _specProducer.addSpec(jobSpec);
+        break;
+      case DELETE:
+        _specProducer.deleteSpec(jobSpec.getUri());
+        break;
+      case UPDATE:
+        _specProducer.updateSpec(jobSpec);
+        break;
+      default:
+        throw new IOException("Unknown Spec Verb: " + verb);
+    }
   }
 
   @Test (expectedExceptions = {JobSpecNotFoundException.class})
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobRestartViaSpecSuite.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobRestartViaSpecSuite.java
index 1486e58..1d2c247 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobRestartViaSpecSuite.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobRestartViaSpecSuite.java
@@ -20,15 +20,10 @@ package org.apache.gobblin.cluster.suite;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
-import java.io.OutputStream;
 import java.io.Reader;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
+import java.net.URI;
+import java.net.URISyntaxException;
 
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -47,9 +42,10 @@ import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
 import org.apache.gobblin.cluster.SleepingTask;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.api.FsSpecConsumer;
+import org.apache.gobblin.runtime.api.FsSpecProducer;
+import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.SpecExecutor;
-import org.apache.gobblin.runtime.job_spec.AvroJobSpec;
-import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.runtime.api.SpecProducer;
 
 
 public class IntegrationJobRestartViaSpecSuite extends IntegrationJobCancelSuite {
@@ -59,6 +55,8 @@ public class IntegrationJobRestartViaSpecSuite extends IntegrationJobCancelSuite
   public static final String FS_SPEC_CONSUMER_DIR = "/tmp/IntegrationJobCancelViaSpecSuite/jobSpecs";
   public static final String TASK_STATE_FILE = "/tmp/IntegrationJobCancelViaSpecSuite/taskState/_RUNNING";
 
+  private final SpecProducer _specProducer;
+
   public IntegrationJobRestartViaSpecSuite() throws IOException {
     super();
     Path jobCatalogDirPath = new Path(JOB_CATALOG_DIR);
@@ -66,9 +64,10 @@ public class IntegrationJobRestartViaSpecSuite extends IntegrationJobCancelSuite
     if (!fs.exists(jobCatalogDirPath)) {
       fs.mkdirs(jobCatalogDirPath);
     }
+    this._specProducer = new FsSpecProducer(ConfigFactory.empty().withValue(FsSpecConsumer.SPEC_PATH_KEY, ConfigValueFactory.fromAnyRef(FS_SPEC_CONSUMER_DIR)));
   }
 
-  private Map<String,String> getJobConfig() throws IOException {
+  private Config getJobConfig() throws IOException {
     try (InputStream resourceStream = Resources.getResource(JOB_CONF_NAME).openStream()) {
       Reader reader = new InputStreamReader(resourceStream);
       Config rawJobConfig =
@@ -84,13 +83,7 @@ public class IntegrationJobRestartViaSpecSuite extends IntegrationJobCancelSuite
 
       newConfig = newConfig.withValue(SleepingTask.TASK_STATE_FILE_KEY, ConfigValueFactory.fromAnyRef(TASK_STATE_FILE));
       newConfig = newConfig.withFallback(rawJobConfig);
-
-      Properties jobProperties = ConfigUtils.configToProperties(newConfig);
-      Map<String, String> jobPropertiesAsMap = new HashMap<>();
-      for (String name : jobProperties.stringPropertyNames()) {
-        jobPropertiesAsMap.put(name, jobProperties.getProperty(name));
-      }
-      return jobPropertiesAsMap;
+      return newConfig;
     }
   }
 
@@ -107,37 +100,37 @@ public class IntegrationJobRestartViaSpecSuite extends IntegrationJobCancelSuite
     return managerConfig;
   }
 
-  public void addJobSpec(String jobSpecName, String verb) throws IOException {
-    Map<String, String> metadataMap = new HashMap<>();
-    metadataMap.put(FsSpecConsumer.VERB_KEY, verb);
-
-    Map<String, String> jobProperties = new HashMap<>();
+  public void addJobSpec(String jobSpecName, String verb) throws IOException, URISyntaxException {
+    Config jobConfig = ConfigFactory.empty();
     if (SpecExecutor.Verb.ADD.name().equals(verb)) {
-      jobProperties = getJobConfig();
+      jobConfig = getJobConfig();
     } else if (SpecExecutor.Verb.DELETE.name().equals(verb)) {
-      jobProperties.put(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE, "true");
+      jobConfig = jobConfig.withValue(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE, ConfigValueFactory.fromAnyRef("true"));
     } else if (SpecExecutor.Verb.UPDATE.name().equals(verb)) {
-      jobProperties = getJobConfig();
-      jobProperties.put(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE, "true");
+      jobConfig = getJobConfig().withValue(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE, ConfigValueFactory.fromAnyRef("true"));
     }
 
-    AvroJobSpec jobSpec = AvroJobSpec.newBuilder().
-        setUri(Files.getNameWithoutExtension(jobSpecName)).
-        setProperties(jobProperties).
-        setTemplateUri("FS:///").
-        setDescription("HelloWorldTestJob").
-        setVersion("1").
-        setMetadata(metadataMap).build();
-
-    DatumWriter<AvroJobSpec> datumWriter = new SpecificDatumWriter<>(AvroJobSpec.SCHEMA$);
-    DataFileWriter<AvroJobSpec> dataFileWriter = new DataFileWriter<>(datumWriter);
-
-    Path fsSpecConsumerPath = new Path(FS_SPEC_CONSUMER_DIR, jobSpecName);
-    FileSystem fs = fsSpecConsumerPath.getFileSystem(new Configuration());
-    OutputStream out = fs.create(fsSpecConsumerPath);
-
-    dataFileWriter.create(AvroJobSpec.SCHEMA$, out);
-    dataFileWriter.append(jobSpec);
-    dataFileWriter.close();
+    JobSpec jobSpec = JobSpec.builder(Files.getNameWithoutExtension(jobSpecName))
+        .withConfig(jobConfig)
+        .withTemplate(new URI("FS:///"))
+        .withDescription("HelloWorldTestJob")
+        .withVersion("1")
+        .build();
+
+    SpecExecutor.Verb enumVerb = SpecExecutor.Verb.valueOf(verb);
+
+    switch (enumVerb) {
+      case ADD:
+        _specProducer.addSpec(jobSpec);
+        break;
+      case DELETE:
+        _specProducer.deleteSpec(jobSpec.getUri());
+        break;
+      case UPDATE:
+        _specProducer.updateSpec(jobSpec);
+        break;
+      default:
+        throw new IOException("Unknown Spec Verb: " + verb);
+    }
   }
 }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecProducer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecProducer.java
new file mode 100644
index 0000000..d55338e
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecProducer.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.job_spec.AvroJobSpec;
+import org.apache.gobblin.util.CompletedFuture;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * An implementation of {@link SpecProducer} that produces {@link JobSpec}s to the {@value FsSpecConsumer#SPEC_PATH_KEY}
+ * for consumption by the {@link FsSpecConsumer}.
+ *
+ * <p>Every spec is serialized as an {@link AvroJobSpec} into an Avro data file named after the spec URI inside
+ * that directory; the {@link SpecExecutor.Verb} is stored in the record metadata under {@link #VERB_KEY}.
+ */
+@Slf4j
+public class FsSpecProducer implements SpecProducer<Spec> {
+  // NOTE(review): presumably has to match the metadata key FsSpecConsumer reads the verb from -- confirm.
+  protected static final String VERB_KEY = "Verb";
+
+  private final Path specConsumerPath;
+
+  /**
+   * @param config must provide a non-empty {@link FsSpecConsumer#SPEC_PATH_KEY}: the directory spec files are written to.
+   * @throws IllegalArgumentException if that key is missing or empty.
+   */
+  public FsSpecProducer(Config config) {
+    String specConsumerDir = ConfigUtils.getString(config, FsSpecConsumer.SPEC_PATH_KEY, "");
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(specConsumerDir), "Missing argument: " + FsSpecConsumer.SPEC_PATH_KEY);
+    this.specConsumerPath = new Path(specConsumerDir);
+  }
+
+  /** Add a {@link Spec} for execution on {@link org.apache.gobblin.runtime.api.SpecExecutor}.
+   * @param addedSpec the spec to write; only {@link JobSpec} instances are supported.
+   * @return an already-completed future; after a failed write it is built with the caught {@link IOException}. */
+  @Override
+  public Future<?> addSpec(Spec addedSpec) {
+    return writeSpec(addedSpec, SpecExecutor.Verb.ADD);
+  }
+
+  /** Update a {@link Spec} being executed on {@link org.apache.gobblin.runtime.api.SpecExecutor}.
+   * @param updatedSpec the spec to write; only {@link JobSpec} instances are supported.
+   * @return an already-completed future; after a failed write it is built with the caught {@link IOException}. */
+  @Override
+  public Future<?> updateSpec(Spec updatedSpec) {
+    return writeSpec(updatedSpec, SpecExecutor.Verb.UPDATE);
+  }
+
+  /**
+   * Converts {@code spec} to an {@link AvroJobSpec} tagged with {@code verb} and writes it to the spec directory.
+   *
+   * @return an already-completed future; after a failed write it is built with the caught {@link IOException}.
+   * @throws RuntimeException if {@code spec} is not a {@link JobSpec}.
+   */
+  private Future<?> writeSpec(Spec spec, SpecExecutor.Verb verb) {
+    if (!(spec instanceof JobSpec)) {
+      throw new RuntimeException("Unsupported spec type " + spec.getClass());
+    }
+    try {
+      writeAvroJobSpec(convertToAvroJobSpec((JobSpec) spec, verb));
+      return new CompletedFuture<>(Boolean.TRUE, null);
+    } catch (IOException e) {
+      // The trailing Throwable argument makes the logger record the stack trace along with the message.
+      log.error("Exception encountered when writing {} Spec {}", verb, spec, e);
+      // NOTE(review): the value is TRUE even on failure; callers presumably detect errors via the cause -- confirm.
+      return new CompletedFuture<>(Boolean.TRUE, e);
+    }
+  }
+
+  /** Delete a {@link Spec} being executed on {@link org.apache.gobblin.runtime.api.SpecExecutor}.
+   * @param deletedSpecURI URI of the spec to delete; it also names the file that is written.
+   * @param headers copied into the properties of the written record; {@code null} is treated as empty.
+   * @return an already-completed future; after a failed write it is built with the caught {@link IOException}. */
+  @Override
+  public Future<?> deleteSpec(URI deletedSpecURI, Properties headers) {
+    // Maps.fromProperties rejects null, so fall back to an empty Properties.
+    Properties effectiveHeaders = headers == null ? new Properties() : headers;
+    AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(deletedSpecURI.toString())
+        .setMetadata(ImmutableMap.of(VERB_KEY, SpecExecutor.Verb.DELETE.name()))
+        .setProperties(Maps.fromProperties(effectiveHeaders)).build();
+    try {
+      writeAvroJobSpec(avroJobSpec);
+      return new CompletedFuture<>(Boolean.TRUE, null);
+    } catch (IOException e) {
+      log.error("Exception encountered when writing DELETE spec {}", deletedSpecURI, e);
+      return new CompletedFuture<>(Boolean.TRUE, e);
+    }
+  }
+
+  /** List all {@link Spec} being executed on {@link org.apache.gobblin.runtime.api.SpecExecutor}.
+   * @throws UnsupportedOperationException always; this producer does not implement listing. */
+  @Override
+  public Future<? extends List<Spec>> listSpecs() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Builds the Avro record for {@code jobSpec}, copying its URI, config properties, description and version and
+   * storing {@code verb} in the metadata.
+   */
+  private AvroJobSpec convertToAvroJobSpec(JobSpec jobSpec, SpecExecutor.Verb verb) {
+    // NOTE(review): the template URI is hard-coded rather than taken from jobSpec -- confirm this is intended.
+    return AvroJobSpec.newBuilder().
+        setUri(jobSpec.getUri().toString()).
+        setProperties(Maps.fromProperties(jobSpec.getConfigAsProperties())).
+        setTemplateUri("FS:///").
+        setDescription(jobSpec.getDescription()).
+        setVersion(jobSpec.getVersion()).
+        setMetadata(ImmutableMap.of(VERB_KEY, verb.name())).build();
+  }
+
+  /**
+   * Writes {@code jobSpec} as a single-record Avro data file named after its URI under the spec directory.
+   *
+   * @throws IOException if the file cannot be created or written.
+   */
+  private void writeAvroJobSpec(AvroJobSpec jobSpec) throws IOException {
+    DatumWriter<AvroJobSpec> datumWriter = new SpecificDatumWriter<>(AvroJobSpec.SCHEMA$);
+    Path jobSpecPath = new Path(this.specConsumerPath, jobSpec.getUri());
+    FileSystem fs = jobSpecPath.getFileSystem(new Configuration());
+
+    // try-with-resources closes the writer and the stream even when create/append throws.
+    try (OutputStream out = fs.create(jobSpecPath);
+        DataFileWriter<AvroJobSpec> dataFileWriter = new DataFileWriter<>(datumWriter)) {
+      dataFileWriter.create(AvroJobSpec.SCHEMA$, out);
+      dataFileWriter.append(jobSpec);
+    }
+  }
+
+}
\ No newline at end of file
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FsSpecProducerTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FsSpecProducerTest.java
new file mode 100644
index 0000000..89c80fe
--- /dev/null
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FsSpecProducerTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.runtime.api;
+
+import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.io.Files;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+
+/**
+ * Round-trip tests for {@link FsSpecProducer}: specs are written by the producer and read back through a
+ * {@link FsSpecConsumer} configured with the same directory.
+ */
+public class FsSpecProducerTest {
+  private FsSpecProducer _fsSpecProducer;
+  private FsSpecConsumer _fsSpecConsumer;
+
+  /** Creates a new temporary directory and a producer/consumer pair configured with it before each test method. */
+  @BeforeMethod
+  public void setUp() {
+    File tmpDir = Files.createTempDir();
+    Config config = ConfigFactory.empty().withValue(FsSpecConsumer.SPEC_PATH_KEY, ConfigValueFactory.fromAnyRef(
+        tmpDir.getAbsolutePath()));
+    this._fsSpecProducer = new FsSpecProducer(config);
+    this._fsSpecConsumer = new FsSpecConsumer(config);
+  }
+
+  /** Builds the {@code testJob} spec (two config entries, version "1", {@code FS:///} template) used by the tests. */
+  private JobSpec createTestJobSpec() throws URISyntaxException {
+    Config jobConfig = ConfigFactory.empty()
+        .withValue("key1", ConfigValueFactory.fromAnyRef("val1"))
+        .withValue("key2", ConfigValueFactory.fromAnyRef("val2"));
+    return JobSpec.builder("testJob").withConfig(jobConfig)
+        .withVersion("1").withDescription("").withTemplate(new URI("FS:///")).build();
+  }
+
+  /** Asserts that the consumer sees exactly one changed spec carrying {@code expectedVerb} and returns that spec. */
+  private Spec consumeSingleSpec(SpecExecutor.Verb expectedVerb) throws ExecutionException, InterruptedException {
+    List<Pair<SpecExecutor.Verb, Spec>> changedSpecs = this._fsSpecConsumer.changedSpecs().get();
+    Assert.assertEquals(changedSpecs.size(), 1);
+    Assert.assertEquals(changedSpecs.get(0).getLeft(), expectedVerb);
+    return changedSpecs.get(0).getRight();
+  }
+
+  /** Asserts that {@code spec} is the job built by {@link #createTestJobSpec()}. */
+  private void assertIsTestJobSpec(Spec spec) {
+    Assert.assertEquals(spec.getUri().toString(), "testJob");
+    Config jobConfig = ((JobSpec) spec).getConfig();
+    Assert.assertEquals(jobConfig.getString("key1"), "val1");
+    Assert.assertEquals(jobConfig.getString("key2"), "val2");
+  }
+
+  @Test
+  public void testAddSpec()
+      throws URISyntaxException, ExecutionException, InterruptedException {
+    // Resolve the returned future so a write failure reported through it can fail the test at this point.
+    this._fsSpecProducer.addSpec(createTestJobSpec()).get();
+
+    assertIsTestJobSpec(consumeSingleSpec(SpecExecutor.Verb.ADD));
+  }
+
+  @Test (dependsOnMethods = "testAddSpec")
+  public void testUpdateSpec() throws ExecutionException, InterruptedException, URISyntaxException {
+    this._fsSpecProducer.updateSpec(createTestJobSpec()).get();
+
+    assertIsTestJobSpec(consumeSingleSpec(SpecExecutor.Verb.UPDATE));
+  }
+
+  @Test (dependsOnMethods = "testUpdateSpec")
+  public void testDeleteSpec() throws URISyntaxException, ExecutionException, InterruptedException {
+    Properties headers = new Properties();
+    headers.put("headerProp1", "headerValue1");
+    this._fsSpecProducer.deleteSpec(new URI("testDeleteJob"), headers).get();
+
+    Spec deletedSpec = consumeSingleSpec(SpecExecutor.Verb.DELETE);
+    Assert.assertEquals(deletedSpec.getUri().toString(), "testDeleteJob");
+  }
+}
\ No newline at end of file