You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2018/06/05 04:05:55 UTC
[2/2] incubator-gobblin git commit: [GOBBLIN-491] Create a FlowGraph
representation for multi-hop support in Gobblin-as-a-Service.[]
[GOBBLIN-491] Create a FlowGraph representation for multi-hop support in Gobblin-as-a-Service.[]
Closes #2361 from sv2000/flowGraph
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/9b91fa1b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/9b91fa1b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/9b91fa1b
Branch: refs/heads/master
Commit: 9b91fa1b3bc7ec62aa3a4192dffd3fef63dfea94
Parents: 8949aa3
Author: suvasude <su...@linkedin.biz>
Authored: Mon Jun 4 21:05:50 2018 -0700
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Mon Jun 4 21:05:50 2018 -0700
----------------------------------------------------------------------
.../configuration/ConfigurationKeys.java | 1 +
.../apache/gobblin/runtime/api/JobTemplate.java | 6 +
.../runtime/template/StaticJobTemplate.java | 19 +-
...ackagedTemplatesJobCatalogDecoratorTest.java | 3 +
.../template/InheritingJobTemplateTest.java | 6 +
.../dataset/BaseHdfsDatasetDescriptor.java | 99 ++++++++
.../modules/dataset/DatasetDescriptor.java | 44 ++++
.../modules/dataset/HdfsDatasetDescriptor.java | 40 ++++
.../service/modules/flowgraph/BaseDataNode.java | 81 +++++++
.../service/modules/flowgraph/BaseFlowEdge.java | 191 +++++++++++++++
.../modules/flowgraph/BaseFlowGraph.java | 189 +++++++++++++++
.../gobblin/service/modules/flowgraph/Dag.java | 153 ++++++++++++
.../service/modules/flowgraph/DataNode.java | 54 +++++
.../flowgraph/DatasetDescriptorConfigKeys.java | 28 +++
.../modules/flowgraph/FileSystemDataNode.java | 83 +++++++
.../service/modules/flowgraph/FlowEdge.java | 85 +++++++
.../modules/flowgraph/FlowEdgeFactory.java | 53 +++++
.../service/modules/flowgraph/FlowGraph.java | 76 ++++++
.../flowgraph/FlowGraphConfigurationKeys.java | 40 ++++
.../service/modules/flowgraph/HdfsDataNode.java | 59 +++++
.../modules/flowgraph/LocalFSDataNode.java | 51 ++++
.../service/modules/template/FlowTemplate.java | 62 +++++
.../template/HOCONInputStreamFlowTemplate.java | 56 +++++
.../modules/template/JobTemplateDagFactory.java | 79 +++++++
.../modules/template/StaticFlowTemplate.java | 154 +++++++++++++
.../modules/template_catalog/FSFlowCatalog.java | 122 ++++++++++
.../FlowCatalogWithTemplates.java | 49 ++++
.../modules/flowgraph/BaseFlowGraphTest.java | 230 +++++++++++++++++++
.../service/modules/flowgraph/DagTest.java | 133 +++++++++++
.../template/JobTemplateDagFactoryTest.java | 92 ++++++++
.../template_catalog/FSFlowCatalogTest.java | 107 +++++++++
.../flowgraph/BaseFlowEdgeFactoryTest.java | 73 ++++++
.../template_catalog/test-template/flow.conf | 16 ++
.../test-template/jobs/job1.conf | 2 +
.../test-template/jobs/job2.conf | 3 +
.../test-template/jobs/job3.conf | 3 +
.../test-template/jobs/job4.conf | 3 +
37 files changed, 2537 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 2291d72..b6196da 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -214,6 +214,7 @@ public class ConfigurationKeys {
* Configuration property used only for job configuration file's tempalte, inside .template file
*/
public static final String REQUIRED_ATRRIBUTES_LIST = "gobblin.template.required_attributes";
+ public static final String JOB_DEPENDENCIES="dependencies";
/**
* Configuration for emitting job events
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobTemplate.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobTemplate.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobTemplate.java
index 1269bc4..f8f3071 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobTemplate.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobTemplate.java
@@ -47,6 +47,12 @@ public interface JobTemplate extends Spec {
Collection<String> getRequiredConfigList() throws SpecNotFoundException, TemplateException;
/**
+ * Retrieve all job names that this job depends on. Useful for building a dag of
+ * JobTemplates.
+ */
+ Collection<String> getDependencies();
+
+ /**
* Return the combine configuration of template and user customized attributes.
* @return
*/
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/StaticJobTemplate.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/StaticJobTemplate.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/StaticJobTemplate.java
index c370c62..6c69370 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/StaticJobTemplate.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/StaticJobTemplate.java
@@ -19,21 +19,18 @@ package org.apache.gobblin.runtime.template;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
-import java.util.Properties;
import java.util.Set;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.api.GobblinInstanceDriver;
-import org.apache.gobblin.runtime.api.JobCatalog;
import org.apache.gobblin.runtime.api.JobCatalogWithTemplates;
import org.apache.gobblin.runtime.api.JobTemplate;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
@@ -56,6 +53,8 @@ public class StaticJobTemplate extends InheritingJobTemplate {
private final String version;
@Getter
private final String description;
+ @Getter
+ private Collection<String> dependencies;
public StaticJobTemplate(URI uri, String version, String description, Config config, JobCatalogWithTemplates catalog)
throws SpecNotFoundException, TemplateException {
@@ -63,18 +62,22 @@ public class StaticJobTemplate extends InheritingJobTemplate {
}
protected StaticJobTemplate(URI uri, String version, String description, Config config, List<URI> superTemplateUris,
- JobCatalogWithTemplates catalog) throws SpecNotFoundException, TemplateException {
+ JobCatalogWithTemplates catalog)
+ throws SpecNotFoundException, TemplateException {
super(superTemplateUris, catalog);
this.uri = uri;
this.version = version;
this.description = description;
this.rawConfig = config;
- this.requiredAttributes = config.hasPath(ConfigurationKeys.REQUIRED_ATRRIBUTES_LIST)
- ? new HashSet<>(Arrays.asList(config.getString(ConfigurationKeys.REQUIRED_ATRRIBUTES_LIST).split(",")))
+ this.requiredAttributes = config.hasPath(ConfigurationKeys.REQUIRED_ATRRIBUTES_LIST) ? new HashSet<>(
+ Arrays.asList(config.getString(ConfigurationKeys.REQUIRED_ATRRIBUTES_LIST).split(",")))
: Sets.<String>newHashSet();
+ this.dependencies = config.hasPath(ConfigurationKeys.JOB_DEPENDENCIES) ? Arrays
+ .asList(config.getString(ConfigurationKeys.JOB_DEPENDENCIES).split(",")) : new ArrayList<>();
}
- private static List<URI> getSuperTemplateUris(Config config) throws TemplateException {
+ private static List<URI> getSuperTemplateUris(Config config)
+ throws TemplateException {
if (config.hasPath(SUPER_TEMPLATE_KEY)) {
List<URI> uris = Lists.newArrayList();
for (String uriString : config.getString(SUPER_TEMPLATE_KEY).split(",")) {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_catalog/PackagedTemplatesJobCatalogDecoratorTest.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_catalog/PackagedTemplatesJobCatalogDecoratorTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_catalog/PackagedTemplatesJobCatalogDecoratorTest.java
index bf448cb..94f02ae 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_catalog/PackagedTemplatesJobCatalogDecoratorTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_catalog/PackagedTemplatesJobCatalogDecoratorTest.java
@@ -98,6 +98,9 @@ public class PackagedTemplatesJobCatalogDecoratorTest {
public Config getResolvedConfig(Config userConfig) {
return null;
}
+
+ @Override
+ public Collection<String> getDependencies() { return null; }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/template/InheritingJobTemplateTest.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/template/InheritingJobTemplateTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/template/InheritingJobTemplateTest.java
index ab76f18..c8d4183 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/template/InheritingJobTemplateTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/template/InheritingJobTemplateTest.java
@@ -177,6 +177,7 @@ public class InheritingJobTemplateTest {
private final URI uri;
private final Map<String,String> rawTemplate;
private final List<String> required;
+ private Collection<String> dependencies;
public TestTemplate(URI uri, List<URI> superTemplateUris, Map<String, String> rawTemplate, List<String> required,
JobCatalogWithTemplates catalog) throws SpecNotFoundException, TemplateException {
@@ -227,6 +228,11 @@ public class InheritingJobTemplateTest {
}
return userConfig.withFallback(getLocalRawTemplate());
}
+
+    /**
+     * @return the value of the {@code dependencies} field of this test template (null unless it has been assigned).
+     */
+    @Override
+    public Collection<String> getDependencies() {
+      return dependencies;
+    }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseHdfsDatasetDescriptor.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseHdfsDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseHdfsDatasetDescriptor.java
new file mode 100644
index 0000000..f7cf99f
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseHdfsDatasetDescriptor.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.dataset;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+import lombok.Getter;
+
+
+/**
+ * An implementation of {@link HdfsDatasetDescriptor} whose platform is {@value #HDFS_PLATFORM_NAME}.
+ */
+@Alpha
+public class BaseHdfsDatasetDescriptor implements HdfsDatasetDescriptor {
+  public static final String HDFS_PLATFORM_NAME = "hdfs";
+
+  @Getter
+  private final String path;
+  @Getter
+  private final String format;
+  @Getter
+  private final String description;
+  @Getter
+  private final String platform = HDFS_PLATFORM_NAME;
+
+  /**
+   * Builds a descriptor from its configuration.
+   *
+   * @param config must contain {@link DatasetDescriptorConfigKeys#PATH_KEY} and
+   *               {@link DatasetDescriptorConfigKeys#FORMAT_KEY}; {@link DatasetDescriptorConfigKeys#DESCRIPTION_KEY}
+   *               is optional and defaults to the empty string.
+   * @throws IllegalArgumentException if the path or the format property is missing.
+   */
+  public BaseHdfsDatasetDescriptor(Config config) {
+    // Guava's message template is only formatted when the check fails.
+    Preconditions.checkArgument(config.hasPath(DatasetDescriptorConfigKeys.PATH_KEY),
+        "Missing required property %s", DatasetDescriptorConfigKeys.PATH_KEY);
+    Preconditions.checkArgument(config.hasPath(DatasetDescriptorConfigKeys.FORMAT_KEY),
+        "Missing required property %s", DatasetDescriptorConfigKeys.FORMAT_KEY);
+
+    this.path = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.PATH_KEY, null);
+    this.format = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.FORMAT_KEY, null);
+    this.description = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.DESCRIPTION_KEY, "");
+  }
+
+  /**
+   * A {@link HdfsDatasetDescriptor} is compatible with another {@link DatasetDescriptor} iff the two are equal, i.e.
+   * they are of the same class, have the same platform and format (both compared case-insensitively) and the same path.
+   * TODO: Currently isCompatibleWith() only checks if HDFS paths described by the two {@link DatasetDescriptor}s
+   * being compared are identical. Need to enhance this for the case of where paths can contain glob patterns.
+   * e.g. paths described by the pattern /data/input/* are a subset of paths described by /data/* and hence, the
+   * two descriptors should be compatible.
+   * @return true if this {@link HdfsDatasetDescriptor} is compatible with {@code o}.
+   */
+  @Override
+  public boolean isCompatibleWith(DatasetDescriptor o) {
+    return this.equals(o);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    // The exact-class check makes the cast below safe and keeps equals symmetric across subclasses.
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    HdfsDatasetDescriptor other = (HdfsDatasetDescriptor) o;
+    if (this.getPlatform() == null || other.getPlatform() == null) {
+      return false;
+    }
+    return this.getPlatform().equalsIgnoreCase(other.getPlatform())
+        && this.getPath().equals(other.getPath())
+        && this.getFormat().equalsIgnoreCase(other.getFormat());
+  }
+
+  @Override
+  public String toString() {
+    return "(" + Joiner.on(",").join(this.getPlatform(), this.getPath(), this.getFormat()) + ")";
+  }
+
+  /**
+   * Platform and format are compared case-insensitively in {@link #equals(Object)}, so their case is normalized
+   * here; otherwise equal descriptors (e.g. format "Avro" vs "avro") could have different hash codes.
+   */
+  @Override
+  public int hashCode() {
+    String platformKey = this.getPlatform() == null ? null : this.getPlatform().toLowerCase(java.util.Locale.ROOT);
+    String formatKey = this.getFormat() == null ? null : this.getFormat().toLowerCase(java.util.Locale.ROOT);
+    return java.util.Objects.hash(platformKey, this.getPath(), formatKey);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java
new file mode 100644
index 0000000..4a322e6
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.dataset;
+
+import org.apache.gobblin.annotation.Alpha;
+
+
+/**
+ * The interface for dataset descriptors.
+ */
+@Alpha
+public interface DatasetDescriptor {
+  /**
+   * @return the dataset platform, i.e. the storage backing the dataset (e.g. HDFS, JDBC, Espresso etc.)
+   */
+  String getPlatform();
+
+  /**
+   * @return a human-readable description of the dataset.
+   */
+  String getDescription();
+
+  /**
+   * Checks whether the datasets described by this {@link DatasetDescriptor} are a subset of the datasets described
+   * by {@code other}. The relation is not symmetric: {@code a.isCompatibleWith(b)} does not imply
+   * {@code b.isCompatibleWith(a)}.
+   *
+   * @param other the descriptor to compare against.
+   * @return true if this {@link DatasetDescriptor} is compatible with {@code other}.
+   */
+  boolean isCompatibleWith(DatasetDescriptor other);
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HdfsDatasetDescriptor.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HdfsDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HdfsDatasetDescriptor.java
new file mode 100644
index 0000000..6f1970c
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HdfsDatasetDescriptor.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.dataset;
+
+import org.apache.gobblin.annotation.Alpha;
+
+
+/**
+ * A {@link DatasetDescriptor} for HDFS datasets, which additionally exposes a path and a storage format.
+ */
+@Alpha
+public interface HdfsDatasetDescriptor extends DatasetDescriptor {
+  /**
+   * @return the path of the dataset.
+   */
+  String getPath();
+
+  /**
+   * @return the storage format of the dataset.
+   */
+  String getFormat();
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java
new file mode 100644
index 0000000..731bc22
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
+import org.apache.gobblin.util.ConfigUtils;
+
+import joptsimple.internal.Strings;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * An implementation of {@link DataNode} identified by the {@link FlowGraphConfigurationKeys#DATA_NODE_ID_KEY}
+ * value of its configuration.
+ */
+@Alpha
+@Slf4j
+public class BaseDataNode implements DataNode {
+  @Getter
+  private final String id;
+  @Getter
+  private final Config props;
+  @Getter
+  private final boolean active;
+
+  /**
+   * Creates a node from its configuration.
+   *
+   * @param nodeProps node configuration. {@link FlowGraphConfigurationKeys#DATA_NODE_ID_KEY} is required and must be
+   *                  non-empty; {@link FlowGraphConfigurationKeys#DATA_NODE_IS_ACTIVE_KEY} is optional and defaults
+   *                  to {@code true}.
+   * @throws DataNodeCreationException wrapping any exception raised while reading or validating {@code nodeProps}.
+   */
+  public BaseDataNode(Config nodeProps) throws DataNodeCreationException {
+    try {
+      String nodeId = ConfigUtils.getString(nodeProps, FlowGraphConfigurationKeys.DATA_NODE_ID_KEY, "");
+      Preconditions.checkArgument(!Strings.isNullOrEmpty(nodeId), "Node Id cannot be null or empty");
+      this.id = nodeId;
+      // The node is active unless the configuration explicitly sets the flag to false.
+      this.active = !nodeProps.hasPath(FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY)
+          || nodeProps.getBoolean(FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY);
+      this.props = nodeProps;
+    } catch (Exception e) {
+      throw new DataNodeCreationException(e);
+    }
+  }
+
+  /**
+   * Two nodes of the same class are equal iff they have the same id; the node configuration is not part of the
+   * comparison.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    BaseDataNode that = (BaseDataNode) o;
+
+    return this.id.equals(that.id);
+  }
+
+  /**
+   * Consistent with {@link #equals(Object)}: only the id is hashed.
+   */
+  @Override
+  public int hashCode() {
+    return this.id.hashCode();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
new file mode 100644
index 0000000..ccce62e
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
+import org.apache.gobblin.service.modules.template.FlowTemplate;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+import org.apache.gobblin.util.ConfigUtils;
+
+import joptsimple.internal.Strings;
+import lombok.Getter;
+
+
+/**
+ * An implementation of {@link FlowEdge}.
+ */
+@Alpha
+public class BaseFlowEdge implements FlowEdge {
+  public static final String FLOW_EDGE_LABEL_JOINER_CHAR = ":";
+
+  @Getter
+  protected List<String> endPoints;
+
+  @Getter
+  protected FlowTemplate flowTemplate;
+
+  @Getter
+  private List<SpecExecutor> executors;
+
+  @Getter
+  private Config props;
+
+  @Getter
+  private String id;
+
+  @Getter
+  private boolean active;
+
+  /**
+   * Creates an edge whose id is derived from its end points and name (see {@link #generateEdgeId(List, String)}).
+   *
+   * @param endPoints the source and destination node ids, in that order; only the first two elements are used.
+   * @param edgeName the edge name; it becomes the last component of the edge id.
+   * @param flowTemplate the {@link FlowTemplate} associated with this edge.
+   * @param executors the {@link SpecExecutor}s associated with this edge.
+   * @param properties the edge configuration.
+   * @param active whether the edge is active.
+   */
+  public BaseFlowEdge(List<String> endPoints, String edgeName, FlowTemplate flowTemplate, List<SpecExecutor> executors,
+      Config properties, boolean active) {
+    this.endPoints = endPoints;
+    this.flowTemplate = flowTemplate;
+    this.executors = executors;
+    this.active = active;
+    this.props = properties;
+    this.id = generateEdgeId(endPoints, edgeName);
+  }
+
+  /**
+   * This implementation grants access to every user.
+   */
+  @Override
+  public boolean isAccessible(UserGroupInformation user) {
+    return true;
+  }
+
+  /**
+   * @return the edge id {@code <source>:<destination>:<edgeName>}.
+   */
+  @VisibleForTesting
+  protected static String generateEdgeId(List<String> endPoints, String edgeName) {
+    return Joiner.on(FLOW_EDGE_LABEL_JOINER_CHAR).join(endPoints.get(0), endPoints.get(1), edgeName);
+  }
+
+  /**
+   * Two {@link BaseFlowEdge}s are equal iff they have the same source, the same destination and the same id (and
+   * hence the same name), and both refer to the same {@link FlowTemplate}, i.e. their {@link FlowTemplate} uris are
+   * equal. The id is part of the comparison so that equality stays consistent with {@link #hashCode()}.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    BaseFlowEdge that = (BaseFlowEdge) o;
+
+    // Both end points must match; the negation covers the source AND the destination comparison.
+    boolean sameEndPoints = this.getEndPoints().get(0).equals(that.getEndPoints().get(0))
+        && this.getEndPoints().get(1).equals(that.getEndPoints().get(1));
+    if (!sameEndPoints || !this.id.equals(that.id)) {
+      return false;
+    }
+
+    return this.getFlowTemplate().getUri().equals(that.getFlowTemplate().getUri());
+  }
+
+  /**
+   * Hashes the edge id, which {@link #equals(Object)} requires to be identical for equal edges.
+   */
+  @Override
+  public int hashCode() {
+    return this.id.hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return this.id;
+  }
+
+  /**
+   * A {@link FlowEdgeFactory} for creating {@link BaseFlowEdge}.
+   */
+  public static class Factory implements FlowEdgeFactory {
+
+    /**
+     * Validates the edge configuration and creates a {@link BaseFlowEdge} from it.
+     *
+     * @param edgeProps Properties of edge
+     * @param flowCatalog Flow Catalog used to retrieve {@link FlowTemplate}s.
+     * @return a {@link BaseFlowEdge}
+     * @throws FlowEdgeCreationException wrapping any validation, reflection or template-lookup failure.
+     */
+    @Override
+    public FlowEdge createFlowEdge(Config edgeProps, FSFlowCatalog flowCatalog) throws FlowEdgeCreationException {
+      try {
+        String source = getRequiredString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY,
+            "A FlowEdge must have a non-null or empty source");
+        String destination = getRequiredString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY,
+            "A FlowEdge must have a non-null or empty destination");
+        List<String> endPoints = Lists.newArrayList(source, destination);
+        String edgeName = getRequiredString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY,
+            "A FlowEdge must have a non-null or empty name");
+
+        // Spec executor configs are read from <key>.0, <key>.1, ... stopping at the first missing index.
+        List<Config> specExecutorConfigList = new ArrayList<>();
+        for (int i = 0; edgeProps.hasPath(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + "." + i); i++) {
+          specExecutorConfigList.add(edgeProps.getConfig(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + "." + i));
+        }
+
+        String flowTemplateUri = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_URI_KEY, "");
+
+        //Perform basic validation
+        Preconditions.checkArgument(!specExecutorConfigList.isEmpty(), "A FlowEdge must have at least one SpecExecutor");
+        Preconditions.checkArgument(!Strings.isNullOrEmpty(flowTemplateUri), "FlowTemplate URI must be not null or empty");
+        boolean isActive = ConfigUtils.getBoolean(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY, true);
+
+        //Build SpecExecutor from config
+        List<SpecExecutor> specExecutors = new ArrayList<>();
+        for (Config specExecutorConfig : specExecutorConfigList) {
+          Class<?> executorClass =
+              Class.forName(specExecutorConfig.getString(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY));
+          SpecExecutor executor =
+              (SpecExecutor) GobblinConstructorUtils.invokeLongestConstructor(executorClass, specExecutorConfig);
+          specExecutors.add(executor);
+        }
+        FlowTemplate flowTemplate = flowCatalog.getFlowTemplate(new URI(flowTemplateUri));
+        return new BaseFlowEdge(endPoints, edgeName, flowTemplate, specExecutors, edgeProps, isActive);
+      } catch (Exception e) {
+        throw new FlowEdgeCreationException(e);
+      }
+    }
+
+    /**
+     * Computes the id of the edge described by {@code edgeProps} without building the edge.
+     *
+     * @throws IllegalArgumentException if the source, destination or name is missing or empty.
+     */
+    @Override
+    public String getEdgeId(Config edgeProps) throws IOException {
+      String source = getRequiredString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY,
+          "A FlowEdge must have a non-null or empty source");
+      // Validate the destination value itself (not the source a second time).
+      String destination = getRequiredString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY,
+          "A FlowEdge must have a non-null or empty destination");
+      String edgeName = getRequiredString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY,
+          "A FlowEdge must have a non-null or empty name");
+      List<String> endPoints = Lists.newArrayList(source, destination);
+
+      return generateEdgeId(endPoints, edgeName);
+    }
+
+    /**
+     * Reads {@code key} from {@code edgeProps} (defaulting to the empty string) and rejects null or empty values.
+     *
+     * @throws IllegalArgumentException with {@code errorMessage} if the value is null or empty.
+     */
+    private static String getRequiredString(Config edgeProps, String key, String errorMessage) {
+      String value = ConfigUtils.getString(edgeProps, key, "");
+      Preconditions.checkArgument(!Strings.isNullOrEmpty(value), errorMessage);
+      return value;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
new file mode 100644
index 0000000..783f7ea
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.gobblin.annotation.Alpha;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * A thread-safe implementation of {@link FlowGraph}: every public method synchronizes on the graph instance, and
+ * reads return snapshots rather than live internal collections. The implementation maintains the following data
+ * structures, which are only ever read or mutated while holding the instance lock:
+ * <p>dataNodeMap - the mapping from a node identifier to the {@link DataNode} instance</p>
+ * <p>nodesToEdges - the mapping from each {@link DataNode} to its outgoing {@link FlowEdge}s</p>
+ * <p>flowEdgeMap - the mapping from an edge identifier to the {@link FlowEdge} instance</p>
+ */
+@Alpha
+@Slf4j
+public class BaseFlowGraph implements FlowGraph {
+  private final Map<DataNode, Set<FlowEdge>> nodesToEdges = new HashMap<>();
+  private final Map<String, DataNode> dataNodeMap = new HashMap<>();
+  private final Map<String, FlowEdge> flowEdgeMap = new HashMap<>();
+
+  /**
+   * Lookup a node by its identifier.
+   *
+   * @param nodeId node identifier
+   * @return {@link DataNode} with nodeId as the identifier, or null if the graph has no such node.
+   */
+  public synchronized DataNode getNode(String nodeId) {
+    return this.dataNodeMap.get(nodeId);
+  }
+
+  /**
+   * Add a {@link DataNode} to the {@link FlowGraph}. If the node already "exists" in the {@link FlowGraph} (i.e. the
+   * FlowGraph already has another node with the same id), we remove the old node and add the new one. The
+   * outgoing edges of the old node are preserved and attached to the new node.
+   * @param node to be added to the {@link FlowGraph}
+   * @return true if node is successfully added to the {@link FlowGraph}.
+   */
+  @Override
+  public synchronized boolean addDataNode(DataNode node) {
+    // Find the previous node by id rather than by equals(): equals() may compare more than the id (e.g. the fs URI),
+    // in which case the old instance would otherwise linger in nodesToEdges and its edges would be lost.
+    DataNode oldNode = this.dataNodeMap.get(node.getId());
+    Set<FlowEdge> edges = (oldNode == null) ? null : this.nodesToEdges.remove(oldNode);
+    if (edges == null) {
+      edges = new HashSet<>();
+    }
+    // Removing before putting guarantees the new instance (not an old, equal one) is stored as the key.
+    this.nodesToEdges.put(node, edges);
+    this.dataNodeMap.put(node.getId(), node);
+    return true;
+  }
+
+  /**
+   * Add a {@link FlowEdge} to the {@link FlowGraph}. Addition of edge succeeds only if both the end points of the
+   * edge are already nodes in the FlowGraph. If a {@link FlowEdge} already exists, the old FlowEdge is removed and
+   * the new one added in its place.
+   * @param edge the {@link FlowEdge} to add; its first end point is the source and its second the destination.
+   * @return true if addition of {@link FlowEdge} is successful.
+   */
+  @Override
+  public synchronized boolean addFlowEdge(FlowEdge edge) {
+    String srcNodeId = edge.getEndPoints().get(0);
+    String dstNodeId = edge.getEndPoints().get(1);
+    DataNode srcNode = this.dataNodeMap.get(srcNodeId);
+    if (srcNode == null || !this.dataNodeMap.containsKey(dstNodeId)) {
+      return false;
+    }
+    Set<FlowEdge> adjacentEdges = this.nodesToEdges.computeIfAbsent(srcNode, k -> new HashSet<>());
+    // Drop any previously registered version of this edge, looked up both by id and by equals(); Set.add() alone
+    // would keep the old instance when an equal edge is already present.
+    FlowEdge oldEdge = this.flowEdgeMap.get(edge.getId());
+    if (oldEdge != null) {
+      adjacentEdges.remove(oldEdge);
+    }
+    adjacentEdges.remove(edge);
+    adjacentEdges.add(edge);
+    this.flowEdgeMap.put(edge.getId(), edge);
+    return true;
+  }
+
+  /**
+   * Delete a {@link DataNode} by its identifier
+   * @param nodeId identifier of the {@link DataNode} to be deleted.
+   * @return true if {@link DataNode} is successfully deleted.
+   */
+  @Override
+  public synchronized boolean deleteDataNode(String nodeId) {
+    DataNode node = this.dataNodeMap.get(nodeId);
+    return node != null && deleteDataNode(node);
+  }
+
+  /**
+   * Delete a {@link DataNode} and its outgoing {@link FlowEdge}s from the {@link FlowGraph}.
+   * @param node to be deleted; it is matched against the graph by its identifier.
+   * @return true if {@link DataNode} is successfully deleted, false if no node with that identifier exists.
+   */
+  public synchronized boolean deleteDataNode(DataNode node) {
+    // Resolve the stored instance by id, so callers may pass any node carrying a matching identifier.
+    DataNode storedNode = this.dataNodeMap.remove(node.getId());
+    if (storedNode == null) {
+      return false;
+    }
+    // Delete the node's outgoing edges: first from flowEdgeMap, then the adjacency entry itself.
+    Set<FlowEdge> outgoingEdges = this.nodesToEdges.remove(storedNode);
+    if (outgoingEdges != null) {
+      for (FlowEdge edge : outgoingEdges) {
+        this.flowEdgeMap.remove(edge.getId());
+      }
+    }
+    // NOTE(review): edges of other nodes that point to this node are not removed here, although
+    // FlowGraph#deleteDataNode documents removal of "all its incident edges" - confirm this is intended.
+    return true;
+  }
+
+  /**
+   * Delete a {@link FlowEdge} by its identifier
+   * @param edgeId identifier of the {@link FlowEdge} to be deleted.
+   * @return true if {@link FlowEdge} is successfully deleted.
+   */
+  @Override
+  public synchronized boolean deleteFlowEdge(String edgeId) {
+    FlowEdge edge = this.flowEdgeMap.get(edgeId);
+    return edge != null && deleteFlowEdge(edge);
+  }
+
+  /**
+   * Delete a {@link FlowEdge} from the {@link FlowGraph}.
+   * @param edge to be deleted.
+   * @return true if {@link FlowEdge} is successfully deleted. If the source of a {@link FlowEdge} does not exist or
+   * if the {@link FlowEdge} is not in the graph, return false.
+   */
+  public synchronized boolean deleteFlowEdge(FlowEdge edge) {
+    DataNode srcNode = this.dataNodeMap.get(edge.getEndPoints().get(0));
+    if (srcNode == null) {
+      return false;
+    }
+    Set<FlowEdge> adjacentEdges = this.nodesToEdges.get(srcNode);
+    if (adjacentEdges == null || !adjacentEdges.remove(edge)) {
+      return false;
+    }
+    this.flowEdgeMap.remove(edge.getId());
+    return true;
+  }
+
+  /**
+   * Get the set of outgoing edges of a {@link DataNode}
+   * @param nodeId identifier of the node
+   * @return a snapshot of the {@link FlowEdge}s leaving the node, or null if the graph has no such node.
+   */
+  @Override
+  public synchronized Set<FlowEdge> getEdges(String nodeId) {
+    return getEdges(this.dataNodeMap.get(nodeId));
+  }
+
+  /**
+   * Get the set of outgoing edges of a {@link DataNode}
+   * @param node {@link DataNode}
+   * @return a snapshot of the {@link FlowEdge}s leaving the node, or null if the node is null or not in the graph.
+   */
+  @Override
+  public synchronized Set<FlowEdge> getEdges(DataNode node) {
+    if (node == null) {
+      return null;
+    }
+    Set<FlowEdge> edges = this.nodesToEdges.get(node);
+    // Return a copy so callers can iterate it without holding the lock while the graph is being modified.
+    return (edges == null) ? null : new HashSet<>(edges);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
new file mode 100644
index 0000000..8ae0027
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+
+import org.apache.gobblin.annotation.Alpha;
+
+import lombok.Getter;
+
+
+/**
+ * An implementation of Dag. Assumes that nodes have unique values. Nodes with duplicate values will produce
+ * unpredictable behavior.
+ */
+@Alpha
+@Getter
+public class Dag<T> {
+ private List<DagNode<T>> startNodes;
+ private List<DagNode<T>> endNodes;
+ // Map to maintain parent to children mapping.
+ private Map<DagNode, List<DagNode<T>>> parentChildMap;
+ private List<DagNode<T>> nodes;
+
+ public Dag(List<DagNode<T>> dagNodes) {
+ this.nodes = dagNodes;
+ //Build dag
+ this.build();
+ }
+
+ /**
+ * Constructs the dag from the Node list.
+ */
+ private void build() {
+ this.startNodes = new ArrayList<>();
+ this.endNodes = new ArrayList<>();
+ this.parentChildMap = new HashMap<>();
+ for (DagNode node : this.nodes) {
+ //If a Node has no parent Node, add it to the list of start Nodes
+ if (node.getParentNodes() == null) {
+ this.startNodes.add(node);
+ } else {
+ List<DagNode> parentNodeList = node.getParentNodes();
+ for (DagNode parentNode : parentNodeList) {
+ if (parentChildMap.containsKey(parentNode)) {
+ parentChildMap.get(parentNode).add(node);
+ } else {
+ parentChildMap.put(parentNode, Lists.newArrayList(node));
+ }
+ }
+ }
+ }
+ //Iterate over all the Nodes and add a Node to the list of endNodes if it is not present in the parentChildMap
+ for (DagNode node : this.nodes) {
+ if (!parentChildMap.containsKey(node)) {
+ this.endNodes.add(node);
+ }
+ }
+ }
+
+ public List<DagNode<T>> getChildren(DagNode node) {
+ return parentChildMap.getOrDefault(node, null);
+ }
+
+ public List<DagNode<T>> getParents(DagNode node) {
+ return node.parentNodes;
+ }
+
+ /**
+ * Concatenate two dags together. Join the "other" dag to "this" dag and return "this" dag.
+ * The concatenate method ensures that all the jobs of "this" dag (which may have multiple end nodes)
+ * are completed before starting any job of the "other" dag. This is done by adding each endNode of this dag as
+ * a parent of every startNode of the other dag.
+ *
+ * @param other dag to concatenate to this dag
+ * @return the concatenated dag
+ */
+ public Dag<T> concatenate(Dag<T> other) throws IOException {
+ if (other == null) {
+ return this;
+ }
+ for (DagNode node : this.endNodes) {
+ this.parentChildMap.put(node, Lists.newArrayList());
+ for (DagNode otherNode : other.startNodes) {
+ this.parentChildMap.get(node).add(otherNode);
+ otherNode.addParentNode(node);
+ }
+ this.endNodes = other.endNodes;
+ }
+ return this;
+ }
+
+ @Getter
+ public static class DagNode<T> {
+ private T value;
+ //List of parent Nodes that are dependencies of this Node.
+ private List<DagNode<T>> parentNodes;
+
+ //Constructor
+ public DagNode(T value) {
+ this.value = value;
+ }
+
+ public void addParentNode(DagNode<T> node) {
+ if (parentNodes == null) {
+ parentNodes = Lists.newArrayList(node);
+ return;
+ }
+ parentNodes.add(node);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ DagNode that = (DagNode) o;
+ if (!this.getValue().equals(that.getValue())) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return this.getValue().hashCode();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DataNode.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DataNode.java
new file mode 100644
index 0000000..4931685
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DataNode.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
+
+
+/**
+ * A node of the FlowGraph. Every {@link DataNode} carries an identifier that is unique within the graph.
+ */
+@Alpha
+public interface DataNode {
+  /**
+   * @return the unique identifier of this {@link DataNode}.
+   */
+  String getId();
+
+  /**
+   * @return the configuration of this {@link DataNode}. Besides the node's own attributes, it holds the properties
+   * used to resolve a {@link org.apache.gobblin.runtime.api.JobTemplate}, e.g. "source.fs.uri" for an HDFS node or
+   * "jdbc.publisher.url" for a JDBC node.
+   */
+  Config getProps();
+
+  /**
+   * @return whether this {@link DataNode} is active.
+   */
+  boolean isActive();
+
+  /**
+   * Signals that a {@link DataNode} could not be constructed; the underlying failure is kept as the cause.
+   */
+  class DataNodeCreationException extends Exception {
+    private static final String MESSAGE_FORMAT = "Failed to create DataNode because of: %s";
+
+    public DataNodeCreationException(Exception cause) {
+      super(String.format(MESSAGE_FORMAT, cause.getMessage()), cause);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
new file mode 100644
index 0000000..e98337d
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+/**
+ * Names of the configuration keys that describe a
+ * {@link org.apache.gobblin.service.modules.dataset.DatasetDescriptor}.
+ */
+public class DatasetDescriptorConfigKeys {
+  /** Key holding the dataset path. */
+  public static final String PATH_KEY = "path";
+
+  /** Key holding the dataset format. */
+  public static final String FORMAT_KEY = "format";
+
+  /** Key holding the dataset description. */
+  public static final String DESCRIPTION_KEY = "description";
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FileSystemDataNode.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FileSystemDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FileSystemDataNode.java
new file mode 100644
index 0000000..5899645
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FileSystemDataNode.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+import java.io.IOException;
+import java.net.URI;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.util.ConfigUtils;
+
+import joptsimple.internal.Strings;
+import lombok.Getter;
+
+
+/**
+ * An abstract {@link BaseDataNode} for file-system based nodes. In addition to the required properties of a
+ * {@link BaseDataNode}, a {@link FileSystemDataNode} must have a FS URI specified under {@link #FS_URI_KEY}, which
+ * each subclass validates in {@link #isUriValid(URI)}. Example implementations of {@link FileSystemDataNode} include
+ * {@link HdfsDataNode} and {@link LocalFSDataNode}.
+ */
+@Alpha
+public abstract class FileSystemDataNode extends BaseDataNode {
+  public static final String FS_URI_KEY = "fs.uri";
+  @Getter
+  private String fsUri;
+
+  /**
+   * Constructor. A file-system DataNode must have the fs.uri property specified in addition to a node id.
+   *
+   * @param nodeProps properties of the node
+   * @throws DataNodeCreationException if the FS URI is missing or empty, cannot be parsed, or is rejected by
+   *         {@link #isUriValid(URI)}
+   */
+  public FileSystemDataNode(Config nodeProps) throws DataNodeCreationException {
+    super(nodeProps);
+    try {
+      this.fsUri = ConfigUtils.getString(nodeProps, FS_URI_KEY, "");
+      Preconditions.checkArgument(!Strings.isNullOrEmpty(this.fsUri),
+          "FS URI cannot be null or empty for a FileSystemDataNode");
+      URI uri = new URI(this.fsUri);
+      // isUriValid() is called from the constructor, so implementations must not depend on subclass state.
+      if (!isUriValid(uri)) {
+        throw new IOException("Invalid FS URI " + this.fsUri);
+      }
+    } catch (Exception e) {
+      throw new DataNodeCreationException(e);
+    }
+  }
+
+  /**
+   * @param fsUri the parsed FS URI of this node
+   * @return true if the URI is acceptable for this kind of file system.
+   */
+  public abstract boolean isUriValid(URI fsUri);
+
+  /**
+   * Two file-system DataNodes are the same if they are of the same class and have the same id and the same fsUri.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    FileSystemDataNode that = (FileSystemDataNode) o;
+
+    return this.getId().equals(that.getId()) && fsUri.equals(that.getFsUri());
+  }
+
+  @Override
+  public int hashCode() {
+    return Joiner.on("-").join(this.getId(), this.fsUri).hashCode();
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdge.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdge.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdge.java
new file mode 100644
index 0000000..fb60d67
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdge.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+import java.util.List;
+
+import com.typesafe.config.Config;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.service.modules.template.FlowTemplate;
+
+
+/**
+ * Representation of an edge in a FlowGraph. Each {@link FlowEdge} encapsulates:
+ * <ul>
+ *   <li>two {@link DataNode}s as its end points,</li>
+ *   <li>a {@link FlowTemplate} that is responsible for data movement between the {@link DataNode}s, and</li>
+ *   <li>a list of {@link SpecExecutor}s where the {@link FlowTemplate} can be executed.</li>
+ * </ul>
+ */
+@Alpha
+public interface FlowEdge {
+  /**
+   * @return the ids of the {@link DataNode}s that are the end points of the edge (source first, then destination).
+   */
+  List<String> getEndPoints();
+
+  /**
+   * @return the {@link FlowTemplate} that performs the data movement along the edge.
+   */
+  FlowTemplate getFlowTemplate();
+
+  /**
+   * @return a list of {@link SpecExecutor}s that can execute the {@link FlowTemplate} corresponding to this edge.
+   */
+  List<SpecExecutor> getExecutors();
+
+  /**
+   * Get the properties that define the {@link FlowEdge}. Encapsulates all the properties from which the
+   * {@link FlowEdge} is instantiated. It also includes properties needed for resolving a
+   * {@link org.apache.gobblin.runtime.api.JobTemplate}.
+   * @return the properties of this edge as a {@link Config} object.
+   */
+  Config getProps();
+
+  /**
+   * A string uniquely identifying the edge.
+   * @return the identifier of the {@link FlowEdge}.
+   */
+  String getId();
+
+  /**
+   * @return true if the {@link FlowEdge} is active.
+   */
+  boolean isActive();
+
+  /**
+   * @param user the user whose access is being checked
+   * @return true if the user has ACL permissions to access the {@link FlowEdge}.
+   */
+  boolean isAccessible(UserGroupInformation user);
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java
new file mode 100644
index 0000000..851e887
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
+
+import com.typesafe.config.Config;
+
+
+/**
+ * Factory that builds {@link FlowEdge}s from their configuration and derives edge identifiers.
+ */
+public interface FlowEdgeFactory {
+  /**
+   * Builds a {@link FlowEdge} from its configuration.
+   *
+   * @param edgeProps configuration of the {@link FlowEdge}
+   * @param catalog {@link FSFlowCatalog} used to look up the
+   *     {@link org.apache.gobblin.service.modules.template.FlowTemplate}s needed to create the edge
+   * @return the newly created {@link FlowEdge}
+   * @throws FlowEdgeCreationException if the edge cannot be created
+   */
+  FlowEdge createFlowEdge(Config edgeProps, FSFlowCatalog catalog) throws FlowEdgeCreationException;
+
+  /**
+   * Derives the identifier of an edge from its configuration.
+   *
+   * @param edgeProps configuration of the edge
+   * @return a string that identifies the edge
+   */
+  String getEdgeId(Config edgeProps) throws IOException;
+
+  /**
+   * Thrown when a {@link FlowEdge} cannot be created; the underlying failure is kept as the cause.
+   */
+  class FlowEdgeCreationException extends Exception {
+    private static final String MESSAGE_FORMAT = "Failed to create FlowEdge because of: %s";
+
+    public FlowEdgeCreationException(Exception cause) {
+      super(String.format(MESSAGE_FORMAT, cause.getMessage()), cause);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java
new file mode 100644
index 0000000..23f5793
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+import java.util.Collection;
+
+import org.apache.gobblin.annotation.Alpha;
+
+
+/**
+ * An interface for {@link FlowGraph}. A {@link FlowGraph} consists of {@link DataNode}s and {@link FlowEdge}s.
+ * The interface provides methods for adding {@link DataNode}s and {@link FlowEdge}s to, and removing them from, the
+ * {@link FlowGraph}, as well as for looking up the edges adjacent to a {@link DataNode}.
+ */
+@Alpha
+public interface FlowGraph {
+
+  /**
+   * Add a {@link DataNode} to the {@link FlowGraph}
+   * @param node {@link DataNode} to be added
+   * @return true if {@link DataNode} is added to the {@link FlowGraph} successfully.
+   */
+  boolean addDataNode(DataNode node);
+
+  /**
+   * Add a {@link FlowEdge} to the {@link FlowGraph}
+   * @param edge {@link FlowEdge} to be added
+   * @return true if {@link FlowEdge} is added to the {@link FlowGraph} successfully.
+   */
+  boolean addFlowEdge(FlowEdge edge);
+
+  /**
+   * Remove a {@link DataNode} and all its incident edges from the {@link FlowGraph}
+   * @param nodeId identifier of the {@link DataNode} to be removed
+   * @return true if {@link DataNode} is removed from the {@link FlowGraph} successfully.
+   */
+  boolean deleteDataNode(String nodeId);
+
+  /**
+   * Remove a {@link FlowEdge} from the {@link FlowGraph}
+   * @param edgeId identifier of the edge to be removed
+   * @return true if edge is removed from the {@link FlowGraph} successfully.
+   */
+  boolean deleteFlowEdge(String edgeId);
+
+  /**
+   * Get a collection of edges adjacent to a {@link DataNode}. Useful for path finding algorithms and graph
+   * traversal algorithms such as Dijkstra's shortest-path algorithm or BFS.
+   * @param nodeId identifier of the {@link DataNode}
+   * @return a collection of edges adjacent to the {@link DataNode}
+   */
+  Collection<FlowEdge> getEdges(String nodeId);
+
+  /**
+   * Get a collection of edges adjacent to a {@link DataNode}.
+   * @param node {@link DataNode}
+   * @return a collection of edges adjacent to the {@link DataNode}
+   */
+  Collection<FlowEdge> getEdges(DataNode node);
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java
new file mode 100644
index 0000000..0d94e3f
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+/**
+ * Configuration keys understood by the FlowGraph for {@link DataNode}s and {@link FlowEdge}s.
+ */
+public class FlowGraphConfigurationKeys {
+  /** Prefix shared by all {@link DataNode} keys. */
+  public static final String DATA_NODE_PREFIX = "data.node.";
+  /** Prefix shared by all {@link FlowEdge} keys. */
+  public static final String FLOW_EDGE_PREFIX = "flow.edge.";
+
+  // ---------------------------------------------------------------------------------------------------------------
+  // DataNode keys
+  // ---------------------------------------------------------------------------------------------------------------
+
+  /** Identifier of a {@link DataNode}. */
+  public static final String DATA_NODE_ID_KEY = DATA_NODE_PREFIX + "id";
+  /** "isActive" flag of a {@link DataNode}. */
+  public static final String DATA_NODE_IS_ACTIVE_KEY = DATA_NODE_PREFIX + "isActive";
+
+  // ---------------------------------------------------------------------------------------------------------------
+  // FlowEdge keys
+  // ---------------------------------------------------------------------------------------------------------------
+
+  /** Id of the source {@link DataNode} of a {@link FlowEdge}. */
+  public static final String FLOW_EDGE_SOURCE_KEY = FLOW_EDGE_PREFIX + "source";
+  /** Id of the destination {@link DataNode} of a {@link FlowEdge}. */
+  public static final String FLOW_EDGE_DESTINATION_KEY = FLOW_EDGE_PREFIX + "destination";
+  /** Name of a {@link FlowEdge}. */
+  public static final String FLOW_EDGE_NAME_KEY = FLOW_EDGE_PREFIX + "name";
+  /** "isActive" flag of a {@link FlowEdge}. */
+  public static final String FLOW_EDGE_IS_ACTIVE_KEY = FLOW_EDGE_PREFIX + "isActive";
+  /** URI of the flow template used by a {@link FlowEdge}. */
+  public static final String FLOW_EDGE_TEMPLATE_URI_KEY = FLOW_EDGE_PREFIX + "flowTemplateUri";
+  /** Spec executor configurations of a {@link FlowEdge} (presumably a list of sub-configs - confirm). */
+  public static final String FLOW_EDGE_SPEC_EXECUTORS_KEY = FLOW_EDGE_PREFIX + "specExecutors";
+  /**
+   * Class name of a spec executor. Unlike the other keys it carries no prefix, because it is looked up inside each
+   * individual spec executor's config.
+   */
+  public static final String FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY = "specExecutorClass";
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/HdfsDataNode.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/HdfsDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/HdfsDataNode.java
new file mode 100644
index 0000000..7bcc18d
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/HdfsDataNode.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+import java.net.URI;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.annotation.Alpha;
+
+import joptsimple.internal.Strings;
+
+
+/**
+ * An implementation of {@link HdfsDataNode}. All the properties specific to a HDFS based data node (e.g. fs.uri)
+ * are validated here.
+ */
+@Alpha
+public class HdfsDataNode extends FileSystemDataNode {
+ public static final String HDFS_SCHEME = "hdfs";
+
+ public HdfsDataNode(Config nodeProps) throws DataNodeCreationException {
+ super(nodeProps);
+ }
+
+ /**
+ *
+ * @param fsUri FileSystem URI
+ * @return true if the scheme is "hdfs" and authority is not empty.
+ */
+ @Override
+ public boolean isUriValid(URI fsUri) {
+ String scheme = fsUri.getScheme();
+ //Check that the scheme is "hdfs"
+ if(!scheme.equals(HDFS_SCHEME)) {
+ return false;
+ }
+ //Ensure that the authority is not empty
+ if(Strings.isNullOrEmpty(fsUri.getAuthority())) {
+ return false;
+ }
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/LocalFSDataNode.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/LocalFSDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/LocalFSDataNode.java
new file mode 100644
index 0000000..6dc1aa3
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/LocalFSDataNode.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+import java.net.URI;
+
+import org.apache.gobblin.annotation.Alpha;
+
+import com.typesafe.config.Config;
+
+/**
+ * An implementation of {@link LocalFSDataNode}. All the properties specific to a LocalFS based data node (e.g. fs.uri)
+ * are validated here.
+ */
+@Alpha
+public class LocalFSDataNode extends FileSystemDataNode {
+ public static final String LOCAL_FS_SCHEME = "file";
+
+ public LocalFSDataNode(Config nodeProps) throws DataNodeCreationException {
+ super(nodeProps);
+ }
+
+ /**
+ *
+ * @param fsUri FileSystem URI
+ * @return true if the scheme of fsUri equals "file"
+ */
+ @Override
+ public boolean isUriValid(URI fsUri) {
+ String scheme = fsUri.getScheme();
+ if(scheme.equals(LOCAL_FS_SCHEME)) {
+ return true;
+ }
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java
new file mode 100644
index 0000000..30d8309
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.template;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import com.typesafe.config.Config;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+
+/**
+ * An interface primarily for representing a flow of {@link JobTemplate}s. It also has
+ * method for retrieving required configs for every {@link JobTemplate} in the flow.
+ */
+@Alpha
+public interface FlowTemplate extends Spec {
+
+ /**
+ * @return the {@link Collection} of {@link JobTemplate}s that belong to this {@link FlowTemplate}.
+ */
+ List<JobTemplate> getJobTemplates();
+
+ /**
+ *
+ * @return the {@link Dag<JobTemplate>} that backs the {@link FlowTemplate}.
+ */
+ Dag<JobTemplate> getDag() throws IOException;
+
+ /**
+ *
+ * @return all configuration inside pre-written template.
+ */
+ Config getRawTemplateConfig();
+
+ /**
+ * @return list of input/output {@link DatasetDescriptor}s for the {@link FlowTemplate}.
+ */
+ List<Pair<DatasetDescriptor, DatasetDescriptor>> getInputOutputDatasetDescriptors();
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/HOCONInputStreamFlowTemplate.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/HOCONInputStreamFlowTemplate.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/HOCONInputStreamFlowTemplate.java
new file mode 100644
index 0000000..3847fd2
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/HOCONInputStreamFlowTemplate.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.template;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URI;
+
+import com.google.common.base.Charsets;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigResolveOptions;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.service.modules.template_catalog.FlowCatalogWithTemplates;
+
+/**
+ * A {@link FlowTemplate} that loads a HOCON file as a {@link StaticFlowTemplate}.
+ */
+@Alpha
+public class HOCONInputStreamFlowTemplate extends StaticFlowTemplate {
+ public static final String VERSION_KEY = "gobblin.flow.template.version";
+ public static final String DEFAULT_VERSION = "1";
+
+ public HOCONInputStreamFlowTemplate(InputStream inputStream, URI uri, FlowCatalogWithTemplates catalog)
+ throws SpecNotFoundException, IOException, ReflectiveOperationException, JobTemplate.TemplateException {
+ this(ConfigFactory.parseReader(new InputStreamReader(inputStream, Charsets.UTF_8)).resolve(
+ ConfigResolveOptions.defaults().setAllowUnresolved(true)), uri, catalog);
+ }
+
+ public HOCONInputStreamFlowTemplate(Config config, URI uri, FlowCatalogWithTemplates catalog)
+ throws SpecNotFoundException, IOException, ReflectiveOperationException, JobTemplate.TemplateException {
+ super(uri, config.hasPath(VERSION_KEY) ? config.getString(VERSION_KEY) : DEFAULT_VERSION,
+ config.hasPath(ConfigurationKeys.FLOW_DESCRIPTION_KEY) ? config
+ .getString(ConfigurationKeys.FLOW_DESCRIPTION_KEY) : "", config, catalog);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactory.java
new file mode 100644
index 0000000..b89da4d
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactory.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.template;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * A Factory class used for constructing a {@link Dag} of {@link org.apache.gobblin.runtime.api.JobTemplate}s from
+ * a {@link URI} of a {@link FlowTemplate}.
+ */
+@Alpha
+@Slf4j
+public class JobTemplateDagFactory {
+ public static final String JOB_TEMPLATE_FILE_SUFFIX = ".conf";
+
+ public static Dag<JobTemplate> createDagFromJobTemplates(List<JobTemplate> jobTemplates) {
+ Map<URI, Dag.DagNode<JobTemplate>> uriJobTemplateMap = new HashMap<>();
+ List<Dag.DagNode<JobTemplate>> dagNodeList = new ArrayList<>();
+ /**
+ * Create a {@link Dag.DagNode<JobTemplate>} for every {@link JobTemplate} in the flow. Add this node
+ * to a {@link Map<URI,JobTemplate>}.
+ */
+ for (JobTemplate template : jobTemplates) {
+ Dag.DagNode<JobTemplate> dagNode = new Dag.DagNode<>(template);
+ dagNodeList.add(dagNode);
+ uriJobTemplateMap.put(template.getUri(), dagNode);
+ }
+
+ /**
+ * Iterate over each {@link JobTemplate} to get the dependencies of each {@link JobTemplate}.
+ * For each {@link JobTemplate}, get the corresponding {@link Dag.DagNode} and
+ * set the {@link Dag.DagNode}s corresponding to the dependencies as its parent nodes.
+ *
+ * TODO: we likely do not need 2 for loops and we can do this in 1 pass.
+ */
+ Path templateDirPath = new Path(jobTemplates.get(0).getUri()).getParent();
+ for (JobTemplate template : jobTemplates) {
+ URI templateUri = template.getUri();
+ Dag.DagNode<JobTemplate> node = uriJobTemplateMap.get(templateUri);
+ Collection<String> dependencies = template.getDependencies();
+ for (String dependency : dependencies) {
+ URI dependencyUri = new Path(templateDirPath, dependency).suffix(JOB_TEMPLATE_FILE_SUFFIX).toUri();
+ Dag.DagNode<JobTemplate> parentNode = uriJobTemplateMap.get(dependencyUri);
+ node.addParentNode(parentNode);
+ }
+ }
+ Dag<JobTemplate> dag = new Dag<>(dagNodeList);
+ return dag;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
new file mode 100644
index 0000000..8347022
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.template;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
+import org.apache.gobblin.service.modules.template_catalog.FlowCatalogWithTemplates;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+import org.apache.hadoop.fs.Path;
+
+import lombok.Getter;
+
+
+/**
+ * A {@link FlowTemplate} using a static {@link Config} as the raw configuration for the template.
+ */
+@Alpha
+public class StaticFlowTemplate implements FlowTemplate {
+ public static final String INPUT_DATASET_DESCRIPTOR_PREFIX = "gobblin.flow.dataset.descriptor.input";
+ public static final String OUTPUT_DATASET_DESCRIPTOR_PREFIX = "gobblin.flow.dataset.descriptor.output";
+ public static final String DATASET_DESCRIPTOR_CLASS_KEY = "class";
+
+ @Getter
+ private URI uri;
+ @Getter
+ private String version;
+ @Getter
+ private String description;
+ @Getter
+ private FlowCatalogWithTemplates catalog;
+ @Getter
+ private List<Pair<DatasetDescriptor, DatasetDescriptor>> inputOutputDatasetDescriptors;
+ @Getter
+ private List<JobTemplate> jobTemplates;
+
+ private Dag<JobTemplate> dag;
+
+ private Config rawConfig;
+ private boolean isTemplateMaterialized;
+
+ public StaticFlowTemplate(URI uri, String version, String description, Config config,
+ FlowCatalogWithTemplates catalog)
+ throws IOException, ReflectiveOperationException, SpecNotFoundException, JobTemplate.TemplateException {
+ this.uri = uri;
+ this.version = version;
+ this.description = description;
+ this.inputOutputDatasetDescriptors = buildInputOutputDescriptors(config);
+ this.rawConfig = config;
+ this.catalog = catalog;
+ URI flowTemplateDir = new Path(this.uri).getParent().toUri();
+ this.jobTemplates = this.catalog.getJobTemplatesForFlow(flowTemplateDir);
+ }
+
+ //Constructor for testing purposes
+ public StaticFlowTemplate(URI uri, String version, String description, Config config,
+ FlowCatalogWithTemplates catalog, List<Pair<DatasetDescriptor, DatasetDescriptor>> inputOutputDatasetDescriptors, List<JobTemplate> jobTemplates)
+ throws IOException, ReflectiveOperationException, SpecNotFoundException, JobTemplate.TemplateException {
+ this.uri = uri;
+ this.version = version;
+ this.description = description;
+ this.inputOutputDatasetDescriptors = inputOutputDatasetDescriptors;
+ this.rawConfig = config;
+ this.catalog = catalog;
+ this.jobTemplates = jobTemplates;
+ }
+
+ /**
+ * Generate the input/output dataset descriptors for the {@link FlowTemplate}.
+ */
+ private List<Pair<DatasetDescriptor, DatasetDescriptor>> buildInputOutputDescriptors(Config config)
+ throws IOException, ReflectiveOperationException {
+ if (!config.hasPath(INPUT_DATASET_DESCRIPTOR_PREFIX) || !config.hasPath(OUTPUT_DATASET_DESCRIPTOR_PREFIX)) {
+ throw new IOException("Flow template must specify at least one input/output dataset descriptor");
+ }
+ int i = 0;
+ String inputPrefix = Joiner.on(".").join(INPUT_DATASET_DESCRIPTOR_PREFIX, Integer.toString(i));
+ List<Pair<DatasetDescriptor, DatasetDescriptor>> result = Lists.newArrayList();
+ while (config.hasPath(inputPrefix)) {
+ Config inputDescriptorConfig = config.getConfig(inputPrefix);
+ DatasetDescriptor inputDescriptor = getDatasetDescriptor(inputDescriptorConfig);
+ String outputPrefix = Joiner.on(".").join(OUTPUT_DATASET_DESCRIPTOR_PREFIX, Integer.toString(i++));
+ Config outputDescriptorConfig = config.getConfig(outputPrefix);
+ DatasetDescriptor outputDescriptor = getDatasetDescriptor(outputDescriptorConfig);
+ result.add(ImmutablePair.of(inputDescriptor, outputDescriptor));
+ inputPrefix = Joiner.on(".").join(INPUT_DATASET_DESCRIPTOR_PREFIX, Integer.toString(i));
+ }
+ return result;
+ }
+
+ private DatasetDescriptor getDatasetDescriptor(Config descriptorConfig)
+ throws ReflectiveOperationException {
+ Class datasetDescriptorClass = Class.forName(descriptorConfig.getString(DATASET_DESCRIPTOR_CLASS_KEY));
+ return (DatasetDescriptor) GobblinConstructorUtils
+ .invokeLongestConstructor(datasetDescriptorClass, descriptorConfig);
+ }
+
+ @Override
+ public Config getRawTemplateConfig() {
+ return this.rawConfig;
+ }
+
+ private void ensureTemplateMaterialized()
+ throws IOException {
+ try {
+ if (!isTemplateMaterialized) {
+ this.dag = JobTemplateDagFactory.createDagFromJobTemplates(this.jobTemplates);
+ }
+ this.isTemplateMaterialized = true;
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public List<JobTemplate> getJobTemplates() {
+ return this.jobTemplates;
+ }
+
+ @Override
+ public Dag<JobTemplate> getDag()
+ throws IOException {
+ ensureTemplateMaterialized();
+ return this.dag;
+ }
+}