You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2017/10/25 16:29:16 UTC

incubator-gobblin git commit: [GOBBLIN-291] Remove unnecessary spec list and read

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master f3eadceed -> 2d05b03d5


[GOBBLIN-291] Remove unnecessary spec list and read

resolve review comments

Closes #2147 from arjun4084346/flowDelay


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/2d05b03d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/2d05b03d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/2d05b03d

Branch: refs/heads/master
Commit: 2d05b03d5e76f9e4056b2adc321ff4b3ef778dc5
Parents: f3eadce
Author: Arjun <ab...@linkedin.com>
Authored: Wed Oct 25 09:29:11 2017 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Wed Oct 25 09:29:11 2017 -0700

----------------------------------------------------------------------
 .../gobblin/service/FlowConfigsResource.java    | 14 ++--
 .../gobblin/runtime/api/MutableSpecCatalog.java |  5 +-
 .../runtime/spec_catalog/FlowCatalog.java       | 12 ++-
 .../gobblin/runtime/spec_store/FSSpecStore.java | 85 +++++++++++++++-----
 .../gobblin/spec_catalog/FlowCatalogTest.java   | 10 ++-
 .../service/modules/core/GitConfigMonitor.java  | 17 ++--
 .../modules/orchestration/OrchestratorTest.java |  3 +-
 7 files changed, 102 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2d05b03d/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
----------------------------------------------------------------------
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
index 3159d49..a99087c 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
@@ -192,14 +192,12 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt
       URI flowUri = new URI(flowCatalogURI.getScheme(), flowCatalogURI.getAuthority(),
           "/" + flowConfig.getId().getFlowGroup() + "/" + flowConfig.getId().getFlowName(), null, null);
 
-      if (getFlowCatalog().getSpec(flowUri) != null) {
+      if (getFlowCatalog().exists(flowUri)) {
         logAndThrowRestLiServiceException(HttpStatus.S_409_CONFLICT,
             "Flow with the same name already exists: " + flowUri, null);
       }
     } catch (URISyntaxException e) {
       logAndThrowRestLiServiceException(HttpStatus.S_400_BAD_REQUEST, "bad URI " + flowConfig.getId().getFlowName(), e);
-    } catch (SpecNotFoundException e) {
-      // okay if flow does not exist
     }
 
     getFlowCatalog().put(createFlowSpecForConfig(flowConfig));
@@ -231,7 +229,11 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt
       URI flowCatalogURI = new URI("gobblin-flow", null, "/", null, null);
       flowUri = new URI(flowCatalogURI.getScheme(), flowCatalogURI.getAuthority(),
           "/" + flowGroup + "/" + flowName, null, null);
-      FlowSpec oldFlowSpec = (FlowSpec) getFlowCatalog().getSpec(flowUri);
+      if (!getFlowCatalog().exists(flowUri)) {
+        logAndThrowRestLiServiceException(HttpStatus.S_404_NOT_FOUND,
+            "Flow does not exist: flowGroup " + flowGroup + " flowName " + flowName, null);
+      }
+
       FlowSpec newFlowSpec = createFlowSpecForConfig(flowConfig);
 
       getFlowCatalog().put(newFlowSpec);
@@ -239,9 +241,6 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt
       return new UpdateResponse(HttpStatus.S_200_OK);
     } catch (URISyntaxException e) {
       logAndThrowRestLiServiceException(HttpStatus.S_400_BAD_REQUEST, "bad URI " + flowUri, e);
-    } catch (SpecNotFoundException e) {
-      logAndThrowRestLiServiceException(HttpStatus.S_404_NOT_FOUND, "Flow does not exist: flowGroup " + flowGroup +
-          " flowName " + flowName, null);
     }
 
     return null;
@@ -265,7 +264,6 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt
       URI flowCatalogURI = new URI("gobblin-flow", null, "/", null, null);
       flowUri = new URI(flowCatalogURI.getScheme(), flowCatalogURI.getAuthority(),
           "/" + flowGroup + "/" + flowName, null, null);
