You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2017/11/07 22:05:09 UTC

incubator-gobblin git commit: [GOBBLIN-304] Change default version of flow specs to an empty string.

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 0e5561519 -> c385f1ddd


[GOBBLIN-304] Change default version of flow specs to an empty string.

Closes #2159 from arjun4084346/removeSpecVersion


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/c385f1dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/c385f1dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/c385f1dd

Branch: refs/heads/master
Commit: c385f1dddb0124f0ff2545d9a591f3b055f76080
Parents: 0e55615
Author: Arjun <ab...@linkedin.com>
Authored: Tue Nov 7 14:04:53 2017 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Tue Nov 7 14:04:53 2017 -0800

----------------------------------------------------------------------
 .../apache/gobblin/service/FlowConfigTest.java  | 12 +--
 .../gobblin/service/FlowConfigsResource.java    | 40 +--------
 .../apache/gobblin/runtime/api/FlowSpec.java    |  7 +-
 .../runtime/spec_catalog/FlowCatalog.java       | 17 ++--
 .../runtime/spec_catalog/TopologyCatalog.java   | 15 +---
 .../gobblin/runtime/spec_store/FSSpecStore.java | 92 +++++---------------
 .../gobblin/spec_catalog/FlowCatalogTest.java   |  7 +-
 .../spec_catalog/TopologyCatalogTest.java       |  3 +-
 .../service/modules/core/GitConfigMonitor.java  |  7 +-
 .../modules/core/GobblinServiceHATest.java      | 12 +--
 .../modules/core/GobblinServiceManagerTest.java | 25 ++----
 .../core/IdentityFlowToJobSpecCompilerTest.java |  4 +-
 .../MultiHopsFlowToJobSpecCompilerTest.java     |  4 +-
 .../modules/orchestration/OrchestratorTest.java |  2 +-
 14 files changed, 70 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c385f1dd/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
----------------------------------------------------------------------
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
index 9ac7d37..a373762 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
@@ -171,11 +171,8 @@ public class FlowConfigTest {
     try {
       _client.createFlowConfig(flowConfig);
     } catch (RestLiResponseException e) {
-      Assert.assertEquals(e.getStatus(), HttpStatus.S_409_CONFLICT.getCode());
-      return;
+      Assert.fail("Create Again should pass without complaining that the spec already exists.");
     }
-
-    Assert.fail("Get should have gotten a 409 error");
   }
 
   @Test (dependsOnMethods = "testCreateAgain")
@@ -187,7 +184,7 @@ public class FlowConfigTest {
     Assert.assertEquals(flowConfig.getId().getFlowName(), TEST_FLOW_NAME);
     Assert.assertEquals(flowConfig.getSchedule().getCronSchedule(), TEST_SCHEDULE );
     Assert.assertEquals(flowConfig.getTemplateUris(), TEST_TEMPLATE_URI);
-    Assert.assertTrue(flowConfig.getSchedule().isRunImmediately());
+    Assert.assertFalse(flowConfig.getSchedule().isRunImmediately());
     // Add this asssert back when getFlowSpec() is changed to return the raw flow spec
     //Assert.assertEquals(flowConfig.getProperties().size(), 1);
     Assert.assertEquals(flowConfig.getProperties().get("param1"), "value1");
@@ -282,11 +279,8 @@ public class FlowConfigTest {
     try {
       _client.updateFlowConfig(flowConfig);
     } catch (RestLiResponseException e) {
-      Assert.assertEquals(e.getStatus(), HttpStatus.S_404_NOT_FOUND.getCode());
-      return;
+      Assert.fail("Bad update should pass without complaining that the spec does not exists.");
     }
-
-    Assert.fail("Get should have raised a 404 error");
   }
 
   @AfterClass(alwaysRun = true)

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c385f1dd/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
----------------------------------------------------------------------
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
index a99087c..f0bce17 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
@@ -123,7 +123,6 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt
         flowConfig.setSchedule(schedule);
       }
 
-
       // remove keys that were injected as part of flowSpec creation
       flowProps.remove(ConfigurationKeys.JOB_SCHEDULE_KEY);
       flowProps.remove(ConfigurationKeys.JOB_TEMPLATE_PATH);
