Posted to commits@gobblin.apache.org by su...@apache.org on 2019/04/24 21:50:05 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-748]
Craftsmanship code cleaning in Gobblin Service Code
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 9661916 [GOBBLIN-748] Craftsmanship code cleaning in Gobblin Service Code
9661916 is described below
commit 9661916684d466edf6ae499800a252c108bb9aa2
Author: autumnust <le...@linkedin.com>
AuthorDate: Wed Apr 24 14:49:54 2019 -0700
[GOBBLIN-748] Craftsmanship code cleaning in Gobblin Service Code
Dear Gobblin maintainers,
Please accept this PR. I understand that it will
not be reviewed until I have checked off all the
steps below!
Essentially a bunch of renaming and moving
configuration keys around.
Things that we should agree on:
- Configuration keys should always be placed in the
class where they are relevant, instead of
`ConfigurationKeys.java` or similar things. We
should definitely stop doing that.
- Naming of `FSFlowCatalog` is misleading. In fact
it should be `FSFlowTemplateCatalog`. Also, it
should not extend `FsJobCatalog`. The missing
piece is a `FsCatalogBase` that implements all
FileSystem-related basic functionality, with the
`Template` and `Job` FsCatalogs each extending that
base implementation with its own interface.
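The two points above can be illustrated with a minimal sketch. Only the key `specStore.fs.dir` and the name `FsCatalogBase` come from this commit; every class name ending in `Sketch`, the `resolve` helper, and the `flow.conf` file name are hypothetical and are not Gobblin's actual API. Plain `java.util.Properties` stands in for the Typesafe `Config` used in the real code.

```java
import java.net.URI;
import java.nio.file.Path;
import java.util.Properties;

// Hypothetical stand-in for FSSpecStore. Only the key name and value are taken from this commit.
class FsSpecStoreSketch {
  // The key is declared by the class that reads it, not in a global ConfigurationKeys class.
  public static final String SPECSTORE_FS_DIR_KEY = "specStore.fs.dir";

  private final String dir;

  FsSpecStoreSketch(Properties props) {
    String value = props.getProperty(SPECSTORE_FS_DIR_KEY);
    if (value == null) {
      throw new IllegalArgumentException("FS SpecStore path must be specified.");
    }
    this.dir = value;
  }

  String getDir() {
    return dir;
  }
}

// Hypothetical FsCatalogBase: holds only the file-system plumbing shared by both catalogs.
abstract class FsCatalogBaseSketch {
  protected final Path rootDir;

  protected FsCatalogBaseSketch(Path rootDir) {
    this.rootDir = rootDir;
  }

  // Resolve a hierarchical URI path (for example "/group/name") against the catalog root.
  protected Path resolve(URI uri) {
    String relative = uri.getPath().replaceFirst("^/+", "");
    return rootDir.resolve(relative).normalize();
  }
}

// The job catalog and the template catalog are siblings; neither extends the other.
class FsJobCatalogSketch extends FsCatalogBaseSketch {
  FsJobCatalogSketch(Path rootDir) {
    super(rootDir);
  }

  Path jobFile(URI jobUri) {
    return resolve(jobUri);
  }
}

class FsFlowTemplateCatalogSketch extends FsCatalogBaseSketch {
  FsFlowTemplateCatalogSketch(Path rootDir) {
    super(rootDir);
  }

  // "flow.conf" is an assumed file name used only for illustration.
  Path flowTemplateFile(URI templateDirUri) {
    return resolve(templateDirUri).resolve("flow.conf");
  }
}
```

With this layout, a caller references `FsSpecStoreSketch.SPECSTORE_FS_DIR_KEY` directly, so the key travels with the class that interprets it, and the template catalog no longer inherits job-specific behavior.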
### JIRA
- [x] My PR addresses the following [Gobblin
JIRA]
-
https://issues.apache.org/jira/browse/GOBBLIN-748
### Description
- [x] Here are some details about my PR,
including screenshots (if applicable):
### Tests
- [ ] My PR adds the following unit tests __OR__
does not need testing for this extremely good
reason:
### Commits
- [ ] My commits all reference JIRA issues in
their subject lines, and I have squashed multiple
commits if they address the same issue. In
addition, my commits follow the guidelines from
"[How to write a good git commit
message](http://chris.beams.io/posts/git-commit/)":
1. Subject is separated from body by a blank line
2. Subject is limited to 50 characters
3. Subject does not end with a period
4. Subject uses the imperative mood ("add", not
"adding")
5. Body wraps at 72 characters
6. Body explains "what" and "why", not "how"
Some naive code changes made while reading through
the GaaS code. Some work still needs to be done
to understand the master/slave interaction.
Adding some documentation and renaming stuff
Address comments
Resolving conflicts
Closes #2613 from
autumnust/craftsmanshipCleaningGaaS
---
.../gobblin/configuration/ConfigurationKeys.java | 11 -----
.../org/apache/gobblin/service/FlowConfigTest.java | 3 +-
.../apache/gobblin/service/FlowConfigV2Test.java | 3 +-
.../apache/gobblin/runtime/api/JobTemplate.java | 5 +++
.../gobblin/runtime/spec_catalog/FlowCatalog.java | 46 +++++++++++++--------
.../runtime/spec_catalog/TopologyCatalog.java | 2 +-
.../gobblin/runtime/spec_store/FSSpecStore.java | 10 +++--
.../runtime/spec_store/FSSpecStoreTest.java | 7 ++--
.../service/modules/core/GitConfigMonitor.java | 2 +-
.../service/modules/core/GitFlowGraphMonitor.java | 6 +--
.../service/modules/flow/MultiHopFlowCompiler.java | 6 +--
.../service/modules/flowgraph/BaseFlowEdge.java | 20 +++------
.../gobblin/service/modules/flowgraph/Dag.java | 3 ++
.../service/modules/flowgraph/FlowEdgeFactory.java | 7 ++--
.../service/modules/orchestration/DagManager.java | 16 +++++---
.../modules/orchestration/DagStateStore.java | 12 +++---
.../modules/orchestration/FSDagStateStore.java | 2 +-
.../service/modules/spec/JobExecutionPlan.java | 4 +-
...FlowCatalog.java => FSFlowTemplateCatalog.java} | 47 +++++++++++++++-------
.../service/modules/core/GitConfigMonitorTest.java | 2 +-
.../modules/core/GitFlowGraphMonitorTest.java | 8 ++--
.../service/modules/core/GobblinServiceHATest.java | 5 ++-
.../modules/core/GobblinServiceManagerTest.java | 3 +-
.../modules/flow/MultiHopFlowCompilerTest.java | 6 +--
.../modules/flowgraph/BaseFlowEdgeFactoryTest.java | 4 +-
.../scheduler/GobblinServiceJobSchedulerTest.java | 4 +-
.../spec/JobExecutionPlanDagFactoryTest.java | 6 +--
...logTest.java => FSFlowTemplateCatalogTest.java} | 6 +--
28 files changed, 146 insertions(+), 110 deletions(-)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 8cdddb8..a5b56bb 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -883,23 +883,12 @@ public class ConfigurationKeys {
public static final String FLOW_RUN_IMMEDIATELY = "flow.runImmediately";
/***
- * Configuration properties related to Spec Store
- */
- public static final String SPECSTORE_FS_DIR_KEY = "specStore.fs.dir";
-
- /***
* Configuration properties related to TopologySpec Store
*/
public static final String TOPOLOGYSPEC_STORE_CLASS_KEY = "topologySpec.store.class";
public static final String TOPOLOGYSPEC_STORE_DIR_KEY = "topologySpec.store.dir";
/***
- * Configuration properties related to FlowSpec Store
- */
- public static final String FLOWSPEC_STORE_CLASS_KEY = "flowSpec.store.class";
- public static final String FLOWSPEC_STORE_DIR_KEY = "flowSpec.store.dir";
-
- /***
* Configuration properties related to Spec Executor Instance
*/
public static final String SPECEXECUTOR_INSTANCE_URI_KEY = "specExecInstance.uri";
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
index 1e549e8..ecd884e 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
@@ -23,6 +23,7 @@ import java.net.ServerSocket;
import java.util.Map;
import org.apache.commons.io.FileUtils;
+import org.apache.gobblin.runtime.spec_store.FSSpecStore;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -70,7 +71,7 @@ public class FlowConfigTest {
configBuilder
.addPrimitive(ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY, _testDirectory.getAbsolutePath())
- .addPrimitive(ConfigurationKeys.SPECSTORE_FS_DIR_KEY, TEST_SPEC_STORE_DIR);
+ .addPrimitive(FSSpecStore.SPECSTORE_FS_DIR_KEY, TEST_SPEC_STORE_DIR);
cleanUpDir(TEST_SPEC_STORE_DIR);
Config config = configBuilder.build();
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
index 9b43875..33e8e58 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
@@ -21,6 +21,7 @@ import java.io.File;
import java.util.Map;
import org.apache.commons.io.FileUtils;
+import org.apache.gobblin.runtime.spec_store.FSSpecStore;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -65,7 +66,7 @@ public class FlowConfigV2Test {
configBuilder
.addPrimitive(ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY, _testDirectory.getAbsolutePath())
- .addPrimitive(ConfigurationKeys.SPECSTORE_FS_DIR_KEY, TEST_SPEC_STORE_DIR);
+ .addPrimitive(FSSpecStore.SPECSTORE_FS_DIR_KEY, TEST_SPEC_STORE_DIR);
cleanUpDir(TEST_SPEC_STORE_DIR);
Config config = configBuilder.build();
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobTemplate.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobTemplate.java
index f8f3071..1be22fc 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobTemplate.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobTemplate.java
@@ -58,6 +58,11 @@ public interface JobTemplate extends Spec {
*/
Config getResolvedConfig(Config userProps) throws SpecNotFoundException, TemplateException;
+ /**
+ * The exception thrown when an error occurs while loading/resolving a template.
+ * Note that the template here is not necessarily a JobTemplate; it could be a FlowTemplate in the
+ * context of Gobblin Service.
+ */
class TemplateException extends Exception {
public TemplateException(String message, Throwable cause) {
super(message, cause);
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index 71e0f9f..9b4c00c 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -17,6 +17,10 @@
package org.apache.gobblin.runtime.spec_catalog;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
@@ -27,22 +31,13 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-
+import javax.annotation.Nonnull;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.apache.gobblin.runtime.api.SpecSerDeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.AbstractIdleService;
-import com.typesafe.config.Config;
-
-import javax.annotation.Nonnull;
-
-import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
@@ -61,8 +56,17 @@ import org.apache.gobblin.util.callbacks.CallbackResult;
import org.apache.gobblin.util.callbacks.CallbacksDispatcher;
-@Alpha
+/**
+ * A service that interacts with FlowSpec storage.
+ * The FlowSpec storage, a.k.a. {@link SpecStore}, should be pluggable with different implementations.
+ */
public class FlowCatalog extends AbstractIdleService implements SpecCatalog, MutableSpecCatalog, SpecSerDe {
+
+ /***
+ * Configuration properties related to FlowSpec Store
+ */
+ public static final String FLOWSPEC_STORE_CLASS_KEY = "flowSpec.store.class";
+ public static final String FLOWSPEC_STORE_DIR_KEY = "flowSpec.store.dir";
public static final String DEFAULT_FLOWSPEC_STORE_CLASS = FSSpecStore.class.getCanonicalName();
protected final SpecCatalogListenersList listeners;
@@ -104,15 +108,15 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
this.aliasResolver = new ClassAliasResolver<>(SpecStore.class);
try {
Config newConfig = config;
- if (config.hasPath(ConfigurationKeys.FLOWSPEC_STORE_DIR_KEY)) {
- newConfig = config.withValue(ConfigurationKeys.SPECSTORE_FS_DIR_KEY,
- config.getValue(ConfigurationKeys.FLOWSPEC_STORE_DIR_KEY));
+ if (config.hasPath(FLOWSPEC_STORE_DIR_KEY)) {
+ newConfig = config.withValue(FSSpecStore.SPECSTORE_FS_DIR_KEY,
+ config.getValue(FLOWSPEC_STORE_DIR_KEY));
}
String specStoreClassName = DEFAULT_FLOWSPEC_STORE_CLASS;
- if (config.hasPath(ConfigurationKeys.FLOWSPEC_STORE_CLASS_KEY)) {
- specStoreClassName = config.getString(ConfigurationKeys.FLOWSPEC_STORE_CLASS_KEY);
+ if (config.hasPath(FLOWSPEC_STORE_CLASS_KEY)) {
+ specStoreClassName = config.getString(FLOWSPEC_STORE_CLASS_KEY);
}
- this.log.info("Using audit sink class name/alias " + specStoreClassName);
+ this.log.info(String.format("Using class name/alias [%s] for specstore", specStoreClassName));
this.specStore = (SpecStore) ConstructorUtils.invokeConstructor(Class.forName(this.aliasResolver.resolve(
specStoreClassName)), newConfig, this);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException
@@ -252,6 +256,14 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
}
}
+ /**
+ * Persist {@link Spec} into {@link SpecStore} and notify {@link SpecCatalogListener} if triggerListener
+ * is set to true.
+ *
+ * @param spec The Spec to be added
+ * @param triggerListener True if listeners should be notified.
+ * @return
+ */
public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener) {
Map<String, AddSpecResponse> responseMap = new HashMap<>();
try {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
index c44e111..043dd04 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
@@ -111,7 +111,7 @@ public class TopologyCatalog extends AbstractIdleService implements SpecCatalog,
try {
Config newConfig = config;
if (config.hasPath(ConfigurationKeys.TOPOLOGYSPEC_STORE_DIR_KEY)) {
- newConfig = config.withValue(ConfigurationKeys.SPECSTORE_FS_DIR_KEY,
+ newConfig = config.withValue(FSSpecStore.SPECSTORE_FS_DIR_KEY,
config.getValue(ConfigurationKeys.TOPOLOGYSPEC_STORE_DIR_KEY));
}
String specStoreClassName = DEFAULT_TOPOLOGYSPEC_STORE_CLASS;
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
index e93464d..344901b 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
@@ -27,7 +27,6 @@ import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.Iterator;
-import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
import org.apache.gobblin.runtime.api.Spec;
@@ -56,6 +55,11 @@ import org.slf4j.LoggerFactory;
*/
public class FSSpecStore implements SpecStore {
+ /***
+ * Configuration properties related to Spec Store
+ */
+ public static final String SPECSTORE_FS_DIR_KEY = "specStore.fs.dir";
+
protected final Logger log;
protected final Config sysConfig;
protected final FileSystem fs;
@@ -79,13 +83,13 @@ public class FSSpecStore implements SpecStore {
public FSSpecStore(Config sysConfig, SpecSerDe specSerDe, Optional<Logger> log)
throws IOException {
- Preconditions.checkArgument(sysConfig.hasPath(ConfigurationKeys.SPECSTORE_FS_DIR_KEY),
+ Preconditions.checkArgument(sysConfig.hasPath(SPECSTORE_FS_DIR_KEY),
"FS SpecStore path must be specified.");
this.log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
this.sysConfig = sysConfig;
this.specSerDe = specSerDe;
- this.fsSpecStoreDir = this.sysConfig.getString(ConfigurationKeys.SPECSTORE_FS_DIR_KEY);
+ this.fsSpecStoreDir = this.sysConfig.getString(SPECSTORE_FS_DIR_KEY);
this.fsSpecStoreDirPath = new Path(this.fsSpecStoreDir);
this.log.info("FSSpecStore directory is: " + this.fsSpecStoreDir);
try {
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/FSSpecStoreTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/FSSpecStoreTest.java
index e0e6ebb..a952f5d 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/FSSpecStoreTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/FSSpecStoreTest.java
@@ -29,7 +29,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import org.apache.commons.lang3.SerializationUtils;
-import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecSerDe;
import org.apache.gobblin.util.ConfigUtils;
@@ -50,7 +49,7 @@ public class FSSpecStoreTest {
public void testPathConversion() throws Exception {
Properties properties = new Properties();
File tmpDir = Files.createTempDir();
- properties.setProperty(ConfigurationKeys.SPECSTORE_FS_DIR_KEY, tmpDir.getAbsolutePath());
+ properties.setProperty(FSSpecStore.SPECSTORE_FS_DIR_KEY, tmpDir.getAbsolutePath());
SpecSerDe specSerDe = Mockito.mock(SpecSerDe.class);
FSSpecStore fsSpecStore = new FSSpecStore(ConfigUtils.propertiesToConfig(properties), specSerDe);
@@ -68,7 +67,7 @@ public class FSSpecStoreTest {
File specDir = Files.createTempDir();
Properties properties = new Properties();
- properties.setProperty(ConfigurationKeys.SPECSTORE_FS_DIR_KEY, specDir.getAbsolutePath());
+ properties.setProperty(FSSpecStore.SPECSTORE_FS_DIR_KEY, specDir.getAbsolutePath());
SpecSerDe serde = Mockito.mock(SpecSerDe.class);
TestFsSpecStore fsSpecStore = new TestFsSpecStore(ConfigUtils.propertiesToConfig(properties), serde);
@@ -122,7 +121,7 @@ public class FSSpecStoreTest {
public void testGetSpecURI() throws Exception {
File specDir = Files.createTempDir();
Properties properties = new Properties();
- properties.setProperty(ConfigurationKeys.SPECSTORE_FS_DIR_KEY, specDir.getAbsolutePath());
+ properties.setProperty(FSSpecStore.SPECSTORE_FS_DIR_KEY, specDir.getAbsolutePath());
SpecSerDe serde = Mockito.mock(SpecSerDe.class);
FSSpecStore fsSpecStore = new FSSpecStore(ConfigUtils.propertiesToConfig(properties), serde);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
index 4fe03da..7bbd4f6 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
@@ -38,7 +38,7 @@ import org.apache.gobblin.util.PullFileLoader;
/**
* Service that monitors for jobs from a git repository.
- * The git repository must have an inital commit that has no config files since that is used as a base for getting
+ * The git repository must have an initial commit that has no config files since that is used as a base for getting
* the change list.
* The config needs to be organized with the following structure:
* <root_config_dir>/<flowGroup>/<flowName>.(pull|job|json|conf)
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
index 978875f..024c9a1 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
@@ -49,7 +49,7 @@ import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
import org.apache.gobblin.service.modules.flowgraph.FlowEdgeFactory;
import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
-import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
@@ -88,13 +88,13 @@ public class GitFlowGraphMonitor extends GitMonitoringService {
.put(SHOULD_CHECKPOINT_HASHES, false)
.build());
- private Optional<FSFlowCatalog> flowCatalog;
+ private Optional<FSFlowTemplateCatalog> flowCatalog;
private FlowGraph flowGraph;
private final Map<URI, TopologySpec> topologySpecMap;
private final Config emptyConfig = ConfigFactory.empty();
private final CountDownLatch initComplete;
- public GitFlowGraphMonitor(Config config, Optional<FSFlowCatalog> flowCatalog, FlowGraph graph, Map<URI, TopologySpec> topologySpecMap, CountDownLatch initComplete) {
+ public GitFlowGraphMonitor(Config config, Optional<FSFlowTemplateCatalog> flowCatalog, FlowGraph graph, Map<URI, TopologySpec> topologySpecMap, CountDownLatch initComplete) {
super(config.getConfig(GIT_FLOWGRAPH_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK));
this.flowCatalog = flowCatalog;
this.flowGraph = graph;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index 756dfaa..347235a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.StringUtils;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
import org.slf4j.Logger;
import com.google.common.annotations.VisibleForTesting;
@@ -53,7 +54,6 @@ import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
import org.apache.gobblin.service.modules.flowgraph.pathfinder.PathFinder;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
-import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
import org.apache.gobblin.util.ConfigUtils;
@@ -88,11 +88,11 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
public MultiHopFlowCompiler(Config config, Optional<Logger> log, boolean instrumentationEnabled) {
super(config, log, instrumentationEnabled);
this.flowGraph = new BaseFlowGraph();
- Optional<FSFlowCatalog> flowCatalog = Optional.absent();
+ Optional<FSFlowTemplateCatalog> flowCatalog = Optional.absent();
if (config.hasPath(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)
&& StringUtils.isNotBlank(config.getString(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY))) {
try {
- flowCatalog = Optional.of(new FSFlowCatalog(config));
+ flowCatalog = Optional.of(new FSFlowTemplateCatalog(config));
} catch (IOException e) {
throw new RuntimeException("Cannot instantiate " + getClass().getName(), e);
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
index 250fd57..0c1023e 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
@@ -17,33 +17,22 @@
package org.apache.gobblin.service.modules.flowgraph;
-import java.io.IOException;
import java.net.URI;
-import java.util.ArrayList;
import java.util.List;
-import java.util.stream.Collectors;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
import org.apache.hadoop.security.UserGroupInformation;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
import joptsimple.internal.Strings;
import lombok.Getter;
import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.api.SpecExecutor;
-import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
import org.apache.gobblin.service.modules.template.FlowTemplate;
-import org.apache.gobblin.util.PullFileLoader;
-import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.gobblin.util.ConfigUtils;
@@ -133,11 +122,12 @@ public class BaseFlowEdge implements FlowEdge {
* A method to return an instance of {@link BaseFlowEdge}. The method performs all the validation checks
* and returns
* @param edgeProps Properties of edge
- * @param flowCatalog Flow Catalog used to retrieve {@link FlowTemplate}s.
+ * @param flowTemplateCatalog Flow Catalog used to retrieve {@link FlowTemplate}s.
* @return a {@link BaseFlowEdge}
*/
@Override
- public FlowEdge createFlowEdge(Config edgeProps, FSFlowCatalog flowCatalog, List<SpecExecutor> specExecutors) throws FlowEdgeCreationException {
+ public FlowEdge createFlowEdge(Config edgeProps, FSFlowTemplateCatalog flowTemplateCatalog, List<SpecExecutor> specExecutors)
+ throws FlowEdgeCreationException {
try {
String source = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY, "");
Preconditions.checkArgument(!Strings.isNullOrEmpty(source), "A FlowEdge must have a non-null or empty source");
@@ -157,7 +147,7 @@ public class BaseFlowEdge implements FlowEdge {
.checkArgument(!Strings.isNullOrEmpty(flowTemplateDirUri), "FlowTemplate URI must be not null or empty");
boolean isActive = ConfigUtils.getBoolean(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY, true);
- FlowTemplate flowTemplate = flowCatalog.getFlowTemplate(new URI(flowTemplateDirUri));
+ FlowTemplate flowTemplate = flowTemplateCatalog.getFlowTemplate(new URI(flowTemplateDirUri));
return new BaseFlowEdge(endPoints, edgeId, flowTemplate, specExecutors, edgeProps, isActive);
} catch (RuntimeException e) {
throw e;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
index c38460e..24fbb9a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
@@ -238,6 +238,9 @@ public class Dag<T> {
return this;
}
+ /**
+ * A DagNode is essentially a job within a Dag; the two terms are usually used interchangeably.
+ */
@Getter
public static class DagNode<T> {
private T value;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java
index 3744bf0..eb1d026 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java
@@ -22,18 +22,19 @@ import java.util.List;
import com.typesafe.config.Config;
import org.apache.gobblin.runtime.api.SpecExecutor;
-import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
+
public interface FlowEdgeFactory {
/**
* Construct a {@link FlowEdge} from the edge properties
* @param edgeProps properties of the {@link FlowEdge}
- * @param flowCatalog an instance of {@link FSFlowCatalog} that returns {@link org.apache.gobblin.service.modules.template.FlowTemplate}s
+ * @param flowCatalog an instance of {@link FSFlowTemplateCatalog} that returns {@link org.apache.gobblin.service.modules.template.FlowTemplate}s
* useful for creating a {@link FlowEdge}.
* @return an instance of {@link FlowEdge}
* @throws FlowEdgeCreationException
*/
- public FlowEdge createFlowEdge(Config edgeProps, FSFlowCatalog flowCatalog, List<SpecExecutor> specExecutors) throws FlowEdgeCreationException;
+ public FlowEdge createFlowEdge(Config edgeProps, FSFlowTemplateCatalog flowCatalog, List<SpecExecutor> specExecutors) throws FlowEdgeCreationException;
public class FlowEdgeCreationException extends Exception {
private static final String MESSAGE_FORMAT = "Failed to create FlowEdge because of: %s";
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index a9c1bbf..2e2360b 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -313,7 +313,7 @@ public class DagManager extends AbstractIdleService {
}
log.debug("Polling job statuses..");
//Poll and update the job statuses of running jobs.
- pollJobStatuses();
+ pollAndAdvanceDag();
log.debug("Poll done.");
//Clean up any finished dags
log.debug("Cleaning up finished dags..");
@@ -357,10 +357,9 @@ public class DagManager extends AbstractIdleService {
}
/**
- * Poll the statuses of running jobs.
- * @return List of {@link JobStatus}es.
+ * Advance the execution of each dag node based on its job status.
*/
- private void pollJobStatuses()
+ private void pollAndAdvanceDag()
throws IOException {
this.failedDagIdsFinishRunning.clear();
@@ -428,7 +427,13 @@ public class DagManager extends AbstractIdleService {
}
}
- Map<String, Set<DagNode<JobExecutionPlan>>> submitNext(String dagId) throws IOException {
+ /**
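+ * Submit the next set of dag nodes that are ready for execution and checkpoint the dag state.
+ * @param dagId The dagId that has been processed.
+ * @return a map from the dagId to the set of nodes submitted for execution.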
+ * @throws IOException
+ */
+ synchronized Map<String, Set<DagNode<JobExecutionPlan>>> submitNext(String dagId) throws IOException {
Dag<JobExecutionPlan> dag = this.dags.get(dagId);
Set<DagNode<JobExecutionPlan>> nextNodes = DagManagerUtils.getNext(dag);
//Submit jobs from the dag ready for execution.
@@ -437,6 +442,7 @@ public class DagManager extends AbstractIdleService {
}
//Checkpoint the dag state
this.dagStateStore.writeCheckpoint(dag);
+
Map<String, Set<DagNode<JobExecutionPlan>>> dagIdToNextJobs = Maps.newHashMap();
dagIdToNextJobs.put(dagId, nextNodes);
return dagIdToNextJobs;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java
index 0984a91..55ce6f0 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java
@@ -34,20 +34,22 @@ import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
public interface DagStateStore {
/**
* Persist the {@link Dag} to the backing store.
- * @param dag
+ * This is not an actual checkpoint but more like a write-ahead log, where uncommitted jobs are persisted
+ * and picked up again when a leader transition happens.
+ * @param dag The dag submitted to {@link DagManager}
*/
- public void writeCheckpoint(Dag<JobExecutionPlan> dag) throws IOException;
+ void writeCheckpoint(Dag<JobExecutionPlan> dag) throws IOException;
/**
* Delete the {@link Dag} from the backing store, typically upon completion of execution.
- * @param dag
+ * @param dag The dag completed/cancelled from execution on {@link org.apache.gobblin.runtime.api.SpecExecutor}.
*/
- public void cleanUp(Dag<JobExecutionPlan> dag);
+ void cleanUp(Dag<JobExecutionPlan> dag);
/**
* Load all currently running {@link Dag}s from the underlying store. Typically, invoked when a new {@link DagManager}
* takes over or on restart of service.
* @return a {@link List} of currently running {@link Dag}s.
*/
- public List<Dag<JobExecutionPlan>> getDags() throws IOException;
+ List<Dag<JobExecutionPlan>> getDags() throws IOException;
}
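Reviewer note, not part of the commit: the write-ahead-log reading of `DagStateStore` described in the Javadoc above can be sketched with a toy in-memory stand-in. Everything here is hypothetical: `InMemoryDagLog` and `getDagIds` are illustrative names, a dag is reduced to an id plus a list of job names, and a real store would write to a durable backing store so that the records survive a leader transition.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in; a dag is reduced to an id plus its job names.
final class InMemoryDagLog {
  private final Map<String, List<String>> pending = new LinkedHashMap<>();

  // Analogue of writeCheckpoint: record the dag before its jobs are submitted.
  void writeCheckpoint(String dagId, List<String> jobs) {
    pending.put(dagId, new ArrayList<>(jobs));
  }

  // Analogue of cleanUp: drop the record once the dag completes or is cancelled.
  void cleanUp(String dagId) {
    pending.remove(dagId);
  }

  // Analogue of getDags: the ids a newly elected leader would reload and resume.
  List<String> getDagIds() {
    return new ArrayList<>(pending.keySet());
  }
}
```

Under this reading, whatever has been written but not yet cleaned up is exactly the set of dags a new leader needs to pick up.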
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
index 5473c8e..338cea7 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
@@ -121,7 +121,7 @@ public class FSDagStateStore implements DagStateStore {
/**
* Return a {@link Dag} given a file name.
* @param dagFile
- * @return the {@link Dag} associated with the dagFiel.
+ * @return the {@link Dag} associated with the dagFile.
*/
@VisibleForTesting
public Dag<JobExecutionPlan> getDag(File dagFile) throws IOException {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index d4ac3ac..45e65bc 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -41,7 +41,7 @@ import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
import org.apache.gobblin.service.modules.orchestration.DagManager;
-import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
import org.apache.gobblin.util.ConfigUtils;
@@ -108,7 +108,7 @@ public class JobExecutionPlan {
jobSpec.setConfig(jobSpec.getConfig().withoutPath(ConfigurationKeys.JOB_SCHEDULE_KEY));
//Remove template uri
- jobSpec.setConfig(jobSpec.getConfig().withoutPath(FSFlowCatalog.JOB_TEMPLATE_KEY));
+ jobSpec.setConfig(jobSpec.getConfig().withoutPath(FSFlowTemplateCatalog.JOB_TEMPLATE_KEY));
// Add job.name and job.group
jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.JOB_NAME_KEY, ConfigValueFactory.fromAnyRef(jobName)));
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalog.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalog.java
similarity index 84%
rename from gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalog.java
rename to gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalog.java
index e1f2b6a..0dafc2d 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalog.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalog.java
@@ -56,9 +56,15 @@ import org.apache.gobblin.util.PathUtils;
* <p> /path/to/template/catalog/flowName/flow.conf </p>
* <p> /path/to/template/catalog/flowName/jobs/job1.(job|template) </p>
* <p> /path/to/template/catalog/flowName/jobs/job2.(job|template) </p>
+ *
+ * Not to be confused with {@link org.apache.gobblin.runtime.spec_catalog.FlowCatalog}, which is a catalog for
+ * {@link org.apache.gobblin.runtime.api.FlowSpec}.
+ *
+ * Note that any exceptions thrown here should be propagated to the caller for handling, since how to handle
+ * exceptions raised while loading/resolving a template depends on caller logic.
*/
@Alpha
-public class FSFlowCatalog extends FSJobCatalog implements FlowCatalogWithTemplates {
+public class FSFlowTemplateCatalog extends FSJobCatalog implements FlowCatalogWithTemplates {
public static final String JOBS_DIR_NAME = "jobs";
public static final String FLOW_CONF_FILE_NAME = "flow.conf";
public static final List<String> JOB_FILE_EXTENSIONS = Arrays.asList(".job", ".template");
@@ -71,7 +77,7 @@ public class FSFlowCatalog extends FSJobCatalog implements FlowCatalogWithTempla
* @param sysConfig that must contain the fully qualified path of the flow template catalog
* @throws IOException
*/
- public FSFlowCatalog(Config sysConfig)
+ public FSFlowTemplateCatalog(Config sysConfig)
throws IOException {
super(sysConfig.withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
sysConfig.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)));
@@ -87,13 +93,10 @@ public class FSFlowCatalog extends FSJobCatalog implements FlowCatalogWithTempla
*/
public FlowTemplate getFlowTemplate(URI flowTemplateDirURI)
throws SpecNotFoundException, JobTemplate.TemplateException, IOException, URISyntaxException {
- if (!this.sysConfig.hasPath(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)) {
- throw new RuntimeException("Missing config " + ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY);
- }
- if (!flowTemplateDirURI.getScheme().equals(FS_SCHEME)) {
- throw new RuntimeException(
- "Expected scheme " + FS_SCHEME + " got unsupported scheme " + flowTemplateDirURI.getScheme());
+ if (!validateTemplateURI(flowTemplateDirURI)) {
+ throw new JobTemplate.TemplateException(String.format("The FlowTemplate %s is not valid", flowTemplateDirURI));
}
+
String templateCatalogDir = this.sysConfig.getString(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY);
// path of uri is location of template file relative to the job configuration root directory
Path templateDirPath = PathUtils.mergePaths(new Path(templateCatalogDir), new Path(flowTemplateDirURI.getPath()));
@@ -125,13 +128,10 @@ public class FSFlowCatalog extends FSJobCatalog implements FlowCatalogWithTempla
return false;
};
- if (!this.sysConfig.hasPath(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)) {
- throw new RuntimeException("Missing config " + ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY);
- }
- if (!flowTemplateDirURI.getScheme().equals(FS_SCHEME)) {
- throw new RuntimeException(
- "Expected scheme " + FS_SCHEME + " got unsupported scheme " + flowTemplateDirURI.getScheme());
+ if (!validateTemplateURI(flowTemplateDirURI)) {
+ throw new JobTemplate.TemplateException(String.format("The FlowTemplate %s is not valid", flowTemplateDirURI));
}
+
List<JobTemplate> jobTemplates = new ArrayList<>();
String templateCatalogDir = this.sysConfig.getString(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY);
@@ -167,4 +167,23 @@ public class FSFlowCatalog extends FSJobCatalog implements FlowCatalogWithTempla
return ConfigFactory.parseReader(new InputStreamReader(is, Charsets.UTF_8)).resolve(options);
}
}
+
+ /**
+ * Determine whether the URI of a JobTemplate or a FlowTemplate is valid.
+ * @param flowURI The URI of the given job/flow template
+ * @return true if the URI is valid.
+ */
+ private boolean validateTemplateURI(URI flowURI) {
+ if (!this.sysConfig.hasPath(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)) {
+ log.error("Missing config " + ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY);
+ return false;
+ }
+ if (!flowURI.getScheme().equals(FS_SCHEME)) {
+ log.error(
+ "Expected scheme " + FS_SCHEME + " got unsupported scheme " + flowURI.getScheme());
+ return false;
+ }
+
+ return true;
+ }
}
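Reviewer note, not part of the commit: `URI.getScheme()` returns null for a relative URI, so a scheme check written as `uri.getScheme().equals(...)` would throw a `NullPointerException` on such input. Below is a minimal, self-contained sketch of a null-tolerant variant of the same two checks. The class `TemplateUriCheck`, the method `rejectReason`, and the boolean parameter are hypothetical, and `FS_SCHEME = "FS"` is an assumption inferred from the `FS:///` test URI used elsewhere in this diff.

```java
import java.net.URI;
import java.util.Optional;

// Hypothetical sketch; names and the constant's value are assumptions, not Gobblin code.
final class TemplateUriCheck {
  // Assumption: stands in for the FS_SCHEME constant referenced in the diff.
  static final String FS_SCHEME = "FS";

  /** Returns the reason the URI is rejected, or empty when both checks pass. */
  static Optional<String> rejectReason(boolean catalogPathConfigured, URI templateUri) {
    if (!catalogPathConfigured) {
      return Optional.of("Missing template catalog path config");
    }
    // Constant-first equals tolerates a null scheme, e.g. for a relative URI.
    if (!FS_SCHEME.equals(templateUri.getScheme())) {
      return Optional.of(
          "Expected scheme " + FS_SCHEME + " got unsupported scheme " + templateUri.getScheme());
    }
    return Optional.empty();
  }
}
```

Returning the reason rather than a bare boolean is one possible design: it would let a caller include the cause in the `TemplateException` message instead of leaving it only in the log.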
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java
index 71ad56d..1912d15 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java
@@ -96,7 +96,7 @@ public class GitConfigMonitorTest {
.addPrimitive(GitConfigMonitor.GIT_CONFIG_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_REPO_URI,
this.remoteRepo.getDirectory().getAbsolutePath())
.addPrimitive(GitConfigMonitor.GIT_CONFIG_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_REPO_DIR, TEST_DIR + "/jobConfig")
- .addPrimitive(ConfigurationKeys.FLOWSPEC_STORE_DIR_KEY, TEST_DIR + "flowCatalog")
+ .addPrimitive(FlowCatalog.FLOWSPEC_STORE_DIR_KEY, TEST_DIR + "flowCatalog")
.addPrimitive(ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, 5)
.build();
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
index 26f2180..6a02652 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ExecutionException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.SystemUtils;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
import org.eclipse.jgit.api.Git;
import org.eclipse.jgit.api.errors.GitAPIException;
import org.eclipse.jgit.dircache.DirCache;
@@ -60,7 +61,6 @@ import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
import org.apache.gobblin.service.modules.flowgraph.DataNode;
import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
-import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
public class GitFlowGraphMonitorTest {
@@ -81,7 +81,7 @@ public class GitFlowGraphMonitorTest {
private final File edge1File = new File(edge1Dir, "edge1.properties");
private RefSpec masterRefSpec = new RefSpec("master");
- private Optional<FSFlowCatalog> flowCatalog;
+ private Optional<FSFlowTemplateCatalog> flowCatalog;
private Config config;
private BaseFlowGraph flowGraph;
private GitFlowGraphMonitor gitFlowGraphMonitor;
@@ -111,7 +111,7 @@ public class GitFlowGraphMonitorTest {
.addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, 5)
.build();
- // Create a FSFlowCatalog instance
+ // Create a FSFlowTemplateCatalog instance
URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI();
Properties properties = new Properties();
properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString());
@@ -119,7 +119,7 @@ public class GitFlowGraphMonitorTest {
Config templateCatalogCfg = config
.withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
- this.flowCatalog = Optional.of(new FSFlowCatalog(templateCatalogCfg));
+ this.flowCatalog = Optional.of(new FSFlowTemplateCatalog(templateCatalogCfg));
//Create a FlowGraph instance with defaults
this.flowGraph = new BaseFlowGraph();
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
index efb1a6d..a4a4497 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
@@ -23,6 +23,7 @@ import java.util.UUID;
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingServer;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.hadoop.fs.Path;
import org.eclipse.jetty.http.HttpStatus;
import org.slf4j.Logger;
@@ -142,7 +143,7 @@ public class GobblinServiceHATest {
Properties node1ServiceCoreProperties = new Properties();
node1ServiceCoreProperties.putAll(commonServiceCoreProperties);
node1ServiceCoreProperties.put(ConfigurationKeys.TOPOLOGYSPEC_STORE_DIR_KEY, NODE_1_TOPOLOGY_SPEC_STORE_DIR);
- node1ServiceCoreProperties.put(ConfigurationKeys.FLOWSPEC_STORE_DIR_KEY, NODE_1_FLOW_SPEC_STORE_DIR);
+ node1ServiceCoreProperties.put(FlowCatalog.FLOWSPEC_STORE_DIR_KEY, NODE_1_FLOW_SPEC_STORE_DIR);
node1ServiceCoreProperties.put(FsJobStatusRetriever.CONF_PREFIX + "." + ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, NODE_1_JOB_STATUS_STATE_STORE_DIR);
node1ServiceCoreProperties.put(QUARTZ_INSTANCE_NAME, "QuartzScheduler1");
node1ServiceCoreProperties.put(QUARTZ_THREAD_POOL_COUNT, 3);
@@ -150,7 +151,7 @@ public class GobblinServiceHATest {
Properties node2ServiceCoreProperties = new Properties();
node2ServiceCoreProperties.putAll(commonServiceCoreProperties);
node2ServiceCoreProperties.put(ConfigurationKeys.TOPOLOGYSPEC_STORE_DIR_KEY, NODE_2_TOPOLOGY_SPEC_STORE_DIR);
- node2ServiceCoreProperties.put(ConfigurationKeys.FLOWSPEC_STORE_DIR_KEY, NODE_2_FLOW_SPEC_STORE_DIR);
+ node2ServiceCoreProperties.put(FlowCatalog.FLOWSPEC_STORE_DIR_KEY, NODE_2_FLOW_SPEC_STORE_DIR);
node2ServiceCoreProperties.put(FsJobStatusRetriever.CONF_PREFIX + "." + ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, NODE_2_JOB_STATUS_STATE_STORE_DIR);
node2ServiceCoreProperties.put(QUARTZ_INSTANCE_NAME, "QuartzScheduler2");
node2ServiceCoreProperties.put(QUARTZ_THREAD_POOL_COUNT, 3);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
index efe1b95..d5105be 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
@@ -25,6 +25,7 @@ import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.hadoop.fs.Path;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jgit.api.Git;
@@ -110,7 +111,7 @@ public class GobblinServiceManagerTest {
serviceCoreProperties.put(ConfigurationKeys.STATE_STORE_FACTORY_CLASS_KEY, MysqlJobStatusStateStoreFactory.class.getName());
serviceCoreProperties.put(ConfigurationKeys.TOPOLOGYSPEC_STORE_DIR_KEY, TOPOLOGY_SPEC_STORE_DIR);
- serviceCoreProperties.put(ConfigurationKeys.FLOWSPEC_STORE_DIR_KEY, FLOW_SPEC_STORE_DIR);
+ serviceCoreProperties.put(FlowCatalog.FLOWSPEC_STORE_DIR_KEY, FLOW_SPEC_STORE_DIR);
serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_TOPOLOGY_NAMES_KEY, TEST_GOBBLIN_EXECUTOR_NAME);
serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX + TEST_GOBBLIN_EXECUTOR_NAME + ".description",
"StandaloneTestExecutor");
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
index 5877e66..4bffdb1 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
@@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.SystemUtils;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -84,7 +85,6 @@ import org.apache.gobblin.service.modules.flowgraph.FlowEdgeFactory;
import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
-import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
import org.apache.gobblin.util.CompletedFuture;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PathUtils;
@@ -124,7 +124,7 @@ public class MultiHopFlowCompilerTest {
URI specExecutorCatalogUri = this.getClass().getClassLoader().getResource("topologyspec_catalog").toURI();
Map<URI, TopologySpec> topologySpecMap = buildTopologySpecMap(specExecutorCatalogUri);
- //Create a FSFlowCatalog instance
+ //Create a FSFlowTemplateCatalog instance
URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI();
Properties properties = new Properties();
properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString());
@@ -132,7 +132,7 @@ public class MultiHopFlowCompilerTest {
Config templateCatalogCfg = config
.withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
- FSFlowCatalog flowCatalog = new FSFlowCatalog(templateCatalogCfg);
+ FSFlowTemplateCatalog flowCatalog = new FSFlowTemplateCatalog(templateCatalogCfg);
//Add FlowEdges from the edge properties files
URI flowEdgesURI = MultiHopFlowCompilerTest.class.getClassLoader().getResource("flowgraph/flowedges").toURI();
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java
index 085d0a7..79067f8 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java
@@ -34,7 +34,7 @@ import com.typesafe.config.ConfigValueFactory;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
import lombok.extern.slf4j.Slf4j;
@@ -66,7 +66,7 @@ public class BaseFlowEdgeFactoryTest {
Config templateCatalogCfg = config
.withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
- FSFlowCatalog catalog = new FSFlowCatalog(templateCatalogCfg);
+ FSFlowTemplateCatalog catalog = new FSFlowTemplateCatalog(templateCatalogCfg);
Config edgeProps = ConfigUtils.propertiesToConfig(properties);
FlowEdge flowEdge = flowEdgeFactory.createFlowEdge(edgeProps, catalog, specExecutorList);
Assert.assertEquals(flowEdge.getSrc(), "node1");
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
index e289cbd..d30de37 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
@@ -40,6 +40,8 @@ import org.apache.gobblin.util.ConfigUtils;
import org.testng.Assert;
import org.testng.annotations.Test;
+import static org.apache.gobblin.runtime.spec_catalog.FlowCatalog.*;
+
public class GobblinServiceJobSchedulerTest {
private static final String TEST_GROUP_NAME = "testGroup";
@@ -56,7 +58,7 @@ public class GobblinServiceJobSchedulerTest {
File specDir = Files.createTempDir();
Properties properties = new Properties();
- properties.setProperty(ConfigurationKeys.FLOWSPEC_STORE_DIR_KEY, specDir.getAbsolutePath());
+ properties.setProperty(FLOWSPEC_STORE_DIR_KEY, specDir.getAbsolutePath());
FlowCatalog flowCatalog = new FlowCatalog(ConfigUtils.propertiesToConfig(properties));
ServiceBasedAppLauncher serviceLauncher = new ServiceBasedAppLauncher(properties, "GaaSJobSchedulerTest");
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
index 2542f5e..4ffdb67 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
@@ -44,7 +44,7 @@ import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.template.FlowTemplate;
-import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
import org.apache.gobblin.util.ConfigUtils;
@@ -56,7 +56,7 @@ public class JobExecutionPlanDagFactoryTest {
@BeforeClass
public void setUp() throws URISyntaxException, IOException, SpecNotFoundException, JobTemplate.TemplateException {
- // Create a FSFlowCatalog instance
+ // Create a FSFlowTemplateCatalog instance
URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI();
Properties properties = new Properties();
properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString());
@@ -64,7 +64,7 @@ public class JobExecutionPlanDagFactoryTest {
Config templateCatalogCfg = config
.withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
- FSFlowCatalog catalog = new FSFlowCatalog(templateCatalogCfg);
+ FSFlowTemplateCatalog catalog = new FSFlowTemplateCatalog(templateCatalogCfg);
FlowTemplate flowTemplate = catalog.getFlowTemplate(new URI(TEST_TEMPLATE_URI));
this.jobTemplates = flowTemplate.getJobTemplates();
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalogTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalogTest.java
similarity index 96%
rename from gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalogTest.java
rename to gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalogTest.java
index 3c8ebd3..550c97d 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalogTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalogTest.java
@@ -43,21 +43,21 @@ import org.testng.collections.Lists;
import lombok.extern.slf4j.Slf4j;
@Slf4j
-public class FSFlowCatalogTest {
+public class FSFlowTemplateCatalogTest {
private static final String TEST_TEMPLATE_NAME = "flowEdgeTemplate";
private static final String TEST_TEMPLATE_DIR_URI = "FS:///" + TEST_TEMPLATE_NAME;
@Test
public void testGetFlowTemplate() throws Exception {
URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI();
- // Create a FSFlowCatalog instance
+ // Create a FSFlowTemplateCatalog instance
Properties properties = new Properties();
properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString());
Config config = ConfigFactory.parseProperties(properties);
Config templateCatalogCfg = config
.withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
- FSFlowCatalog catalog = new FSFlowCatalog(templateCatalogCfg);
+ FSFlowTemplateCatalog catalog = new FSFlowTemplateCatalog(templateCatalogCfg);
FlowTemplate flowTemplate = catalog.getFlowTemplate(new URI(TEST_TEMPLATE_DIR_URI));
//Basic sanity check for the FlowTemplate