-      FlowSpec flowSpec = (FlowSpec) getFlowCatalog().getSpec(flowUri);
 
       getFlowCatalog().remove(flowUri);
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2d05b03d/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
index 1751a56..f63600a 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
@@ -34,7 +34,8 @@ public interface MutableSpecCatalog extends SpecCatalog {
   public void put(Spec spec);
 
   /**
-   * Removes an existing {@link Spec} with the given URI. A no-op if such {@link Spec} does not exist.
+   * Removes an existing {@link Spec} with the given URI.
+   * Throws SpecNotFoundException if such {@link Spec} does not exist.
    */
-  void remove(URI uri);
+  void remove(URI uri) throws SpecNotFoundException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2d05b03d/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index 8ffa4d7..1cb09da 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -209,6 +209,14 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
     }
   }
 
+  public boolean exists(URI uri) {
+    try {
+      return specStore.exists(uri);
+    } catch (IOException e) {
+      throw new RuntimeException("Cannot retrieve Spec from Spec store for URI: " + uri, e);
+    }
+  }
+
   @Override
   public Spec getSpec(URI uri) throws SpecNotFoundException {
     try {
@@ -240,7 +248,7 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
   }
 
   @Override
-  public void remove(URI uri) {
+  public void remove(URI uri) throws SpecNotFoundException {
     try {
       Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
       Preconditions.checkNotNull(uri);
@@ -250,7 +258,7 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
       this.listeners.onDeleteSpec(spec.getUri(), spec.getVersion());
       specStore.deleteSpec(uri);
 
-    } catch (IOException | SpecNotFoundException e) {
+    } catch (IOException e) {
       throw new RuntimeException("Cannot delete Spec from Spec store for URI: " + uri, e);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2d05b03d/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
index 608d390..b283c87 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
@@ -18,6 +18,8 @@
 package org.apache.gobblin.runtime.spec_store;
 
 import com.google.common.io.ByteStreams;
+
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
@@ -35,6 +37,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.io.Files;
 import com.typesafe.config.Config;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -98,20 +101,77 @@ public class FSSpecStore implements SpecStore {
     }
   }
 
+  /**
+   * @param specUri path of the spec
+   * @return empty string for topology spec, as topologies do not have a group,
+   *         group name for flow spec
+   */
+  public static String getSpecGroup(Path specUri) {
+    return specUri.getParent().getName();
+  }
+
+  public static String getSpecName(Path specUri) {
+    return Files.getNameWithoutExtension(specUri.getName());
+  }
+
+  private Collection<Spec> getAllVersionsOfSpec(String specGroup, String specName) throws IOException {
+    Collection<Spec> specs = Lists.newArrayList();
+    FileStatus[] fileStatuses;
+    try {
+      fileStatuses = listSpecs(this.fsSpecStoreDirPath, specGroup);
+    } catch (FileNotFoundException e) {
+      return specs;
+    }
+
+    for (FileStatus fileStatus : fileStatuses) {
+      if (!fileStatus.isDirectory() && fileStatus.getPath().getName().startsWith(specName)) {
+        specs.add(readSpecFromFile(fileStatus.getPath()));
+      }
+    }
+    return specs;
+  }
+
+  @Override
+  public Collection<Spec> getAllVersionsOfSpec(URI specUri) throws IOException {
+    Preconditions.checkArgument(null != specUri, "Spec URI should not be null");
+    Path specPath = new Path(specUri.getPath());
+    return getAllVersionsOfSpec(getSpecGroup(specPath), getSpecName(specPath));
+  }
+
   @Override
   public boolean exists(URI specUri) throws IOException {
     Preconditions.checkArgument(null != specUri, "Spec URI should not be null");
+    Path flowPath = new Path(specUri.getPath());
+    String specGroup = getSpecGroup(flowPath);
+    String specName = getSpecName(flowPath);
+    FileStatus[] fileStatuses;
+    try {
+      fileStatuses = listSpecs(this.fsSpecStoreDirPath, specGroup);
+    } catch (FileNotFoundException e) {
+      return false;
+    }
 
-    FileStatus[] fileStatuses = fs.listStatus(this.fsSpecStoreDirPath);
+    // TODO Fix ETL-6496
+    // We need to revisit having a version delimiter.
+    // Currently without a delimiter the prefix check may match other specs that should not be matched.
     for (FileStatus fileStatus : fileStatuses) {
-      if (StringUtils.startsWith(fileStatus.getPath().getName(), specUri.toString())) {
+      if (!fileStatus.isDirectory() && fileStatus.getPath().getName().startsWith(specName)) {
         return true;
       }
     }
-
     return false;
   }
 
+  private FileStatus[] listSpecs(Path fsSpecStoreDirPath, String specGroup) throws FileNotFoundException, IOException {
+    FileStatus[] fileStatuses;
+    if (StringUtils.isEmpty(specGroup)) {
+      fileStatuses = fs.listStatus(fsSpecStoreDirPath);
+    } else {
+      fileStatuses = fs.listStatus(new Path(fsSpecStoreDirPath, specGroup));
+    }
+    return fileStatuses;
+  }
+
   @Override
   public void addSpec(Spec spec) throws IOException {
     Preconditions.checkArgument(null != spec, "Spec should not be null");
@@ -207,25 +267,6 @@ public class FSSpecStore implements SpecStore {
   }
 
   @Override
-  public Collection<Spec> getAllVersionsOfSpec(URI specUri) throws IOException, SpecNotFoundException {
-    Preconditions.checkArgument(null != specUri, "Spec URI should not be null");
-
-    Collection<Spec> specs = getSpecs();
-    Collection<Spec> filteredSpecs = Lists.newArrayList();
-    for (Spec spec : specs) {
-      if (spec.getUri().equals(specUri)) {
-        filteredSpecs.add(spec);
-      }
-    }
-
-    if (filteredSpecs.size() == 0) {
-      throw new SpecNotFoundException(specUri);
-    }
-
-    return filteredSpecs;
-  }
-
-  @Override
   public Collection<Spec> getSpecs() throws IOException {
     Collection<Spec> specs = Lists.newArrayList();
     try {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2d05b03d/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java
index 73c1f46..ae2e087 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java
@@ -24,6 +24,7 @@ import java.util.Collection;
 import java.util.Properties;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,6 +54,7 @@ public class FlowCatalogTest {
 
   private static final String SPEC_STORE_PARENT_DIR = "/tmp";
   private static final String SPEC_STORE_DIR = "/tmp/flowTestSpecStore";
+  private static final String SPEC_GROUP_DIR = "/tmp/flowTestSpecStore/flowTestGroupDir";
   private static final String SPEC_DESCRIPTION = "Test Flow Spec";
   private static final String SPEC_VERSION = "1";
 
@@ -142,7 +144,7 @@ public class FlowCatalogTest {
   }
 
   @Test (dependsOnMethods = "createFlowSpec")
-  public void deleteFlowSpec() {
+  public void deleteFlowSpec() throws SpecNotFoundException {
     // List Current Specs
     Collection<Spec> specs = flowCatalog.getSpecs();
     logger.info("[Before Delete] Number of specs: " + specs.size());
@@ -157,18 +159,18 @@ public class FlowCatalogTest {
 
     // List Specs after adding
     specs = flowCatalog.getSpecs();
-    logger.info("[After Create] Number of specs: " + specs.size());
+    logger.info("[After Delete] Number of specs: " + specs.size());
     i = 0;
     for (Spec spec : specs) {
       flowSpec = (FlowSpec) spec;
-      logger.info("[After Create] Spec " + i++ + ": " + gson.toJson(flowSpec));
+      logger.info("[After Delete] Spec " + i++ + ": " + gson.toJson(flowSpec));
     }
     Assert.assertTrue(specs.size() == 0, "Spec store should be empty after deletion");
   }
 
   public URI computeFlowSpecURI() {
     // Make sure this is relative
-    URI uri = PathUtils.relativizePath(new Path(SPEC_STORE_DIR), new Path(SPEC_STORE_PARENT_DIR)).toUri();
+    URI uri = PathUtils.relativizePath(new Path(SPEC_GROUP_DIR), new Path(SPEC_STORE_PARENT_DIR)).toUri();
     return uri;
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2d05b03d/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
index 82f3e0d..00f8fc2 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
@@ -25,6 +25,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.spec_store.FSSpecStore;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -234,8 +236,8 @@ public class GitConfigMonitor extends AbstractIdleService {
   private void removeSpec(DiffEntry change) {
     if (checkConfigFilePath(change.getOldPath())) {
       Path configFilePath = new Path(this.repositoryDir, change.getOldPath());
-      String flowName = Files.getNameWithoutExtension(configFilePath.getName());
-      String flowGroup = configFilePath.getParent().getName();
+      String flowName = FSSpecStore.getSpecName(configFilePath);
+      String flowGroup = FSSpecStore.getSpecGroup(configFilePath);
 
       // build a dummy config to get the proper URI for delete
       Config dummyConfig = ConfigBuilder.create()
@@ -249,7 +251,12 @@ public class GitConfigMonitor extends AbstractIdleService {
           .withDescription(SPEC_DESCRIPTION)
           .build();
 
-      this.flowCatalog.remove(spec.getUri());
+      try {
+        this.flowCatalog.remove(spec.getUri());
+      } catch (SpecNotFoundException e) {
+        // okay if flow does not exist
+        log.warn("Flow {} does not exist.", spec.getUri());
+      }
     }
   }
 
@@ -285,8 +292,8 @@ public class GitConfigMonitor extends AbstractIdleService {
    */
   private Config loadConfigFileWithFlowNameOverrides(Path configFilePath) throws IOException {
     Config flowConfig = this.pullFileLoader.loadPullFile(configFilePath, emptyConfig, false);
-    String flowName = Files.getNameWithoutExtension(configFilePath.getName());
-    String flowGroup = configFilePath.getParent().getName();
+    String flowName = FSSpecStore.getSpecName(configFilePath);
+    String flowGroup = FSSpecStore.getSpecGroup(configFilePath);
 
     return flowConfig.withValue(ConfigurationKeys.FLOW_NAME_KEY, ConfigValueFactory.fromAnyRef(flowName))
         .withValue(ConfigurationKeys.FLOW_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup));

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2d05b03d/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index a933e85..8896068 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -60,6 +60,7 @@ public class OrchestratorTest {
   private static final String SPEC_VERSION = "1";
   private static final String TOPOLOGY_SPEC_STORE_DIR = "/tmp/orchestrator/topologyTestSpecStore";
   private static final String FLOW_SPEC_STORE_DIR = "/tmp/orchestrator/flowTestSpecStore";
+  private static final String FLOW_SPEC_GROUP_DIR = "/tmp/orchestrator/flowTestSpecStore/flowTestGroupDir";
 
   private ServiceBasedAppLauncher serviceLauncher;
   private TopologyCatalog topologyCatalog;
@@ -141,7 +142,7 @@ public class OrchestratorTest {
     FlowSpec.Builder flowSpecBuilder = null;
     try {
       flowSpecBuilder = FlowSpec.builder(computeTopologySpecURI(SPEC_STORE_PARENT_DIR,
-          FLOW_SPEC_STORE_DIR))
+          FLOW_SPEC_GROUP_DIR))
           .withConfig(config)
           .withDescription(SPEC_DESCRIPTION)
           .withVersion(SPEC_VERSION)