You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/01/23 14:38:11 UTC

incubator-gobblin git commit: [GOBBLIN-670] Ensure MultiHopFlowCompiler is initialized when job template catalog location is not provided.

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 8f72514c6 -> 55a19bbfe


[GOBBLIN-670] Ensure MultiHopFlowCompiler is initialized when job template catalog location is not provided.

Closes #2540 from sv2000/startUp


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/55a19bbf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/55a19bbf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/55a19bbf

Branch: refs/heads/master
Commit: 55a19bbfe693e68828ff2a85e6e0666b754b6a9b
Parents: 8f72514
Author: suvasude <su...@linkedin.biz>
Authored: Wed Jan 23 06:38:08 2019 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Wed Jan 23 06:38:08 2019 -0800

----------------------------------------------------------------------
 .../service/modules/core/GitFlowGraphMonitor.java | 17 +++++++++++------
 .../modules/flow/MultiHopFlowCompiler.java        | 18 +++++++++++-------
 .../modules/core/GitFlowGraphMonitorTest.java     |  5 +++--
 3 files changed, 25 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/55a19bbf/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
index a0b53fe..978875f 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
@@ -32,6 +32,7 @@ import org.eclipse.jgit.api.errors.GitAPIException;
 import org.eclipse.jgit.diff.DiffEntry;
 
 import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.io.Files;
 import com.typesafe.config.Config;
@@ -87,13 +88,13 @@ public class GitFlowGraphMonitor extends GitMonitoringService {
           .put(SHOULD_CHECKPOINT_HASHES, false)
           .build());
 
-  private FSFlowCatalog flowCatalog;
+  private Optional<FSFlowCatalog> flowCatalog;
   private FlowGraph flowGraph;
   private final Map<URI, TopologySpec> topologySpecMap;
   private final Config emptyConfig = ConfigFactory.empty();
   private final CountDownLatch initComplete;
 
-  public GitFlowGraphMonitor(Config config, FSFlowCatalog flowCatalog, FlowGraph graph, Map<URI, TopologySpec> topologySpecMap, CountDownLatch initComplete) {
+  public GitFlowGraphMonitor(Config config, Optional<FSFlowCatalog> flowCatalog, FlowGraph graph, Map<URI, TopologySpec> topologySpecMap, CountDownLatch initComplete) {
     super(config.getConfig(GIT_FLOWGRAPH_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK));
     this.flowCatalog = flowCatalog;
     this.flowGraph = graph;
@@ -223,11 +224,15 @@ public class GitFlowGraphMonitor extends GitMonitoringService {
         Class flowEdgeFactoryClass = Class.forName(ConfigUtils.getString(edgeConfig, FlowGraphConfigurationKeys.FLOW_EDGE_FACTORY_CLASS,
             FlowGraphConfigurationKeys.DEFAULT_FLOW_EDGE_FACTORY_CLASS));
         FlowEdgeFactory flowEdgeFactory = (FlowEdgeFactory) GobblinConstructorUtils.invokeLongestConstructor(flowEdgeFactoryClass, edgeConfig);
-        FlowEdge edge = flowEdgeFactory.createFlowEdge(edgeConfig, flowCatalog, specExecutors);
-        if (!this.flowGraph.addFlowEdge(edge)) {
-          log.warn("Could not add edge {} to FlowGraph; skipping", edge.getId());
+        if (flowCatalog.isPresent()) {
+          FlowEdge edge = flowEdgeFactory.createFlowEdge(edgeConfig, flowCatalog.get(), specExecutors);
+          if (!this.flowGraph.addFlowEdge(edge)) {
+            log.warn("Could not add edge {} to FlowGraph; skipping", edge.getId());
+          } else {
+            log.info("Added edge {} to FlowGraph", edge.getId());
+          }
         } else {
-          log.info("Added edge {} to FlowGraph", edge.getId());
+          log.warn("Could not add edge defined in {} to FlowGraph as FlowCatalog is absent", change.getNewPath());
         }
       } catch (Exception e) {
         log.warn("Could not add edge defined in {} due to exception {}", change.getNewPath(), e.getMessage());

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/55a19bbf/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index 3f412f4..49915ef 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -86,13 +87,16 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
 
   public MultiHopFlowCompiler(Config config, Optional<Logger> log, boolean instrumentationEnabled) {
     super(config, log, instrumentationEnabled);
-    Config templateCatalogCfg = config.withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
-        config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
-    FSFlowCatalog flowCatalog;
-    try {
-      flowCatalog = new FSFlowCatalog(templateCatalogCfg);
-    } catch (IOException e) {
-      throw new RuntimeException("Cannot instantiate " + getClass().getName(), e);
+    Optional<FSFlowCatalog> flowCatalog;
+    if (config.hasPath(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)
+        && StringUtils.isNotBlank(config.getString(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY))) {
+      try {
+        flowCatalog = Optional.of(new FSFlowCatalog(config));
+      } catch (IOException e) {
+        throw new RuntimeException("Cannot instantiate " + getClass().getName(), e);
+      }
+    } else {
+      flowCatalog = Optional.absent();
     }
     this.flowGraph = new BaseFlowGraph();
     Config gitFlowGraphConfig = this.config;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/55a19bbf/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
index 13a452a..26f2180 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
@@ -46,6 +46,7 @@ import org.testng.annotations.Test;
 
 import com.google.common.base.Charsets;
 import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
 import com.google.common.io.Files;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
@@ -80,7 +81,7 @@ public class GitFlowGraphMonitorTest {
   private final File edge1File = new File(edge1Dir, "edge1.properties");
 
   private RefSpec masterRefSpec = new RefSpec("master");
-  private FSFlowCatalog flowCatalog;
+  private Optional<FSFlowCatalog> flowCatalog;
   private Config config;
   private BaseFlowGraph flowGraph;
   private GitFlowGraphMonitor gitFlowGraphMonitor;
@@ -118,7 +119,7 @@ public class GitFlowGraphMonitorTest {
     Config templateCatalogCfg = config
         .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
             config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
-    this.flowCatalog = new FSFlowCatalog(templateCatalogCfg);
+    this.flowCatalog = Optional.of(new FSFlowCatalog(templateCatalogCfg));
 
     //Create a FlowGraph instance with defaults
     this.flowGraph = new BaseFlowGraph();