You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2018/06/05 04:05:55 UTC

[2/2] incubator-gobblin git commit: [GOBBLIN-491] Create a FlowGraph representation for multi-hop support in Gobblin-as-a-Service.[]

[GOBBLIN-491] Create a FlowGraph representation for multi-hop support in Gobblin-as-a-Service.[]

Closes #2361 from sv2000/flowGraph


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/9b91fa1b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/9b91fa1b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/9b91fa1b

Branch: refs/heads/master
Commit: 9b91fa1b3bc7ec62aa3a4192dffd3fef63dfea94
Parents: 8949aa3
Author: suvasude <su...@linkedin.biz>
Authored: Mon Jun 4 21:05:50 2018 -0700
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Mon Jun 4 21:05:50 2018 -0700

----------------------------------------------------------------------
 .../configuration/ConfigurationKeys.java        |   1 +
 .../apache/gobblin/runtime/api/JobTemplate.java |   6 +
 .../runtime/template/StaticJobTemplate.java     |  19 +-
 ...ackagedTemplatesJobCatalogDecoratorTest.java |   3 +
 .../template/InheritingJobTemplateTest.java     |   6 +
 .../dataset/BaseHdfsDatasetDescriptor.java      |  99 ++++++++
 .../modules/dataset/DatasetDescriptor.java      |  44 ++++
 .../modules/dataset/HdfsDatasetDescriptor.java  |  40 ++++
 .../service/modules/flowgraph/BaseDataNode.java |  81 +++++++
 .../service/modules/flowgraph/BaseFlowEdge.java | 191 +++++++++++++++
 .../modules/flowgraph/BaseFlowGraph.java        | 189 +++++++++++++++
 .../gobblin/service/modules/flowgraph/Dag.java  | 153 ++++++++++++
 .../service/modules/flowgraph/DataNode.java     |  54 +++++
 .../flowgraph/DatasetDescriptorConfigKeys.java  |  28 +++
 .../modules/flowgraph/FileSystemDataNode.java   |  83 +++++++
 .../service/modules/flowgraph/FlowEdge.java     |  85 +++++++
 .../modules/flowgraph/FlowEdgeFactory.java      |  53 +++++
 .../service/modules/flowgraph/FlowGraph.java    |  76 ++++++
 .../flowgraph/FlowGraphConfigurationKeys.java   |  40 ++++
 .../service/modules/flowgraph/HdfsDataNode.java |  59 +++++
 .../modules/flowgraph/LocalFSDataNode.java      |  51 ++++
 .../service/modules/template/FlowTemplate.java  |  62 +++++
 .../template/HOCONInputStreamFlowTemplate.java  |  56 +++++
 .../modules/template/JobTemplateDagFactory.java |  79 +++++++
 .../modules/template/StaticFlowTemplate.java    | 154 +++++++++++++
 .../modules/template_catalog/FSFlowCatalog.java | 122 ++++++++++
 .../FlowCatalogWithTemplates.java               |  49 ++++
 .../modules/flowgraph/BaseFlowGraphTest.java    | 230 +++++++++++++++++++
 .../service/modules/flowgraph/DagTest.java      | 133 +++++++++++
 .../template/JobTemplateDagFactoryTest.java     |  92 ++++++++
 .../template_catalog/FSFlowCatalogTest.java     | 107 +++++++++
 .../flowgraph/BaseFlowEdgeFactoryTest.java      |  73 ++++++
 .../template_catalog/test-template/flow.conf    |  16 ++
 .../test-template/jobs/job1.conf                |   2 +
 .../test-template/jobs/job2.conf                |   3 +
 .../test-template/jobs/job3.conf                |   3 +
 .../test-template/jobs/job4.conf                |   3 +
 37 files changed, 2537 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 2291d72..b6196da 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -214,6 +214,7 @@ public class ConfigurationKeys {
    * Configuration property used only for job configuration file's tempalte, inside .template file
    */
   public static final String REQUIRED_ATRRIBUTES_LIST = "gobblin.template.required_attributes";
+  public static final String JOB_DEPENDENCIES="dependencies";
 
   /**
    * Configuration for emitting job events

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobTemplate.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobTemplate.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobTemplate.java
index 1269bc4..f8f3071 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobTemplate.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobTemplate.java
@@ -47,6 +47,12 @@ public interface JobTemplate extends Spec {
   Collection<String> getRequiredConfigList() throws SpecNotFoundException, TemplateException;
 
   /**
+   * Retrieve all job names that this job depends on. Useful for building a dag of
+   * JobTemplates.
+   */
+  Collection<String> getDependencies();
+
+  /**
    * Return the combine configuration of template and user customized attributes.
    * @return
    */

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/StaticJobTemplate.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/StaticJobTemplate.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/StaticJobTemplate.java
index c370c62..6c69370 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/StaticJobTemplate.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/StaticJobTemplate.java
@@ -19,21 +19,18 @@ package org.apache.gobblin.runtime.template;
 
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Properties;
 import java.util.Set;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.api.GobblinInstanceDriver;
-import org.apache.gobblin.runtime.api.JobCatalog;
 import org.apache.gobblin.runtime.api.JobCatalogWithTemplates;
 import org.apache.gobblin.runtime.api.JobTemplate;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
@@ -56,6 +53,8 @@ public class StaticJobTemplate extends InheritingJobTemplate {
   private final String version;
   @Getter
   private final String description;
+  @Getter
+  private Collection<String> dependencies;
 
   public StaticJobTemplate(URI uri, String version, String description, Config config, JobCatalogWithTemplates catalog)
       throws SpecNotFoundException, TemplateException {
@@ -63,18 +62,22 @@ public class StaticJobTemplate extends InheritingJobTemplate {
   }
 
   protected StaticJobTemplate(URI uri, String version, String description, Config config, List<URI> superTemplateUris,
-      JobCatalogWithTemplates catalog) throws SpecNotFoundException, TemplateException {
+      JobCatalogWithTemplates catalog)
+      throws SpecNotFoundException, TemplateException {
     super(superTemplateUris, catalog);
     this.uri = uri;
     this.version = version;
     this.description = description;
     this.rawConfig = config;
-    this.requiredAttributes = config.hasPath(ConfigurationKeys.REQUIRED_ATRRIBUTES_LIST)
-        ? new HashSet<>(Arrays.asList(config.getString(ConfigurationKeys.REQUIRED_ATRRIBUTES_LIST).split(",")))
+    this.requiredAttributes = config.hasPath(ConfigurationKeys.REQUIRED_ATRRIBUTES_LIST) ? new HashSet<>(
+        Arrays.asList(config.getString(ConfigurationKeys.REQUIRED_ATRRIBUTES_LIST).split(",")))
         : Sets.<String>newHashSet();
+    this.dependencies = config.hasPath(ConfigurationKeys.JOB_DEPENDENCIES) ? Arrays
+        .asList(config.getString(ConfigurationKeys.JOB_DEPENDENCIES).split(",")) : new ArrayList<>();
   }
 
-  private static List<URI> getSuperTemplateUris(Config config) throws TemplateException {
+  private static List<URI> getSuperTemplateUris(Config config)
+      throws TemplateException {
     if (config.hasPath(SUPER_TEMPLATE_KEY)) {
       List<URI> uris = Lists.newArrayList();
       for (String uriString : config.getString(SUPER_TEMPLATE_KEY).split(",")) {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_catalog/PackagedTemplatesJobCatalogDecoratorTest.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_catalog/PackagedTemplatesJobCatalogDecoratorTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_catalog/PackagedTemplatesJobCatalogDecoratorTest.java
index bf448cb..94f02ae 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_catalog/PackagedTemplatesJobCatalogDecoratorTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_catalog/PackagedTemplatesJobCatalogDecoratorTest.java
@@ -98,6 +98,9 @@ public class PackagedTemplatesJobCatalogDecoratorTest {
     public Config getResolvedConfig(Config userConfig) {
       return null;
     }
+
+    @Override
+    public Collection<String> getDependencies() { return null; }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/template/InheritingJobTemplateTest.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/template/InheritingJobTemplateTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/template/InheritingJobTemplateTest.java
index ab76f18..c8d4183 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/template/InheritingJobTemplateTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/template/InheritingJobTemplateTest.java
@@ -177,6 +177,7 @@ public class InheritingJobTemplateTest {
     private final URI uri;
     private final Map<String,String> rawTemplate;
     private final List<String> required;
+    private Collection<String> dependencies;
 
     public TestTemplate(URI uri, List<URI> superTemplateUris, Map<String, String> rawTemplate, List<String> required,
         JobCatalogWithTemplates catalog) throws SpecNotFoundException, TemplateException {
@@ -227,6 +228,11 @@ public class InheritingJobTemplateTest {
       }
       return userConfig.withFallback(getLocalRawTemplate());
     }
+
+    @Override
+    public Collection<String> getDependencies() {
+      return this.dependencies;
+    }
   }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseHdfsDatasetDescriptor.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseHdfsDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseHdfsDatasetDescriptor.java
new file mode 100644
index 0000000..f7cf99f
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseHdfsDatasetDescriptor.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.dataset;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+import lombok.Getter;
+
+
+/**
+ * An implementation of {@link HdfsDatasetDescriptor}.
+ */
+@Alpha
+public class BaseHdfsDatasetDescriptor implements HdfsDatasetDescriptor {
+  public static final String HDFS_PLATFORM_NAME = "hdfs";
+
+  @Getter
+  private final String path;
+  @Getter
+  private final String format;
+  @Getter
+  private final String description;
+  @Getter
+  private final String platform = HDFS_PLATFORM_NAME;
+
+  public BaseHdfsDatasetDescriptor(Config config) {
+    Preconditions.checkArgument(config.hasPath(DatasetDescriptorConfigKeys.PATH_KEY), String.format("Missing required property %s", DatasetDescriptorConfigKeys.PATH_KEY));
+    Preconditions.checkArgument(config.hasPath(DatasetDescriptorConfigKeys.FORMAT_KEY), String.format("Missing required property %s", DatasetDescriptorConfigKeys.FORMAT_KEY));
+
+    this.path = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.PATH_KEY, null);
+    this.format = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.FORMAT_KEY, null);
+    this.description = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.DESCRIPTION_KEY, "");
+  }
+
+  /**
+   * A {@link HdfsDatasetDescriptor} is compatible with another {@link DatasetDescriptor} iff they have identical
+   * platform, type, path, and format.
+   * TODO: Currently isCompatibleWith() only checks if HDFS paths described by the two {@link DatasetDescriptor}s
+   * being compared are identical. Need to enhance this for the case where paths can contain glob patterns.
+   * e.g. paths described by the pattern /data/input/* are a subset of paths described by /data/* and hence, the
+   * two descriptors should be compatible.
+   * @return true if this {@link HdfsDatasetDescriptor} is compatible with another {@link DatasetDescriptor}.
+   */
+  @Override
+  public boolean isCompatibleWith(DatasetDescriptor o) {
+    return this.equals(o);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    // Exact class match: this also guarantees that the cast below is safe, so no instanceof check is needed.
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    HdfsDatasetDescriptor other = (HdfsDatasetDescriptor) o;
+    if (this.getPlatform() == null || other.getPlatform() == null) {
+      return false;
+    }
+    // Platform and format are matched case-insensitively; the path comparison is case-sensitive.
+    return this.getPlatform().equalsIgnoreCase(other.getPlatform())
+        && this.getPath().equals(other.getPath())
+        && this.getFormat().equalsIgnoreCase(other.getFormat());
+  }
+
+  @Override
+  public String toString() {
+     return "(" + Joiner.on(",").join(this.getPlatform(),this.getPath(),this.getFormat()) + ")";
+  }
+
+  @Override
+  public int hashCode() {
+    return this.toString().toLowerCase(java.util.Locale.ROOT).hashCode(); // case-folded to agree with equals()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java
new file mode 100644
index 0000000..4a322e6
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.dataset;
+
+import org.apache.gobblin.annotation.Alpha;
+
+
+/**
+ * The interface for dataset descriptors.
+ */
+@Alpha
+public interface DatasetDescriptor {
+  /**
+   * @return the dataset platform i.e. the storage backing the dataset (e.g. HDFS, JDBC, Espresso etc.)
+   */
+  public String getPlatform();
+
+  /**
+   * @return a human-readable description of the dataset.
+   */
+  public String getDescription();
+
+  /**
+   * @param other the {@link DatasetDescriptor} to compare against.
+   * @return true if this {@link DatasetDescriptor} is compatible with {@code other}, i.e. the datasets described by
+   * this descriptor are a subset of the datasets described by {@code other}. This check is non-commutative.
+   */
+  public boolean isCompatibleWith(DatasetDescriptor other);
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HdfsDatasetDescriptor.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HdfsDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HdfsDatasetDescriptor.java
new file mode 100644
index 0000000..6f1970c
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HdfsDatasetDescriptor.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.dataset;
+
+import org.apache.gobblin.annotation.Alpha;
+
+
+/**
+ * A descriptor interface for HDFS datasets
+ */
+@Alpha
+public interface HdfsDatasetDescriptor extends DatasetDescriptor {
+  /**
+   * Path of the dataset described by this descriptor.
+   * @return dataset path.
+   */
+  public String getPath();
+
+  /**
+   * Format in which the dataset is stored.
+   * @return storage format of the dataset.
+   */
+  public String getFormat();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java
new file mode 100644
index 0000000..731bc22
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
+import org.apache.gobblin.util.ConfigUtils;
+
+import joptsimple.internal.Strings;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * An implementation of {@link DataNode}.
+ */
+@Alpha
+@Slf4j
+public class BaseDataNode implements DataNode {
+  @Getter
+  private String id;
+  @Getter
+  private Config props;
+  @Getter
+  private boolean active = true;
+
+  public BaseDataNode(Config nodeProps) throws DataNodeCreationException {
+    try {
+      String nodeId = ConfigUtils.getString(nodeProps, FlowGraphConfigurationKeys.DATA_NODE_ID_KEY, "");
+      Preconditions.checkArgument(!Strings.isNullOrEmpty(nodeId), "Node Id cannot be null or empty");
+      this.id = nodeId;
+      if (nodeProps.hasPath(FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY)) {
+        this.active = nodeProps.getBoolean(FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY);
+      }
+      this.props = nodeProps;
+    } catch(Exception e) {
+      throw new DataNodeCreationException(e);
+    }
+  }
+
+  /**
+   * Two nodes are equal iff they are instances of the same class and have the same id.
+   * NOTE(review): the node configuration ({@code props}) is not compared here - confirm this is intended.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    BaseDataNode that = (BaseDataNode) o;
+
+    return id.equals(that.getId());
+  }
+
+  @Override
+  public int hashCode() {
+    return this.id.hashCode();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
new file mode 100644
index 0000000..ccce62e
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
+import org.apache.gobblin.service.modules.template.FlowTemplate;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+import org.apache.gobblin.util.ConfigUtils;
+
+import joptsimple.internal.Strings;
+import lombok.Getter;
+
+
+/**
+ * An implementation of {@link FlowEdge}.
+ */
+@Alpha
+public class BaseFlowEdge implements FlowEdge {
+  public static final String FLOW_EDGE_LABEL_JOINER_CHAR = ":";
+
+  @Getter
+  protected List<String> endPoints;
+
+  @Getter
+  protected FlowTemplate flowTemplate;
+
+  @Getter
+  private List<SpecExecutor> executors;
+
+  @Getter
+  private Config props;
+
+  @Getter
+  private String id;
+
+  @Getter
+  private boolean active;
+
+  //Constructor
+  public BaseFlowEdge(List<String> endPoints, String edgeName, FlowTemplate flowTemplate, List<SpecExecutor> executors, Config properties, boolean active) {
+    this.endPoints = endPoints;
+    this.flowTemplate = flowTemplate;
+    this.executors = executors;
+    this.active = active;
+    this.props = properties;
+    this.id = generateEdgeId(endPoints, edgeName);
+  }
+
+  @Override
+  public boolean isAccessible(UserGroupInformation user) {
+    return true;
+  }
+
+  @VisibleForTesting
+  protected static String generateEdgeId(List<String> endPoints, String edgeName) {
+    return Joiner.on(FLOW_EDGE_LABEL_JOINER_CHAR).join(endPoints.get(0), endPoints.get(1), edgeName);
+  }
+  /**
+   * The {@link FlowEdge}s are the same if they have the same endpoints and both refer to the same
+   * {@link FlowTemplate} i.e. the {@link FlowTemplate} uris are the same. The edge name is not compared.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    FlowEdge that = (FlowEdge) o;
+
+    // Both the source (index 0) and the destination (index 1) must match; a mismatch in either one is enough
+    // to make the edges different.
+    if (!this.getEndPoints().get(0).equals(that.getEndPoints().get(0))
+        || !this.getEndPoints().get(1).equals(that.getEndPoints().get(1))) {
+      return false;
+    }
+
+    return this.getFlowTemplate().getUri().equals(that.getFlowTemplate().getUri());
+  }
+
+  @Override
+  public int hashCode() {
+    return this.id.hashCode(); // NOTE(review): id embeds the edge name, which equals() ignores - confirm intent
+  }
+
+  @Override
+  public String toString() {
+    return this.id;
+  }
+
+  /**
+   * A {@link FlowEdgeFactory} for creating {@link BaseFlowEdge}.
+   */
+  public static class Factory implements FlowEdgeFactory {
+
+    /**
+     * Create a {@link BaseFlowEdge} from its configuration. The source, destination, name, template URI and
+     * at least one SpecExecutor config (keyed by consecutive indices starting at 0) are required.
+     * @param edgeProps Properties of edge
+     * @param flowCatalog Flow Catalog used to retrieve {@link FlowTemplate}s.
+     * @return a {@link BaseFlowEdge}
+     * @throws FlowEdgeCreationException if validation fails or the edge cannot be instantiated.
+     */
+    @Override
+    public FlowEdge createFlowEdge(Config edgeProps, FSFlowCatalog flowCatalog) throws FlowEdgeCreationException {
+      try {
+        String source = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY,"");
+        Preconditions.checkArgument(!Strings.isNullOrEmpty(source), "A FlowEdge must have a non-null or empty source");
+        String destination = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY,"");
+        Preconditions.checkArgument(!Strings.isNullOrEmpty(destination), "A FlowEdge must have a non-null or empty destination");
+        List<String> endPoints = Lists.newArrayList(source, destination);
+        String edgeName = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY,"");
+        Preconditions.checkArgument(!Strings.isNullOrEmpty(edgeName), "A FlowEdge must have a non-null or empty name");
+
+        List<Config> specExecutorConfigList = new ArrayList<>();
+        for (int i = 0; edgeProps.hasPath(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + "." + i); i++) {
+          specExecutorConfigList.add(edgeProps.getConfig(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + "." + i));
+        }
+
+        String flowTemplateUri = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_URI_KEY, "");
+
+        //Perform basic validation
+        Preconditions.checkArgument(endPoints.size() == 2, "A FlowEdge must have 2 end points");
+        Preconditions
+            .checkArgument(!specExecutorConfigList.isEmpty(), "A FlowEdge must have at least one SpecExecutor");
+        Preconditions
+            .checkArgument(!Strings.isNullOrEmpty(flowTemplateUri), "FlowTemplate URI must be not null or empty");
+        boolean isActive = ConfigUtils.getBoolean(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY, true);
+
+        //Build SpecExecutor from config
+        List<SpecExecutor> specExecutors = new ArrayList<>();
+
+        for (Config specExecutorConfig : specExecutorConfigList) {
+          Class<?> executorClass = Class.forName(specExecutorConfig.getString(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY));
+          SpecExecutor executor = (SpecExecutor) GobblinConstructorUtils.invokeLongestConstructor(executorClass, specExecutorConfig);
+          specExecutors.add(executor);
+        }
+        FlowTemplate flowTemplate = flowCatalog.getFlowTemplate(new URI(flowTemplateUri));
+        return new BaseFlowEdge(endPoints, edgeName, flowTemplate, specExecutors, edgeProps, isActive);
+      } catch (Exception e) {
+        throw new FlowEdgeCreationException(e);
+      }
+    }
+
+    /** Derive the id (source, destination and name joined by ':') of the edge described by {@code edgeProps}. */
+    @Override
+    public String getEdgeId(Config edgeProps) throws IOException {
+      String source = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY,"");
+      Preconditions.checkArgument(!Strings.isNullOrEmpty(source), "A FlowEdge must have a non-null or empty source");
+      String destination = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY,"");
+      // Validate the destination itself; checking the source a second time would let an empty destination through.
+      Preconditions.checkArgument(!Strings.isNullOrEmpty(destination), "A FlowEdge must have a non-null or empty destination");
+      String edgeName = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY, "");
+      Preconditions.checkArgument(!Strings.isNullOrEmpty(edgeName), "A FlowEdge must have a non-null or empty name");
+      List<String> endPoints = Lists.newArrayList(source, destination);
+      return generateEdgeId(endPoints, edgeName);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
new file mode 100644
index 0000000..783f7ea
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.gobblin.annotation.Alpha;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * A thread-safe implementation of {@link FlowGraph}. Every method that reads or writes the internal maps is
+ * synchronized on the graph instance. The implementation maintains the following data structures:
+ *   <p>dataNodeMap - the mapping from a node identifier to the {@link DataNode} instance</p>
+ *   <p>nodesToEdges - the mapping from each {@link DataNode} to its outgoing {@link FlowEdge}s</p>
+ *   <p>flowEdgeMap - the mapping from an edge identifier to the {@link FlowEdge} instance</p>
+ * The keys of nodesToEdges are always exactly the values of dataNodeMap.
+ */
+@Alpha
+@Slf4j
+public class BaseFlowGraph implements FlowGraph {
+  private final Map<DataNode, Set<FlowEdge>> nodesToEdges = new HashMap<>();
+  private final Map<String, DataNode> dataNodeMap = new HashMap<>();
+  private final Map<String, FlowEdge> flowEdgeMap = new HashMap<>();
+
+  /**
+   * Lookup a node by its identifier.
+   *
+   * @param nodeId node identifier
+   * @return {@link DataNode} with nodeId as the identifier, or null if the graph has no such node.
+   */
+  public synchronized DataNode getNode(String nodeId) {
+    return this.dataNodeMap.get(nodeId);
+  }
+
+  /**
+   * Add a {@link DataNode} to the {@link FlowGraph}. If the node already "exists" in the {@link FlowGraph} (i.e. the
+   * FlowGraph already has another node with the same id), we remove the old node and add the new one. The
+   * outgoing edges of the old node are preserved and become the outgoing edges of the new node.
+   * @param node to be added to the {@link FlowGraph}
+   * @return true if node is successfully added to the {@link FlowGraph}.
+   */
+  @Override
+  public synchronized boolean addDataNode(DataNode node) {
+    DataNode oldNode = this.dataNodeMap.put(node.getId(), node);
+    //Detach the adjacency set using the *old* node instance as the key. Looking it up with the new node would miss
+    //it whenever the two nodes share an id but are not equal() (e.g. a changed attribute), leaving a stale entry
+    //behind and dropping the edges. Removing first also ensures that the new instance becomes the map key.
+    Set<FlowEdge> edges = (oldNode != null) ? this.nodesToEdges.remove(oldNode) : null;
+    if (edges == null) {
+      edges = new HashSet<>();
+    }
+    this.nodesToEdges.put(node, edges);
+    return true;
+  }
+
+  /**
+   * Add a {@link FlowEdge} to the {@link FlowGraph}. Addition of edge succeeds only if both the end points of the
+   * edge are already nodes in the FlowGraph. If a {@link FlowEdge} already exists, the old FlowEdge is removed and
+   * the new one added in its place.
+   * @param edge to be added; its first end point is the source node id and its second the destination node id.
+   * @return true if addition of {@link FlowEdge} is successful.
+   */
+  @Override
+  public synchronized boolean addFlowEdge(FlowEdge edge) {
+    String srcNode = edge.getEndPoints().get(0);
+    String dstNode = edge.getEndPoints().get(1);
+    DataNode source = this.dataNodeMap.get(srcNode);
+    if (source == null || !this.dataNodeMap.containsKey(dstNode)) {
+      return false;
+    }
+    Set<FlowEdge> adjacentEdges = this.nodesToEdges.get(source);
+    //Remove before adding, so that an equal edge that is already present is replaced by the new instance.
+    adjacentEdges.remove(edge);
+    adjacentEdges.add(edge);
+    this.flowEdgeMap.put(edge.getId(), edge);
+    return true;
+  }
+
+  /**
+   * Delete a {@link DataNode} by its identifier
+   * @param nodeId identifier of the {@link DataNode} to be deleted.
+   * @return true if {@link DataNode} is successfully deleted, false if the graph has no node with this id.
+   */
+  @Override
+  public synchronized boolean deleteDataNode(String nodeId) {
+    DataNode node = this.dataNodeMap.get(nodeId);
+    return node != null && deleteDataNode(node);
+  }
+
+  /**
+   * Delete a {@link DataNode} from the {@link FlowGraph}. The node is matched by its identifier. Its outgoing
+   * edges are removed together with it.
+   * NOTE: edges of other nodes that have the deleted node as their destination are not removed by this method.
+   * @param node to be deleted.
+   * @return true if {@link DataNode} is successfully deleted, false if the graph has no node with the same id.
+   */
+  public synchronized boolean deleteDataNode(DataNode node) {
+    //Work with the instance stored in the graph rather than the argument: the argument may share the id without
+    //being equal() to the stored node, in which case it is not a key of nodesToEdges.
+    DataNode storedNode = this.dataNodeMap.remove(node.getId());
+    if (storedNode == null) {
+      return false;
+    }
+    //Delete all the outgoing edges of the node, first from nodesToEdges and then from flowEdgeMap.
+    Set<FlowEdge> outgoingEdges = this.nodesToEdges.remove(storedNode);
+    if (outgoingEdges != null) {
+      for (FlowEdge edge : outgoingEdges) {
+        this.flowEdgeMap.remove(edge.getId());
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Delete a {@link FlowEdge} by its identifier
+   * @param edgeId identifier of the {@link FlowEdge} to be deleted.
+   * @return true if {@link FlowEdge} is successfully deleted, false if the graph has no edge with this id.
+   */
+  @Override
+  public synchronized boolean deleteFlowEdge(String edgeId) {
+    FlowEdge edge = this.flowEdgeMap.get(edgeId);
+    return edge != null && deleteFlowEdge(edge);
+  }
+
+  /**
+   * Delete a {@link FlowEdge} from the {@link FlowGraph}.
+   * @param edge to be deleted.
+   * @return true if {@link FlowEdge} is successfully deleted. If the source of a {@link FlowEdge} does not exist or
+   * if the {@link FlowEdge} is not in the graph, return false.
+   */
+  public synchronized boolean deleteFlowEdge(FlowEdge edge) {
+    DataNode source = this.dataNodeMap.get(edge.getEndPoints().get(0));
+    if (source == null) {
+      return false;
+    }
+    Set<FlowEdge> adjacentEdges = this.nodesToEdges.get(source);
+    //Set.remove() reports whether the edge was present, so no separate contains() check is needed.
+    if (adjacentEdges == null || !adjacentEdges.remove(edge)) {
+      return false;
+    }
+    this.flowEdgeMap.remove(edge.getId());
+    return true;
+  }
+
+  /**
+   * Get the set of outgoing edges of a {@link DataNode}
+   * @param nodeId identifier of the node
+   * @return Set of outgoing {@link FlowEdge}s of the node, or null if the graph has no node with this id.
+   */
+  @Override
+  public synchronized Set<FlowEdge> getEdges(String nodeId) {
+    return getEdges(this.dataNodeMap.get(nodeId));
+  }
+
+  /**
+   * Get the set of outgoing edges of a {@link DataNode}. The returned set is the one maintained by the graph, not
+   * a copy.
+   * @param node {@link DataNode}
+   * @return Set of outgoing {@link FlowEdge}s of the node, or null if node is null or is not part of the graph.
+   */
+  @Override
+  public synchronized Set<FlowEdge> getEdges(DataNode node) {
+    return (node != null) ? this.nodesToEdges.get(node) : null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
new file mode 100644
index 0000000..8ae0027
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+
+import org.apache.gobblin.annotation.Alpha;
+
+import lombok.Getter;
+
+
+/**
+ * An implementation of Dag. Assumes that nodes have unique values. Nodes with duplicate values will produce
+ * unpredictable behavior, because {@link DagNode} equality and hashing are based solely on the node value.
+ * The edges of the dag are derived from the parent lists of the {@link DagNode}s it is constructed from.
+ */
+@Alpha
+@Getter
+public class Dag<T> {
+  // Nodes that have no parent.
+  private List<DagNode<T>> startNodes;
+  // Nodes that are not a parent of any other node.
+  private List<DagNode<T>> endNodes;
+  // Map to maintain parent to children mapping.
+  private Map<DagNode, List<DagNode<T>>> parentChildMap;
+  // All the nodes of the dag.
+  private List<DagNode<T>> nodes;
+
+  /**
+   * @param dagNodes all the nodes of the dag, with their parent nodes already set.
+   */
+  public Dag(List<DagNode<T>> dagNodes) {
+    this.nodes = dagNodes;
+    //Build dag
+    this.build();
+  }
+
+  /**
+   * Constructs the dag from the Node list.
+   */
+  private void build() {
+    this.startNodes = new ArrayList<>();
+    this.endNodes = new ArrayList<>();
+    this.parentChildMap = new HashMap<>();
+    for (DagNode<T> node : this.nodes) {
+      List<DagNode<T>> parentNodeList = node.getParentNodes();
+      //If a Node has no parent Node, add it to the list of start Nodes
+      if (parentNodeList == null) {
+        this.startNodes.add(node);
+        continue;
+      }
+      for (DagNode<T> parentNode : parentNodeList) {
+        this.parentChildMap.computeIfAbsent(parentNode, key -> new ArrayList<>()).add(node);
+      }
+    }
+    //Iterate over all the Nodes and add a Node to the list of endNodes if it is not present in the parentChildMap
+    for (DagNode<T> node : this.nodes) {
+      if (!this.parentChildMap.containsKey(node)) {
+        this.endNodes.add(node);
+      }
+    }
+  }
+
+  /**
+   * @param node a node of the dag
+   * @return the children of the node, or null if the node has no children in this dag.
+   */
+  public List<DagNode<T>> getChildren(DagNode node) {
+    return parentChildMap.get(node);
+  }
+
+  /**
+   * @param node a node of the dag
+   * @return the parents of the node, or null if the node has no parents.
+   */
+  public List<DagNode<T>> getParents(DagNode node) {
+    return node.parentNodes;
+  }
+
+  /**
+   * Concatenate two dags together. Join the "other" dag to "this" dag and return "this" dag.
+   * The concatenate method ensures that all the jobs of "this" dag (which may have multiple end nodes)
+   * are completed before starting any job of the "other" dag. This is done by adding each endNode of this dag as
+   * a parent of every startNode of the other dag. After the call, this dag also contains the nodes and the
+   * parent-child relationships of the other dag, and its end nodes are the end nodes of the other dag.
+   * Concatenating a null or an empty dag leaves this dag unchanged.
+   *
+   * @param other dag to concatenate to this dag
+   * @return the concatenated dag
+   */
+  public Dag<T> concatenate(Dag<T> other) throws IOException {
+    //An empty "other" dag has no start/end nodes; joining it must not wipe out the end nodes of this dag.
+    if (other == null || other.nodes.isEmpty()) {
+      return this;
+    }
+    if (this.nodes.isEmpty()) {
+      //This dag has no end nodes to attach to, so the start nodes of the result are those of the other dag.
+      this.startNodes = new ArrayList<>(other.startNodes);
+    }
+    for (DagNode<T> endNode : this.endNodes) {
+      List<DagNode<T>> children = new ArrayList<>();
+      for (DagNode<T> otherStartNode : other.startNodes) {
+        children.add(otherStartNode);
+        otherStartNode.addParentNode(endNode);
+      }
+      this.parentChildMap.put(endNode, children);
+    }
+    //Replace the end nodes only after every old end node has been linked.
+    this.endNodes = new ArrayList<>(other.endNodes);
+    //Carry over the internal edges and the nodes of the other dag, so that getChildren() and getNodes() reflect
+    //the whole concatenated dag. The node list is copied because the list passed to the constructor may be immutable.
+    this.parentChildMap.putAll(other.parentChildMap);
+    List<DagNode<T>> allNodes = new ArrayList<>(this.nodes);
+    allNodes.addAll(other.nodes);
+    this.nodes = allNodes;
+    return this;
+  }
+
+  /**
+   * A node of a {@link Dag}, wrapping a value and the list of its parent nodes. Two nodes are equal if their
+   * values are equal.
+   */
+  @Getter
+  public static class DagNode<T> {
+    private T value;
+    //List of parent Nodes that are dependencies of this Node. Remains null until the first parent is added.
+    private List<DagNode<T>> parentNodes;
+
+    //Constructor
+    public DagNode(T value) {
+      this.value = value;
+    }
+
+    /**
+     * Register a node as a parent (i.e. a dependency) of this node.
+     * @param node the parent node
+     */
+    public void addParentNode(DagNode<T> node) {
+      if (parentNodes == null) {
+        parentNodes = Lists.newArrayList(node);
+        return;
+      }
+      parentNodes.add(node);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      DagNode that = (DagNode) o;
+      return this.getValue().equals(that.getValue());
+    }
+
+    @Override
+    public int hashCode() {
+      return this.getValue().hashCode();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DataNode.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DataNode.java
new file mode 100644
index 0000000..4931685
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DataNode.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
+
+
+/**
+ * Representation of a node in the FlowGraph. Each node is identified by a unique identifier.
+ */
+@Alpha
+public interface DataNode {
+  /**
+   * @return the identifier of a {@link DataNode}.
+   */
+  String getId();
+
+  /**
+   * @return the attributes of a {@link DataNode}. It also includes properties for resolving a
+   * {@link org.apache.gobblin.runtime.api.JobTemplate}, e.g. "source.fs.uri" for an HDFS node,
+   * "jdbc.publisher.url" for a JDBC node.
+   */
+  Config getProps();
+
+  /**
+   * @return true if the {@link DataNode} is active
+   */
+  boolean isActive();
+
+  /**
+   * Checked exception signalling that a {@link DataNode} could not be created. It wraps the underlying exception
+   * as its cause and embeds the message of that exception in its own message.
+   */
+  class DataNodeCreationException extends Exception {
+    private static final String MESSAGE_FORMAT = "Failed to create DataNode because of: %s";
+
+    /**
+     * @param e the exception that caused the creation of the {@link DataNode} to fail.
+     */
+    public DataNodeCreationException(Exception e) {
+      super(String.format(MESSAGE_FORMAT, e.getMessage()), e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
new file mode 100644
index 0000000..e98337d
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+/**
+ * Config keys related to {@link org.apache.gobblin.service.modules.dataset.DatasetDescriptor}.
+ * This class only holds string constants.
+ */
+public class DatasetDescriptorConfigKeys {
+  //Gobblin Service Dataset Descriptor related keys
+  // NOTE(review): the meaning of each key below is inferred from its name only - confirm against DatasetDescriptor.
+  // Presumably the path of the dataset.
+  public static final String PATH_KEY = "path";
+  // Presumably the storage format of the dataset.
+  public static final String FORMAT_KEY = "format";
+  // Presumably a free-form description of the dataset.
+  public static final String DESCRIPTION_KEY = "description";
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FileSystemDataNode.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FileSystemDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FileSystemDataNode.java
new file mode 100644
index 0000000..5899645
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FileSystemDataNode.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+import java.io.IOException;
+import java.net.URI;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.util.ConfigUtils;
+
+import joptsimple.internal.Strings;
+import lombok.Getter;
+
+
+/**
+ * An abstract {@link FileSystemDataNode} implementation. In addition to the required properties of a {@link BaseDataNode}, an {@link FileSystemDataNode}
+ * must have a FS URI specified. Example implementations of {@link FileSystemDataNode} include {@link HdfsDataNode}, {@link LocalFSDataNode}.
+ */
+@Alpha
+public abstract class FileSystemDataNode extends BaseDataNode {
+  public static final String FS_URI_KEY = "fs.uri";
+  @Getter
+  private String fsUri;
+
+  /**
+   * Constructor. An HDFS DataNode must have fs.uri property specified in addition to a node Id.
+   */
+  public FileSystemDataNode(Config nodeProps) throws DataNodeCreationException {
+    super(nodeProps);
+    try {
+      this.fsUri = ConfigUtils.getString(nodeProps, FS_URI_KEY, "");
+      Preconditions.checkArgument(!Strings.isNullOrEmpty(this.fsUri), "FS URI cannot be null or empty for an HDFSDataNode");
+      URI uri = new URI(this.fsUri);
+      if(!isUriValid(uri)) {
+        throw new IOException("Invalid FS URI " + this.fsUri);
+      }
+    } catch(Exception e) {
+      throw new DataNodeCreationException(e);
+    }
+  }
+
+  public abstract boolean isUriValid(URI fsUri);
+  /**
+   * Two HDFS DataNodes are the same if they have the same id and the same fsUri.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    FileSystemDataNode that = (FileSystemDataNode) o;
+
+    return this.getId().equals(that.getId()) && fsUri.equals(that.getFsUri());
+  }
+
+  @Override
+  public int hashCode() {
+    return Joiner.on("-").join(this.getId(), this.fsUri).hashCode();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdge.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdge.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdge.java
new file mode 100644
index 0000000..fb60d67
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdge.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+import java.util.List;
+
+import com.typesafe.config.Config;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.service.modules.template.FlowTemplate;
+
+
+/**
+ * Representation of an edge in a FlowGraph. Each {@link FlowEdge} encapsulates:
+ * <ul>
+ *   <li>the ids of the two {@link DataNode}s that are its end points,</li>
+ *   <li>a {@link FlowTemplate} that is responsible for data movement between the {@link DataNode}s, and</li>
+ *   <li>a list of {@link SpecExecutor}s where the {@link FlowTemplate} can be executed.</li>
+ * </ul>
+ */
+@Alpha
+public interface FlowEdge {
+  /**
+   *
+   * @return the {@link DataNode} ids that are the end points of the edge.
+   */
+  List<String> getEndPoints();
+
+  /**
+   *
+   * @return the {@link FlowTemplate} that performs the data movement along the edge.
+   */
+  FlowTemplate getFlowTemplate();
+
+  /**
+   *
+   * @return a list of {@link SpecExecutor}s that can execute the {@link FlowTemplate} corresponding to this edge.
+   */
+  List<SpecExecutor> getExecutors();
+
+  /**
+   * Get the properties that define the {@link FlowEdge}. Encapsulates all the properties from which the {@link FlowEdge}
+   * is instantiated. It also includes properties needed for resolving a {@link org.apache.gobblin.runtime.api.JobTemplate}.
+   * @return the properties of this edge as a {@link Config} object.
+   */
+  Config getProps();
+
+  /**
+   * A string uniquely identifying the edge.
+   * @return the identifier of the {@link FlowEdge}.
+   */
+  String getId();
+
+  /**
+   *
+   * @return true if the {@link FlowEdge} is active.
+   */
+  boolean isActive();
+
+  /**
+   *
+   * @param user the user whose access to the edge is checked
+   * @return true if the user has ACL permissions to access the {@link FlowEdge}.
+   */
+  boolean isAccessible(UserGroupInformation user);
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java
new file mode 100644
index 0000000..851e887
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
+
+import com.typesafe.config.Config;
+
+
+/**
+ * A factory that creates {@link FlowEdge}s, and derives {@link FlowEdge} identifiers, from edge properties.
+ */
+public interface FlowEdgeFactory {
+  /**
+   * Construct a {@link FlowEdge} from the edge properties
+   * @param edgeProps properties of the {@link FlowEdge}
+   * @param catalog an instance of {@link FSFlowCatalog} that returns {@link org.apache.gobblin.service.modules.template.FlowTemplate}s
+   *               useful for creating a {@link FlowEdge}.
+   * @return an instance of {@link FlowEdge}
+   * @throws FlowEdgeCreationException if the {@link FlowEdge} cannot be created
+   */
+  public FlowEdge createFlowEdge(Config edgeProps, FSFlowCatalog catalog) throws FlowEdgeCreationException;
+
+  /**
+   * Get an edge identifier from the edge properties
+   * @param edgeProps properties of the edge
+   * @return a string identifying the edge
+   */
+  public String getEdgeId(Config edgeProps) throws IOException;
+
+  /**
+   * Checked exception signalling that a {@link FlowEdge} could not be created. It wraps the underlying exception
+   * as its cause and embeds the message of that exception in its own message.
+   */
+  public class FlowEdgeCreationException extends Exception {
+    private static final String MESSAGE_FORMAT = "Failed to create FlowEdge because of: %s";
+
+    /**
+     * @param e the exception that caused the creation of the {@link FlowEdge} to fail.
+     */
+    public FlowEdgeCreationException(Exception e) {
+      super(String.format(MESSAGE_FORMAT, e.getMessage()), e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java
new file mode 100644
index 0000000..23f5793
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+import java.util.Collection;
+
+import org.apache.gobblin.annotation.Alpha;
+
+
+/**
+ * An interface for {@link FlowGraph}. A {@link FlowGraph} consists of {@link DataNode}s and {@link FlowEdge}s.
+ * The interface provides methods for adding and removing {@link DataNode}s and {@link FlowEdge}s to and from the
+ * {@link FlowGraph}, and for retrieving the {@link FlowEdge}s adjacent to a {@link DataNode}.
+ */
+
+@Alpha
+public interface FlowGraph {
+
+  /**
+   * Add a {@link DataNode} to the {@link FlowGraph}
+   * @param node {@link DataNode} to be added
+   * @return true if {@link DataNode} is added to the {@link FlowGraph} successfully.
+   */
+  public boolean addDataNode(DataNode node);
+
+  /**
+   * Add a {@link FlowEdge} to the {@link FlowGraph}
+   * @param edge {@link FlowEdge} to be added
+   * @return true if {@link FlowEdge} is added to the {@link FlowGraph} successfully.
+   */
+  public boolean addFlowEdge(FlowEdge edge);
+
+  /**
+   * Remove a {@link DataNode} and all its incident edges from the {@link FlowGraph}
+   * @param nodeId identifier of the {@link DataNode} to be removed
+   * @return true if {@link DataNode} is removed from the {@link FlowGraph} successfully.
+   */
+  public boolean deleteDataNode(String nodeId);
+
+  /**
+   * Remove a {@link FlowEdge} from the {@link FlowGraph}
+   * @param edgeId identifier of the edge to be removed
+   * @return true if edge is removed from the {@link FlowGraph} successfully.
+   */
+  public boolean deleteFlowEdge(String edgeId);
+
+  /**
+   * Get a collection of edges adjacent to a {@link DataNode}. Useful for path finding algorithms and graph
+   * traversal algorithms such as Dijkstra's shortest-path algorithm and BFS.
+   * @param nodeId identifier of the {@link DataNode}
+   * @return a collection of edges adjacent to the {@link DataNode}
+   */
+  public Collection<FlowEdge> getEdges(String nodeId);
+
+  /**
+   * Get a collection of edges adjacent to a {@link DataNode}.
+   * @param node {@link DataNode}
+   * @return a collection of edges adjacent to the {@link DataNode}
+   */
+  public Collection<FlowEdge> getEdges(DataNode node);
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java
new file mode 100644
index 0000000..0d94e3f
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+/**
+ * Configuration keys used to describe the {@link DataNode}s and the {@link FlowEdge}s of a {@link FlowGraph}.
+ * This class only holds string constants.
+ */
+public class FlowGraphConfigurationKeys {
+  // Prefixes shared by the node-level and the edge-level keys below.
+  public static final String DATA_NODE_PREFIX = "data.node.";
+  public static final String FLOW_EDGE_PREFIX = "flow.edge.";
+
+  /**
+   *   {@link DataNode} configuration keys.
+   */
+  // Resolves to "data.node.id".
+  public static final String DATA_NODE_ID_KEY = DATA_NODE_PREFIX + "id";
+  // Resolves to "data.node.isActive".
+  public static final String DATA_NODE_IS_ACTIVE_KEY = DATA_NODE_PREFIX + "isActive";
+
+  /**
+   * {@link FlowEdge} configuration keys.
+   */
+  // Each key below resolves to "flow.edge." followed by the literal shown, except the last one.
+  public static final String FLOW_EDGE_SOURCE_KEY = FLOW_EDGE_PREFIX + "source";
+  public static final String FLOW_EDGE_DESTINATION_KEY = FLOW_EDGE_PREFIX + "destination";
+  public static final String FLOW_EDGE_NAME_KEY = FLOW_EDGE_PREFIX + "name";
+  public static final String FLOW_EDGE_IS_ACTIVE_KEY = FLOW_EDGE_PREFIX + "isActive";
+  public static final String FLOW_EDGE_TEMPLATE_URI_KEY = FLOW_EDGE_PREFIX + "flowTemplateUri";
+  public static final String FLOW_EDGE_SPEC_EXECUTORS_KEY = FLOW_EDGE_PREFIX +"specExecutors";
+  // Unlike the keys above, this key does not carry the "flow.edge." prefix.
+  // NOTE(review): presumably it is read relative to each spec executor entry - confirm against the edge factory.
+  public static final String FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY = "specExecutorClass";
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/HdfsDataNode.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/HdfsDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/HdfsDataNode.java
new file mode 100644
index 0000000..7bcc18d
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/HdfsDataNode.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+import java.net.URI;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.annotation.Alpha;
+
+import joptsimple.internal.Strings;
+
+
+/**
+ * A {@link FileSystemDataNode} for HDFS. The HDFS-specific validation of the file system URI is implemented in
+ * {@link #isUriValid(URI)}.
+ */
+@Alpha
+public class HdfsDataNode extends FileSystemDataNode {
+  public static final String HDFS_SCHEME = "hdfs";
+
+  /**
+   * Creates the node by delegating to the {@link FileSystemDataNode} constructor.
+   *
+   * @param nodeProps configuration of this node
+   * @throws DataNodeCreationException propagated from the superclass constructor
+   */
+  public HdfsDataNode(Config nodeProps) throws DataNodeCreationException {
+    super(nodeProps);
+  }
+
+  /**
+   * Checks whether the given URI can identify an HDFS file system.
+   *
+   * @param fsUri FileSystem URI
+   * @return true if the scheme is "hdfs" and authority is not empty; false otherwise, including when the URI
+   * has no scheme at all.
+   */
+  @Override
+  public boolean isUriValid(URI fsUri) {
+    // URI#getScheme() returns null for scheme-less URIs (e.g. "/data"); comparing constant-first treats such
+    // URIs as invalid instead of failing with a NullPointerException.
+    if (!HDFS_SCHEME.equals(fsUri.getScheme())) {
+      return false;
+    }
+    //Ensure that the authority is not empty
+    return !Strings.isNullOrEmpty(fsUri.getAuthority());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/LocalFSDataNode.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/LocalFSDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/LocalFSDataNode.java
new file mode 100644
index 0000000..6dc1aa3
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/LocalFSDataNode.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+import java.net.URI;
+
+import org.apache.gobblin.annotation.Alpha;
+
+import com.typesafe.config.Config;
+
+/**
+ * A {@link FileSystemDataNode} for the local file system. The local-FS-specific validation of the file system
+ * URI is implemented in {@link #isUriValid(URI)}.
+ */
+@Alpha
+public class LocalFSDataNode extends FileSystemDataNode {
+  public static final String LOCAL_FS_SCHEME = "file";
+
+  /**
+   * Creates the node by delegating to the {@link FileSystemDataNode} constructor.
+   *
+   * @param nodeProps configuration of this node
+   * @throws DataNodeCreationException propagated from the superclass constructor
+   */
+  public LocalFSDataNode(Config nodeProps) throws DataNodeCreationException {
+    super(nodeProps);
+  }
+
+  /**
+   * Checks whether the given URI can identify a local file system.
+   *
+   * @param fsUri FileSystem URI
+   * @return true if the scheme of fsUri equals "file"; false otherwise, including when the URI has no scheme
+   * at all.
+   */
+  @Override
+  public boolean isUriValid(URI fsUri) {
+    // URI#getScheme() returns null for scheme-less URIs; comparing constant-first treats such URIs as invalid
+    // instead of failing with a NullPointerException.
+    return LOCAL_FS_SCHEME.equals(fsUri.getScheme());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java
new file mode 100644
index 0000000..30d8309
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.template;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import com.typesafe.config.Config;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+
+/**
+ * An interface primarily for representing a flow of {@link JobTemplate}s. Besides the templates themselves it
+ * exposes the {@link Dag} connecting them, the raw template configuration and the input/output
+ * {@link DatasetDescriptor}s of the flow.
+ */
+@Alpha
+public interface FlowTemplate extends Spec {
+
+  /**
+   * @return the {@link List} of {@link JobTemplate}s that belong to this {@link FlowTemplate}.
+   */
+  List<JobTemplate> getJobTemplates();
+
+  /**
+   *
+   * @return the {@link Dag} of {@link JobTemplate}s that backs the {@link FlowTemplate}.
+   * @throws IOException if the implementation fails to produce the {@link Dag}.
+   */
+  Dag<JobTemplate> getDag() throws IOException;
+
+  /**
+   *
+   * @return all configuration inside pre-written template.
+   */
+  Config getRawTemplateConfig();
+
+  /**
+   * @return list of input/output {@link DatasetDescriptor} pairs for the {@link FlowTemplate}; in each
+   * {@link Pair} the first element is the input descriptor and the second the output descriptor.
+   */
+  List<Pair<DatasetDescriptor, DatasetDescriptor>> getInputOutputDatasetDescriptors();
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/HOCONInputStreamFlowTemplate.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/HOCONInputStreamFlowTemplate.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/HOCONInputStreamFlowTemplate.java
new file mode 100644
index 0000000..3847fd2
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/HOCONInputStreamFlowTemplate.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.template;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URI;
+
+import com.google.common.base.Charsets;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigResolveOptions;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.service.modules.template_catalog.FlowCatalogWithTemplates;
+
+/**
+ * A {@link FlowTemplate} that loads a HOCON file as a {@link StaticFlowTemplate}.
+ */
+@Alpha
+public class HOCONInputStreamFlowTemplate extends StaticFlowTemplate {
+  public static final String VERSION_KEY = "gobblin.flow.template.version";
+  public static final String DEFAULT_VERSION = "1";
+
+  public HOCONInputStreamFlowTemplate(InputStream inputStream, URI uri, FlowCatalogWithTemplates catalog)
+      throws SpecNotFoundException, IOException, ReflectiveOperationException, JobTemplate.TemplateException {
+    this(ConfigFactory.parseReader(new InputStreamReader(inputStream, Charsets.UTF_8)).resolve(
+        ConfigResolveOptions.defaults().setAllowUnresolved(true)), uri, catalog);
+  }
+
+  public HOCONInputStreamFlowTemplate(Config config, URI uri, FlowCatalogWithTemplates catalog)
+      throws SpecNotFoundException, IOException, ReflectiveOperationException, JobTemplate.TemplateException {
+    super(uri, config.hasPath(VERSION_KEY) ? config.getString(VERSION_KEY) : DEFAULT_VERSION,
+        config.hasPath(ConfigurationKeys.FLOW_DESCRIPTION_KEY) ? config
+            .getString(ConfigurationKeys.FLOW_DESCRIPTION_KEY) : "", config, catalog);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactory.java
new file mode 100644
index 0000000..b89da4d
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactory.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.template;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * A Factory class used for constructing a {@link Dag} of {@link org.apache.gobblin.runtime.api.JobTemplate}s from
+ * the {@link JobTemplate}s of a {@link FlowTemplate}.
+ */
+@Alpha
+@Slf4j
+public class JobTemplateDagFactory {
+  public static final String JOB_TEMPLATE_FILE_SUFFIX = ".conf";
+
+  /**
+   * Builds a {@link Dag} with one {@link Dag.DagNode} per {@link JobTemplate}. Each name returned by
+   * {@link JobTemplate#getDependencies()} is resolved to a sibling template URI by placing it in the parent
+   * directory of the first template's URI and appending {@link #JOB_TEMPLATE_FILE_SUFFIX}; the node of that
+   * sibling is then registered as a parent node.
+   *
+   * @param jobTemplates the job templates of a flow; an empty list yields a {@link Dag} built from an empty
+   * node list.
+   * @return the {@link Dag} connecting the given job templates.
+   * @throws IllegalArgumentException if a dependency does not resolve to the URI of any of the given templates.
+   */
+  public static Dag<JobTemplate> createDagFromJobTemplates(List<JobTemplate> jobTemplates) {
+    Map<URI, Dag.DagNode<JobTemplate>> uriJobTemplateMap = new HashMap<>();
+    List<Dag.DagNode<JobTemplate>> dagNodeList = new ArrayList<>();
+
+    // First pass: create a DagNode for every JobTemplate in the flow and index it by the template URI.
+    for (JobTemplate template : jobTemplates) {
+      Dag.DagNode<JobTemplate> dagNode = new Dag.DagNode<>(template);
+      dagNodeList.add(dagNode);
+      uriJobTemplateMap.put(template.getUri(), dagNode);
+    }
+
+    // Without templates there is no directory to resolve dependencies against; jobTemplates.get(0) below would
+    // fail with an IndexOutOfBoundsException.
+    if (jobTemplates.isEmpty()) {
+      return new Dag<>(dagNodeList);
+    }
+
+    // Second pass: for each JobTemplate, look up the DagNodes of its dependencies and set them as its parent
+    // nodes. A separate pass is used because a dependency may appear later in the list than its dependent.
+    Path templateDirPath = new Path(jobTemplates.get(0).getUri()).getParent();
+    for (JobTemplate template : jobTemplates) {
+      URI templateUri = template.getUri();
+      Dag.DagNode<JobTemplate> node = uriJobTemplateMap.get(templateUri);
+      Collection<String> dependencies = template.getDependencies();
+      for (String dependency : dependencies) {
+        URI dependencyUri = new Path(templateDirPath, dependency).suffix(JOB_TEMPLATE_FILE_SUFFIX).toUri();
+        Dag.DagNode<JobTemplate> parentNode = uriJobTemplateMap.get(dependencyUri);
+        if (parentNode == null) {
+          // Fail loudly instead of registering a null parent node.
+          throw new IllegalArgumentException(String.format(
+              "Dependency %s of job template %s does not match any job template of the flow (resolved to %s)",
+              dependency, templateUri, dependencyUri));
+        }
+        node.addParentNode(parentNode);
+      }
+    }
+    return new Dag<>(dagNodeList);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
new file mode 100644
index 0000000..8347022
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.template;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
+import org.apache.gobblin.service.modules.template_catalog.FlowCatalogWithTemplates;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+import org.apache.hadoop.fs.Path;
+
+import lombok.Getter;
+
+
+/**
+ * A {@link FlowTemplate} using a static {@link Config} as the raw configuration for the template. The
+ * {@link Dag} of {@link JobTemplate}s is built on the first call to {@link #getDag()} and reused afterwards.
+ */
+@Alpha
+public class StaticFlowTemplate implements FlowTemplate {
+  public static final String INPUT_DATASET_DESCRIPTOR_PREFIX = "gobblin.flow.dataset.descriptor.input";
+  public static final String OUTPUT_DATASET_DESCRIPTOR_PREFIX = "gobblin.flow.dataset.descriptor.output";
+  public static final String DATASET_DESCRIPTOR_CLASS_KEY = "class";
+
+  @Getter
+  private URI uri;
+  @Getter
+  private String version;
+  @Getter
+  private String description;
+  @Getter
+  private FlowCatalogWithTemplates catalog;
+  @Getter
+  private List<Pair<DatasetDescriptor, DatasetDescriptor>> inputOutputDatasetDescriptors;
+  @Getter
+  private List<JobTemplate> jobTemplates;
+
+  // Built on demand by ensureTemplateMaterialized().
+  private Dag<JobTemplate> dag;
+
+  private Config rawConfig;
+  private boolean isTemplateMaterialized;
+
+  /**
+   * Creates a template whose dataset descriptors are read from the config and whose {@link JobTemplate}s are
+   * fetched from the catalog for the parent directory of the template URI.
+   *
+   * @param uri URI of the flow template
+   * @param version version of the flow template
+   * @param description description of the flow template
+   * @param config raw configuration of the flow template
+   * @param catalog catalog used to look up the {@link JobTemplate}s of the flow
+   * @throws IOException if the config lacks the input or output dataset descriptors
+   */
+  public StaticFlowTemplate(URI uri, String version, String description, Config config,
+      FlowCatalogWithTemplates catalog)
+      throws IOException, ReflectiveOperationException, SpecNotFoundException, JobTemplate.TemplateException {
+    this.uri = uri;
+    this.version = version;
+    this.description = description;
+    this.inputOutputDatasetDescriptors = buildInputOutputDescriptors(config);
+    this.rawConfig = config;
+    this.catalog = catalog;
+    URI flowTemplateDir = new Path(this.uri).getParent().toUri();
+    this.jobTemplates = this.catalog.getJobTemplatesForFlow(flowTemplateDir);
+  }
+
+  //Constructor for testing purposes: the dataset descriptors and job templates are supplied by the caller
+  //instead of being derived from the config and the catalog.
+  public StaticFlowTemplate(URI uri, String version, String description, Config config,
+      FlowCatalogWithTemplates catalog, List<Pair<DatasetDescriptor, DatasetDescriptor>> inputOutputDatasetDescriptors, List<JobTemplate> jobTemplates)
+      throws IOException, ReflectiveOperationException, SpecNotFoundException, JobTemplate.TemplateException {
+    this.uri = uri;
+    this.version = version;
+    this.description = description;
+    this.inputOutputDatasetDescriptors = inputOutputDatasetDescriptors;
+    this.rawConfig = config;
+    this.catalog = catalog;
+    this.jobTemplates = jobTemplates;
+  }
+
+  /**
+   * Generate the input/output dataset descriptors for the {@link FlowTemplate}. Descriptors are read from the
+   * consecutive indices 0, 1, 2, ... under {@link #INPUT_DATASET_DESCRIPTOR_PREFIX} and
+   * {@link #OUTPUT_DATASET_DESCRIPTOR_PREFIX}; reading stops at the first index without an input descriptor.
+   *
+   * @throws IOException if either prefix is absent from the config, or if an index has an input descriptor but
+   * no output descriptor.
+   */
+  private List<Pair<DatasetDescriptor, DatasetDescriptor>> buildInputOutputDescriptors(Config config)
+      throws IOException, ReflectiveOperationException {
+    if (!config.hasPath(INPUT_DATASET_DESCRIPTOR_PREFIX) || !config.hasPath(OUTPUT_DATASET_DESCRIPTOR_PREFIX)) {
+      throw new IOException("Flow template must specify at least one input/output dataset descriptor");
+    }
+    List<Pair<DatasetDescriptor, DatasetDescriptor>> result = Lists.newArrayList();
+    for (int i = 0; ; i++) {
+      String index = Integer.toString(i);
+      String inputPrefix = Joiner.on(".").join(INPUT_DATASET_DESCRIPTOR_PREFIX, index);
+      if (!config.hasPath(inputPrefix)) {
+        break;
+      }
+      String outputPrefix = Joiner.on(".").join(OUTPUT_DATASET_DESCRIPTOR_PREFIX, index);
+      // Report an unpaired input descriptor through the declared IOException rather than letting
+      // Config#getConfig fail with an unchecked exception.
+      if (!config.hasPath(outputPrefix)) {
+        throw new IOException("Flow template specifies input dataset descriptor " + inputPrefix
+            + " without a matching output dataset descriptor " + outputPrefix);
+      }
+      DatasetDescriptor inputDescriptor = getDatasetDescriptor(config.getConfig(inputPrefix));
+      DatasetDescriptor outputDescriptor = getDatasetDescriptor(config.getConfig(outputPrefix));
+      result.add(ImmutablePair.of(inputDescriptor, outputDescriptor));
+    }
+    return result;
+  }
+
+  /**
+   * Instantiates the {@link DatasetDescriptor} whose class name is stored under
+   * {@link #DATASET_DESCRIPTOR_CLASS_KEY} in the given config, passing the config as constructor argument.
+   */
+  private DatasetDescriptor getDatasetDescriptor(Config descriptorConfig)
+      throws ReflectiveOperationException {
+    Class<?> datasetDescriptorClass = Class.forName(descriptorConfig.getString(DATASET_DESCRIPTOR_CLASS_KEY));
+    return (DatasetDescriptor) GobblinConstructorUtils
+        .invokeLongestConstructor(datasetDescriptorClass, descriptorConfig);
+  }
+
+  @Override
+  public Config getRawTemplateConfig() {
+    return this.rawConfig;
+  }
+
+  /**
+   * Builds the {@link Dag} from the job templates unless that has already been done.
+   *
+   * @throws IOException wrapping any exception raised while building the {@link Dag}; in that case the
+   * template stays unmaterialized and the next call tries again.
+   */
+  private void ensureTemplateMaterialized()
+      throws IOException {
+    if (this.isTemplateMaterialized) {
+      return;
+    }
+    try {
+      this.dag = JobTemplateDagFactory.createDagFromJobTemplates(this.jobTemplates);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+    this.isTemplateMaterialized = true;
+  }
+
+  @Override
+  public List<JobTemplate> getJobTemplates() {
+    return this.jobTemplates;
+  }
+
+  /**
+   * @return the {@link Dag} of {@link JobTemplate}s, built on first use.
+   * @throws IOException if the {@link Dag} cannot be built.
+   */
+  @Override
+  public Dag<JobTemplate> getDag()
+      throws IOException {
+    ensureTemplateMaterialized();
+    return this.dag;
+  }
+}