You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/01/23 14:38:11 UTC
incubator-gobblin git commit: [GOBBLIN-670] Ensure
MultiHopFlowCompiler is initialized when job template catalog location is not
provided.[]
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 8f72514c6 -> 55a19bbfe
[GOBBLIN-670] Ensure MultiHopFlowCompiler is initialized when job template catalog location is not provided.[]
Closes #2540 from sv2000/startUp
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/55a19bbf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/55a19bbf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/55a19bbf
Branch: refs/heads/master
Commit: 55a19bbfe693e68828ff2a85e6e0666b754b6a9b
Parents: 8f72514
Author: suvasude <su...@linkedin.biz>
Authored: Wed Jan 23 06:38:08 2019 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Wed Jan 23 06:38:08 2019 -0800
----------------------------------------------------------------------
.../service/modules/core/GitFlowGraphMonitor.java | 17 +++++++++++------
.../modules/flow/MultiHopFlowCompiler.java | 18 +++++++++++-------
.../modules/core/GitFlowGraphMonitorTest.java | 5 +++--
3 files changed, 25 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/55a19bbf/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
index a0b53fe..978875f 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
@@ -32,6 +32,7 @@ import org.eclipse.jgit.api.errors.GitAPIException;
import org.eclipse.jgit.diff.DiffEntry;
import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Files;
import com.typesafe.config.Config;
@@ -87,13 +88,13 @@ public class GitFlowGraphMonitor extends GitMonitoringService {
.put(SHOULD_CHECKPOINT_HASHES, false)
.build());
- private FSFlowCatalog flowCatalog;
+ private Optional<FSFlowCatalog> flowCatalog;
private FlowGraph flowGraph;
private final Map<URI, TopologySpec> topologySpecMap;
private final Config emptyConfig = ConfigFactory.empty();
private final CountDownLatch initComplete;
- public GitFlowGraphMonitor(Config config, FSFlowCatalog flowCatalog, FlowGraph graph, Map<URI, TopologySpec> topologySpecMap, CountDownLatch initComplete) {
+ public GitFlowGraphMonitor(Config config, Optional<FSFlowCatalog> flowCatalog, FlowGraph graph, Map<URI, TopologySpec> topologySpecMap, CountDownLatch initComplete) {
super(config.getConfig(GIT_FLOWGRAPH_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK));
this.flowCatalog = flowCatalog;
this.flowGraph = graph;
@@ -223,11 +224,15 @@ public class GitFlowGraphMonitor extends GitMonitoringService {
Class flowEdgeFactoryClass = Class.forName(ConfigUtils.getString(edgeConfig, FlowGraphConfigurationKeys.FLOW_EDGE_FACTORY_CLASS,
FlowGraphConfigurationKeys.DEFAULT_FLOW_EDGE_FACTORY_CLASS));
FlowEdgeFactory flowEdgeFactory = (FlowEdgeFactory) GobblinConstructorUtils.invokeLongestConstructor(flowEdgeFactoryClass, edgeConfig);
- FlowEdge edge = flowEdgeFactory.createFlowEdge(edgeConfig, flowCatalog, specExecutors);
- if (!this.flowGraph.addFlowEdge(edge)) {
- log.warn("Could not add edge {} to FlowGraph; skipping", edge.getId());
+ if (flowCatalog.isPresent()) {
+ FlowEdge edge = flowEdgeFactory.createFlowEdge(edgeConfig, flowCatalog.get(), specExecutors);
+ if (!this.flowGraph.addFlowEdge(edge)) {
+ log.warn("Could not add edge {} to FlowGraph; skipping", edge.getId());
+ } else {
+ log.info("Added edge {} to FlowGraph", edge.getId());
+ }
} else {
- log.info("Added edge {} to FlowGraph", edge.getId());
+ log.warn("Could not add edge defined in {} to FlowGraph as FlowCatalog is absent", change.getNewPath());
}
} catch (Exception e) {
log.warn("Could not add edge defined in {} due to exception {}", change.getNewPath(), e.getMessage());
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/55a19bbf/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index 3f412f4..49915ef 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import com.google.common.annotations.VisibleForTesting;
@@ -86,13 +87,16 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
public MultiHopFlowCompiler(Config config, Optional<Logger> log, boolean instrumentationEnabled) {
super(config, log, instrumentationEnabled);
- Config templateCatalogCfg = config.withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
- config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
- FSFlowCatalog flowCatalog;
- try {
- flowCatalog = new FSFlowCatalog(templateCatalogCfg);
- } catch (IOException e) {
- throw new RuntimeException("Cannot instantiate " + getClass().getName(), e);
+ Optional<FSFlowCatalog> flowCatalog;
+ if (config.hasPath(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)
+ && StringUtils.isNotBlank(config.getString(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY))) {
+ try {
+ flowCatalog = Optional.of(new FSFlowCatalog(config));
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot instantiate " + getClass().getName(), e);
+ }
+ } else {
+ flowCatalog = Optional.absent();
}
this.flowGraph = new BaseFlowGraph();
Config gitFlowGraphConfig = this.config;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/55a19bbf/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
index 13a452a..26f2180 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
@@ -46,6 +46,7 @@ import org.testng.annotations.Test;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
import com.google.common.io.Files;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
@@ -80,7 +81,7 @@ public class GitFlowGraphMonitorTest {
private final File edge1File = new File(edge1Dir, "edge1.properties");
private RefSpec masterRefSpec = new RefSpec("master");
- private FSFlowCatalog flowCatalog;
+ private Optional<FSFlowCatalog> flowCatalog;
private Config config;
private BaseFlowGraph flowGraph;
private GitFlowGraphMonitor gitFlowGraphMonitor;
@@ -118,7 +119,7 @@ public class GitFlowGraphMonitorTest {
Config templateCatalogCfg = config
.withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
- this.flowCatalog = new FSFlowCatalog(templateCatalogCfg);
+ this.flowCatalog = Optional.of(new FSFlowCatalog(templateCatalogCfg));
//Create a FlowGraph instance with defaults
this.flowGraph = new BaseFlowGraph();