@@ -180,26 +179,13 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt
   public CreateResponse create(FlowConfig flowConfig) {
     LOG.info("Create called with flowName " + flowConfig.getId().getFlowName());
 
-    LOG.info("ReadyToUse is: " + readyToUse);
-    LOG.info("FlowCatalog is: " + getFlowCatalog());
+    LOG.debug("ReadyToUse is: " + readyToUse);
+    LOG.debug("FlowCatalog is: " + getFlowCatalog());
 
     if (!readyToUse && getFlowCatalog() == null) {
       throw new RuntimeException("Not ready for use.");
     }
 
-    try {
-      URI flowCatalogURI = new URI("gobblin-flow", null, "/", null, null);
-      URI flowUri = new URI(flowCatalogURI.getScheme(), flowCatalogURI.getAuthority(),
-          "/" + flowConfig.getId().getFlowGroup() + "/" + flowConfig.getId().getFlowName(), null, null);
-
-      if (getFlowCatalog().exists(flowUri)) {
-        logAndThrowRestLiServiceException(HttpStatus.S_409_CONFLICT,
-            "Flow with the same name already exists: " + flowUri, null);
-      }
-    } catch (URISyntaxException e) {
-      logAndThrowRestLiServiceException(HttpStatus.S_400_BAD_REQUEST, "bad URI " + flowConfig.getId().getFlowName(), e);
-    }
-
     getFlowCatalog().put(createFlowSpecForConfig(flowConfig));
 
     return new CreateResponse(new ComplexResourceKey<>(flowConfig.getId(), new EmptyRecord()), HttpStatus.S_201_CREATED);
@@ -216,7 +202,6 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt
   public UpdateResponse update(ComplexResourceKey<FlowId, EmptyRecord> key, FlowConfig flowConfig) {
     String flowGroup = key.getKey().getFlowGroup();
     String flowName = key.getKey().getFlowName();
-    URI flowUri = null;
 
     LOG.info("Update called with flowGroup " + flowGroup + " flowName " + flowName);
 
@@ -225,25 +210,9 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt
           "flowName and flowGroup cannot be changed in update", null);
     }
 
-    try {
-      URI flowCatalogURI = new URI("gobblin-flow", null, "/", null, null);
-      flowUri = new URI(flowCatalogURI.getScheme(), flowCatalogURI.getAuthority(),
-          "/" + flowGroup + "/" + flowName, null, null);
-      if (!getFlowCatalog().exists(flowUri)) {
-        logAndThrowRestLiServiceException(HttpStatus.S_404_NOT_FOUND,
-            "Flow does not exist: flowGroup " + flowGroup + " flowName " + flowName, null);
-      }
-
-      FlowSpec newFlowSpec = createFlowSpecForConfig(flowConfig);
-
-      getFlowCatalog().put(newFlowSpec);
+      getFlowCatalog().put(createFlowSpecForConfig(flowConfig));
 
       return new UpdateResponse(HttpStatus.S_200_OK);
-    } catch (URISyntaxException e) {
-      logAndThrowRestLiServiceException(HttpStatus.S_400_BAD_REQUEST, "bad URI " + flowUri, e);
-    }
-
-    return null;
   }
 
   /**
@@ -270,9 +239,6 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt
       return new UpdateResponse(HttpStatus.S_200_OK);
     } catch (URISyntaxException e) {
       logAndThrowRestLiServiceException(HttpStatus.S_400_BAD_REQUEST, "bad URI " + flowUri, e);
-    } catch (SpecNotFoundException e) {
-      logAndThrowRestLiServiceException(HttpStatus.S_404_NOT_FOUND, "Flow does not exist: flowGroup " + flowGroup +
-          " flowName " + flowName, null);
     }
 
     return null;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c385f1dd/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
index e7c08d2..59a5025 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
@@ -126,16 +126,17 @@ public class FlowSpec implements Configurable, Spec {
    *  <li> Default flowCatalogURI is {@link #DEFAULT_FLOW_CATALOG_SCHEME}:
    *  <li> Convention for FlowSpec URI: <flowCatalogURI>/config.get({@link ConfigurationKeys#FLOW_GROUP_KEY})/config.get({@link ConfigurationKeys#FLOW_NAME_KEY})
    *  <li> Convention for Description: config.get({@link ConfigurationKeys#FLOW_DESCRIPTION_KEY})
-   *  <li> Default version: 1
+   *  <li> Default version: empty
    * </ul>
    */
   public static class Builder {
     public static final String DEFAULT_FLOW_CATALOG_SCHEME = "gobblin-flow";
+    public static final String DEFAULT_VERSION = "";
     @VisibleForTesting
     private Optional<Config> config = Optional.absent();
     private Optional<Properties> configAsProperties = Optional.absent();
     private Optional<URI> uri;
-    private String version = "1";
+    private String version = FlowSpec.Builder.DEFAULT_VERSION;
     private Optional<String> description = Optional.absent();
     private Optional<URI> flowCatalogURI = Optional.absent();
     private Optional<Set<URI>> templateURIs = Optional.absent();
@@ -163,7 +164,7 @@ public class FlowSpec implements Configurable, Spec {
 
     public FlowSpec build() {
       Preconditions.checkNotNull(this.uri);
-      Preconditions.checkNotNull(this.version);
+      Preconditions.checkArgument(null != version, "Version should not be null");
 
       return new FlowSpec(getURI(), getVersion(), getDescription(), getConfig(),
           getConfigAsProperties(), getTemplateURIs(), getChildSpecs());

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c385f1dd/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index 1cb09da..ecfe036 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -234,29 +234,22 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
 
       log.info(String.format("Adding FlowSpec with URI: %s and Config: %s", spec.getUri(),
           ((FlowSpec) spec).getConfigAsProperties()));
-      if (specStore.exists(spec.getUri())) {
-        specStore.updateSpec(spec);
-        this.listeners.onUpdateSpec(spec);
-      } else {
-        specStore.addSpec(spec);
-        this.listeners.onAddSpec(spec);
-      }
-
-    } catch (IOException | SpecNotFoundException e) {
+      specStore.addSpec(spec);
+      this.listeners.onAddSpec(spec);
+    } catch (IOException e) {
       throw new RuntimeException("Cannot add Spec to Spec store: " + spec, e);
     }
   }
 
   @Override
-  public void remove(URI uri) throws SpecNotFoundException {
+  public void remove(URI uri) {
     try {
       Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
       Preconditions.checkNotNull(uri);
 
       log.info(String.format("Removing FlowSpec with URI: %s", uri));
-      Spec spec = specStore.getSpec(uri);
-      this.listeners.onDeleteSpec(spec.getUri(), spec.getVersion());
       specStore.deleteSpec(uri);
+      this.listeners.onDeleteSpec(uri, FlowSpec.Builder.DEFAULT_VERSION);
 
     } catch (IOException e) {
       throw new RuntimeException("Cannot delete Spec from Spec store for URI: " + uri, e);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c385f1dd/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
index c6e02d2..7bb8b9c 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
@@ -30,6 +30,7 @@ import lombok.Getter;
 
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.apache.gobblin.runtime.api.FlowSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -232,15 +233,9 @@ public class TopologyCatalog extends AbstractIdleService implements SpecCatalog,
 
       log.info(String.format("Adding TopologySpec with URI: %s and Config: %s", spec.getUri(),
           ((TopologySpec) spec).getConfigAsProperties()));
-      if (specStore.exists(spec.getUri())) {
-        specStore.updateSpec(spec);
-        this.listeners.onUpdateSpec(spec);
-      } else {
         specStore.addSpec(spec);
         this.listeners.onAddSpec(spec);
-      }
-
-    } catch (IOException | SpecNotFoundException e) {
+    } catch (IOException e) {
       throw new RuntimeException("Cannot add Spec to Spec store: " + spec, e);
     }
   }
@@ -252,11 +247,9 @@ public class TopologyCatalog extends AbstractIdleService implements SpecCatalog,
       Preconditions.checkNotNull(uri);
 
       log.info(String.format("Removing TopologySpec with URI: %s", uri));
-      Spec spec = specStore.getSpec(uri);
-      this.listeners.onDeleteSpec(spec.getUri(), spec.getVersion());
+      this.listeners.onDeleteSpec(uri, FlowSpec.Builder.DEFAULT_VERSION);
       specStore.deleteSpec(uri);
-
-    } catch (IOException | SpecNotFoundException e) {
+    } catch (IOException e) {
       throw new RuntimeException("Cannot delete Spec from Spec store for URI: " + uri, e);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c385f1dd/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
index b283c87..d87d6d4 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
@@ -25,6 +25,7 @@ import java.net.URI;
 import java.util.Collection;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -114,62 +115,36 @@ public class FSSpecStore implements SpecStore {
     return Files.getNameWithoutExtension(specUri.getName());
   }
 
-  private Collection<Spec> getAllVersionsOfSpec(String specGroup, String specName) throws IOException {
+  private Collection<Spec> getAllVersionsOfSpec(Path spec) {
     Collection<Spec> specs = Lists.newArrayList();
-    FileStatus[] fileStatuses;
-    try {
-      fileStatuses = listSpecs(this.fsSpecStoreDirPath, specGroup);
-    } catch (FileNotFoundException e) {
-      return specs;
-    }
 
-    for (FileStatus fileStatus : fileStatuses) {
-      if (!fileStatus.isDirectory() && fileStatus.getPath().getName().startsWith(specName)) {
-        specs.add(readSpecFromFile(fileStatus.getPath()));
-      }
+    try {
+      specs.add(readSpecFromFile(spec));
+    } catch (IOException e) {
+      log.warn("Spec {} not found.", spec);
     }
     return specs;
   }
 
+  /**
+   * Returns all versions of the spec defined by specUri.
+   * Currently, multiple versions are not supported, so this should return exactly one spec.
+   * @param specUri URI for the {@link Spec} to be retrieved.
+   * @return all versions of the spec.
+   */
   @Override
-  public Collection<Spec> getAllVersionsOfSpec(URI specUri) throws IOException {
+  public Collection<Spec> getAllVersionsOfSpec(URI specUri) {
     Preconditions.checkArgument(null != specUri, "Spec URI should not be null");
-    Path specPath = new Path(specUri.getPath());
-    return getAllVersionsOfSpec(getSpecGroup(specPath), getSpecName(specPath));
+    Path specPath = getPathForURI(this.fsSpecStoreDirPath, specUri, FlowSpec.Builder.DEFAULT_VERSION);
+    return getAllVersionsOfSpec(specPath);
   }
 
   @Override
   public boolean exists(URI specUri) throws IOException {
     Preconditions.checkArgument(null != specUri, "Spec URI should not be null");
-    Path flowPath = new Path(specUri.getPath());
-    String specGroup = getSpecGroup(flowPath);
-    String specName = getSpecName(flowPath);
-    FileStatus[] fileStatuses;
-    try {
-      fileStatuses = listSpecs(this.fsSpecStoreDirPath, specGroup);
-    } catch (FileNotFoundException e) {
-      return false;
-    }
 
-    // TODO Fix ETL-6496
-    // We need to revisit having a version delimiter.
-    // Currently without a delimiter the prefix check may match other specs that should not be matched.
-    for (FileStatus fileStatus : fileStatuses) {
-      if (!fileStatus.isDirectory() && fileStatus.getPath().getName().startsWith(specName)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private FileStatus[] listSpecs(Path fsSpecStoreDirPath, String specGroup) throws FileNotFoundException, IOException {
-    FileStatus[] fileStatuses;
-    if (StringUtils.isEmpty(specGroup)) {
-      fileStatuses = fs.listStatus(fsSpecStoreDirPath);
-    } else {
-      fileStatuses = fs.listStatus(new Path(fsSpecStoreDirPath, specGroup));
-    }
-    return fileStatuses;
+    Path specPath = getPathForURI(this.fsSpecStoreDirPath, specUri, FlowSpec.Builder.DEFAULT_VERSION);
+    return fs.exists(specPath);
   }
 
   @Override
@@ -192,11 +167,7 @@ public class FSSpecStore implements SpecStore {
   public boolean deleteSpec(URI specUri) throws IOException {
     Preconditions.checkArgument(null != specUri, "Spec URI should not be null");
 
-    try {
-      return deleteSpec(specUri, getSpec(specUri).getVersion());
-    } catch (SpecNotFoundException e) {
-      throw new IOException(String.format("Issue in removing Spec: %s", specUri), e);
-    }
+      return deleteSpec(specUri, FlowSpec.Builder.DEFAULT_VERSION);
   }
 
   @Override
@@ -207,13 +178,7 @@ public class FSSpecStore implements SpecStore {
     try {
       log.info(String.format("Deleting Spec with URI: %s in FSSpecStore: %s", specUri, this.fsSpecStoreDirPath));
       Path specPath = getPathForURI(this.fsSpecStoreDirPath, specUri, version);
-
-      if (fs.exists(specPath)) {
-        return fs.delete(specPath, false);
-      } else {
-        log.warn("No file with URI:" + specUri + " is found. Deletion failed.");
-        return false;
-      }
+      return fs.delete(specPath, false);
     } catch (IOException e) {
       throw new IOException(String.format("Issue in removing Spec: %s for Version: %s", specUri, version), e);
     }
@@ -221,17 +186,12 @@ public class FSSpecStore implements SpecStore {
 
   @Override
   public Spec updateSpec(Spec spec) throws IOException, SpecNotFoundException {
-    Preconditions.checkArgument(null != spec, "Spec should not be null");
-
-    log.info(String.format("Updating Spec with URI: %s in FSSpecStore: %s", spec.getUri(), this.fsSpecStoreDirPath));
-    Path specPath = getPathForURI(this.fsSpecStoreDirPath, spec.getUri(), spec.getVersion());
-    writeSpecToFile(specPath, spec);
-
+    addSpec(spec);
     return spec;
   }
 
   @Override
-  public Spec getSpec(URI specUri) throws IOException, SpecNotFoundException {
+  public Spec getSpec(URI specUri) throws SpecNotFoundException {
     Preconditions.checkArgument(null != specUri, "Spec URI should not be null");
 
     Collection<Spec> specs = getAllVersionsOfSpec(specUri);
@@ -296,9 +256,9 @@ public class FSSpecStore implements SpecStore {
    * @throws IOException
    */
   protected Spec readSpecFromFile(Path path) throws IOException {
-    Spec spec = null;
+    Spec spec;
 
-    try (FSDataInputStream fis = fs.open(path);) {
+    try (FSDataInputStream fis = fs.open(path)) {
       spec = this.specSerDe.deserialize(ByteStreams.toByteArray(fis));
     }
 
@@ -312,12 +272,8 @@ public class FSSpecStore implements SpecStore {
    * @throws IOException
    */
   protected void writeSpecToFile(Path specPath, Spec spec) throws IOException {
-    if (fs.exists(specPath)) {
-      fs.delete(specPath, true);
-    }
-
     byte[] serializedSpec = this.specSerDe.serialize(spec);
-    try (FSDataOutputStream os = fs.create(specPath)) {
+    try (FSDataOutputStream os = fs.create(specPath, true)) {
       os.write(serializedSpec);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c385f1dd/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java
index ae2e087..537cdbe 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java
@@ -56,7 +56,7 @@ public class FlowCatalogTest {
   private static final String SPEC_STORE_DIR = "/tmp/flowTestSpecStore";
   private static final String SPEC_GROUP_DIR = "/tmp/flowTestSpecStore/flowTestGroupDir";
   private static final String SPEC_DESCRIPTION = "Test Flow Spec";
-  private static final String SPEC_VERSION = "1";
+  private static final String SPEC_VERSION = FlowSpec.Builder.DEFAULT_VERSION;
 
   private ServiceBasedAppLauncher serviceLauncher;
   private FlowCatalog flowCatalog;
@@ -144,6 +144,11 @@ public class FlowCatalogTest {
   }
 
   @Test (dependsOnMethods = "createFlowSpec")
+  void testExist() throws Exception {
+    Assert.assertTrue(flowCatalog.exists(flowSpec.getUri()));
+  }
+
+  @Test (dependsOnMethods = "testExist")
   public void deleteFlowSpec() throws SpecNotFoundException {
     // List Current Specs
     Collection<Spec> specs = flowCatalog.getSpecs();

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c385f1dd/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/TopologyCatalogTest.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/TopologyCatalogTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/TopologyCatalogTest.java
index 48fba40..a3490ff 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/TopologyCatalogTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/TopologyCatalogTest.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.Properties;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
@@ -54,7 +55,7 @@ public class TopologyCatalogTest {
   private static final String SPEC_STORE_PARENT_DIR = "/tmp";
   private static final String SPEC_STORE_DIR = "/tmp/topologyTestSpecStore";
   private static final String SPEC_DESCRIPTION = "Test Topology Spec";
-  private static final String SPEC_VERSION = "1";
+  private static final String SPEC_VERSION = FlowSpec.Builder.DEFAULT_VERSION;
 
   private ServiceBasedAppLauncher serviceLauncher;
   private TopologyCatalog topologyCatalog;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c385f1dd/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
index 00f8fc2..b20e3b7 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
@@ -72,7 +72,7 @@ import lombok.extern.slf4j.Slf4j;
 @Slf4j
 public class GitConfigMonitor extends AbstractIdleService {
   private static final String SPEC_DESCRIPTION = "Git-based flow config";
-  private static final String SPEC_VERSION = "1";
+  private static final String SPEC_VERSION = FlowSpec.Builder.DEFAULT_VERSION;
   private static final int TERMINATION_TIMEOUT = 30;
   private static final int CONFIG_FILE_DEPTH = 3;
   private static final String REMOTE_NAME = "origin";
@@ -251,12 +251,7 @@ public class GitConfigMonitor extends AbstractIdleService {
           .withDescription(SPEC_DESCRIPTION)
           .build();
 
-      try {
         this.flowCatalog.remove(spec.getUri());
-      } catch (SpecNotFoundException e) {
-        // okay if flow does not exist
-        log.warn("Flow {} does not exist.", spec.getUri());
-      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c385f1dd/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
index 289e212..ad4180a 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
@@ -301,16 +301,14 @@ public class GobblinServiceHATest {
     // Try create on both nodes
     try {
       this.node1FlowConfigClient.createFlowConfig(flowConfig1);
-      Assert.fail("Get should have gotten a 409 error");
     } catch (RestLiResponseException e) {
-      Assert.assertEquals(e.getStatus(), HttpStatus.CONFLICT_409);
+      Assert.fail("Create Again should pass without complaining that the spec already exists.");
     }
 
     try {
       this.node2FlowConfigClient.createFlowConfig(flowConfig2);
-      Assert.fail("Get should have gotten a 409 error");
     } catch (RestLiResponseException e) {
-      Assert.assertEquals(e.getStatus(), HttpStatus.CONFLICT_409);
+      Assert.fail("Create Again should pass without complaining that the spec already exists.");
     }
   }
 
@@ -442,16 +440,14 @@ public class GobblinServiceHATest {
 
     try {
       this.node1FlowConfigClient.updateFlowConfig(flowConfig);
-      Assert.fail("Get should have raised a 404 error");
     } catch (RestLiResponseException e) {
-      Assert.assertEquals(e.getStatus(), HttpStatus.NOT_FOUND_404);
+      Assert.fail("Bad update should pass without complaining that the spec does not exists.");
     }
 
     try {
       this.node2FlowConfigClient.updateFlowConfig(flowConfig);
-      Assert.fail("Get should have raised a 404 error");
     } catch (RestLiResponseException e) {
-      Assert.assertEquals(e.getStatus(), HttpStatus.NOT_FOUND_404);
+      Assert.fail("Bad update should pass without complaining that the spec does not exists.");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c385f1dd/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
index b40792e..926dd10 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
@@ -66,15 +66,14 @@ public class GobblinServiceManagerTest {
   private static final String SERVICE_WORK_DIR = "/tmp/serviceWorkDir/";
   private static final String SPEC_STORE_PARENT_DIR = "/tmp/serviceCore/";
   private static final String SPEC_DESCRIPTION = "Test ServiceCore";
-  private static final String SPEC_VERSION = "1";
   private static final String TOPOLOGY_SPEC_STORE_DIR = "/tmp/serviceCore/topologyTestSpecStore";
   private static final String FLOW_SPEC_STORE_DIR = "/tmp/serviceCore/flowTestSpecStore";
   private static final String GIT_CLONE_DIR = "/tmp/serviceCore/clone";
   private static final String GIT_REMOTE_REPO_DIR = "/tmp/serviceCore/remote";
   private static final String GIT_LOCAL_REPO_DIR = "/tmp/serviceCore/local";
 
-  private static final String TEST_GROUP_NAME = "testGroup1";
-  private static final String TEST_FLOW_NAME = "testFlow1";
+  private static final String TEST_GROUP_NAME = "testGroup";
+  private static final String TEST_FLOW_NAME = "testFlow";
   private static final String TEST_SCHEDULE = "0 1/0 * ? * *";
   private static final String TEST_TEMPLATE_URI = "FS:///templates/test.template";
   private static final String TEST_DUMMY_GROUP_NAME = "dummyGroup";
@@ -100,7 +99,7 @@ public class GobblinServiceManagerTest {
     serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX +  TEST_GOBBLIN_EXECUTOR_NAME + ".description",
         "StandaloneTestExecutor");
     serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX +  TEST_GOBBLIN_EXECUTOR_NAME + ".version",
-        "1");
+        FlowSpec.Builder.DEFAULT_VERSION);
     serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX +  TEST_GOBBLIN_EXECUTOR_NAME + ".uri",
         "gobblinExecutor");
     serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX +  TEST_GOBBLIN_EXECUTOR_NAME + ".specExecutorInstance",
@@ -168,8 +167,7 @@ public class GobblinServiceManagerTest {
     flowProperties.put(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, TEST_SINK_NAME);
 
     FlowConfig flowConfig = new FlowConfig().setId(new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME))
-        .setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new Schedule().setCronSchedule(TEST_SCHEDULE).
-            setRunImmediately(true))
+        .setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(true))
         .setProperties(new StringMap(flowProperties));
 
     this.flowConfigClient.createFlowConfig(flowConfig);
@@ -191,11 +189,8 @@ public class GobblinServiceManagerTest {
     try {
       this.flowConfigClient.createFlowConfig(flowConfig);
     } catch (RestLiResponseException e) {
-      Assert.assertEquals(e.getStatus(), HttpStatus.CONFLICT_409);
-      return;
+      Assert.fail("Create Again should pass without complaining that the spec already exists.");
     }
-
-    Assert.fail("Get should have gotten a 409 error");
   }
 
   @Test (dependsOnMethods = "testCreateAgain")
@@ -207,8 +202,8 @@ public class GobblinServiceManagerTest {
     Assert.assertEquals(flowConfig.getId().getFlowName(), TEST_FLOW_NAME);
     Assert.assertEquals(flowConfig.getSchedule().getCronSchedule(), TEST_SCHEDULE );
     Assert.assertEquals(flowConfig.getTemplateUris(), TEST_TEMPLATE_URI);
-    Assert.assertTrue(flowConfig.getSchedule().isRunImmediately());
-    // Add this asssert back when getFlowSpec() is changed to return the raw flow spec
+    Assert.assertFalse(flowConfig.getSchedule().isRunImmediately());
+    // Add this assert back when getFlowSpec() is changed to return the raw flow spec
     //Assert.assertEquals(flowConfig.getProperties().size(), 1);
     Assert.assertEquals(flowConfig.getProperties().get("param1"), "value1");
   }
@@ -333,10 +328,8 @@ public class GobblinServiceManagerTest {
     try {
       this.flowConfigClient.updateFlowConfig(flowConfig);
     } catch (RestLiResponseException e) {
-      Assert.assertEquals(e.getStatus(), HttpStatus.NOT_FOUND_404);
-      return;
+      Assert.fail("Bad update should pass without complaining that the spec does not exists.");
     }
-
-    Assert.fail("Get should have raised a 404 error");
+    cleanUpDir(FLOW_SPEC_STORE_DIR);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c385f1dd/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java
index 864b238..2dbe790 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java
@@ -65,7 +65,7 @@ public class IdentityFlowToJobSpecCompilerTest {
 
   private static final String SPEC_STORE_PARENT_DIR = "/tmp/orchestrator/";
   private static final String SPEC_DESCRIPTION = "Test Orchestrator";
-  private static final String SPEC_VERSION = "1";
+  private static final String SPEC_VERSION = FlowSpec.Builder.DEFAULT_VERSION;
   private static final String TOPOLOGY_SPEC_STORE_DIR = "/tmp/orchestrator/topologyTestSpecStore_" + System.currentTimeMillis();
   private static final String FLOW_SPEC_STORE_DIR = "/tmp/orchestrator/flowTestSpecStore_" + System.currentTimeMillis();
 
@@ -145,7 +145,7 @@ public class IdentityFlowToJobSpecCompilerTest {
           FLOW_SPEC_STORE_DIR))
           .withConfig(config)
           .withDescription("dummy description")
-          .withVersion("1")
+          .withVersion(SPEC_VERSION)
           .withTemplate(new URI(TEST_TEMPLATE_URI));
     } catch (URISyntaxException e) {
       throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c385f1dd/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/MultiHopsFlowToJobSpecCompilerTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/MultiHopsFlowToJobSpecCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/MultiHopsFlowToJobSpecCompilerTest.java
index cc722eb..8b08628 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/MultiHopsFlowToJobSpecCompilerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/MultiHopsFlowToJobSpecCompilerTest.java
@@ -76,7 +76,7 @@ public class MultiHopsFlowToJobSpecCompilerTest {
 
   private static final String SPEC_STORE_PARENT_DIR = "/tmp/orchestrator/";
   private static final String SPEC_DESCRIPTION = "Test Orchestrator";
-  private static final String SPEC_VERSION = "1";
+  private static final String SPEC_VERSION = FlowSpec.Builder.DEFAULT_VERSION;
   private static final String TOPOLOGY_SPEC_STORE_DIR = "/tmp/orchestrator/topologyTestSpecStore_" + System.currentTimeMillis();
   private static final String TOPOLOGY_SPEC_STORE_DIR_SECOND = "/tmp/orchestrator/topologyTestSpecStore_" + System.currentTimeMillis() + "_2";
   private static final String FLOW_SPEC_STORE_DIR = "/tmp/orchestrator/flowTestSpecStore_" + System.currentTimeMillis();
@@ -288,7 +288,7 @@ public class MultiHopsFlowToJobSpecCompilerTest {
           FLOW_SPEC_STORE_DIR))
           .withConfig(config)
           .withDescription("dummy description")
-          .withVersion("1")
+          .withVersion(SPEC_VERSION)
           .withTemplate(new URI(TEST_TEMPLATE_URI));
     } catch (URISyntaxException e) {
       throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c385f1dd/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index 8896068..6d75f9e 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -57,7 +57,7 @@ public class OrchestratorTest {
 
   private static final String SPEC_STORE_PARENT_DIR = "/tmp/orchestrator/";
   private static final String SPEC_DESCRIPTION = "Test Orchestrator";
-  private static final String SPEC_VERSION = "1";
+  private static final String SPEC_VERSION = FlowSpec.Builder.DEFAULT_VERSION;
   private static final String TOPOLOGY_SPEC_STORE_DIR = "/tmp/orchestrator/topologyTestSpecStore";
   private static final String FLOW_SPEC_STORE_DIR = "/tmp/orchestrator/flowTestSpecStore";
   private static final String FLOW_SPEC_GROUP_DIR = "/tmp/orchestrator/flowTestSpecStore/flowTestGroupDir";