You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/07/30 16:57:40 UTC

[4/4] incubator-gobblin git commit: [GOBBLIN-528] Multihop Flow Compiler for Gobblin-as-a-Service (GaaS).

[GOBBLIN-528] Multihop Flow Compiler for Gobblin-as-a-Service (GaaS).

Closes #2393 from sv2000/multiHopCompiler


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/22a951f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/22a951f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/22a951f0

Branch: refs/heads/master
Commit: 22a951f0a4ac0c963e99cd2a15989c62a08c81cf
Parents: 33d4fea
Author: suvasude <su...@linkedin.biz>
Authored: Mon Jul 30 09:57:31 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Mon Jul 30 09:57:31 2018 -0700

----------------------------------------------------------------------
 .../template/HOCONInputStreamJobTemplate.java   |   2 +-
 .../modules/core/GitMonitoringService.java      |   2 +-
 .../dataset/BaseHdfsDatasetDescriptor.java      |  98 -----
 .../modules/dataset/DatasetDescriptor.java      |  38 +-
 .../modules/dataset/EncryptionConfig.java       |  90 ++++
 .../modules/dataset/FSDatasetDescriptor.java    | 138 ++++++
 .../service/modules/dataset/FormatConfig.java   | 102 +++++
 .../modules/dataset/HdfsDatasetDescriptor.java  |  40 --
 .../service/modules/flow/FlowEdgeContext.java   |  46 ++
 .../service/modules/flow/FlowGraphPath.java     |  90 ++++
 .../modules/flow/FlowGraphPathFinder.java       | 320 ++++++++++++++
 .../modules/flow/MultiHopFlowCompiler.java      | 157 +++++++
 .../service/modules/flowgraph/BaseDataNode.java |   8 +-
 .../service/modules/flowgraph/BaseFlowEdge.java |  20 +-
 .../modules/flowgraph/BaseFlowGraph.java        |   8 +-
 .../gobblin/service/modules/flowgraph/Dag.java  |  10 +-
 .../service/modules/flowgraph/DataNode.java     |   3 +-
 .../flowgraph/DatasetDescriptorConfigKeys.java  |  18 +
 .../modules/flowgraph/FileSystemDataNode.java   |  83 ----
 .../service/modules/flowgraph/FlowEdge.java     |  12 +-
 .../service/modules/flowgraph/FlowGraph.java    |   7 +
 .../flowgraph/FlowGraphConfigurationKeys.java   |   8 +-
 .../service/modules/flowgraph/HdfsDataNode.java |  59 ---
 .../modules/flowgraph/LocalFSDataNode.java      |  51 ---
 .../flowgraph/datanodes/fs/AdlsDataNode.java    |  52 +++
 .../datanodes/fs/FileSystemDataNode.java        |  87 ++++
 .../flowgraph/datanodes/fs/HdfsDataNode.java    |  58 +++
 .../flowgraph/datanodes/fs/LocalFSDataNode.java |  51 +++
 .../service/modules/spec/JobExecutionPlan.java  | 117 ++++++
 .../spec/JobExecutionPlanDagFactory.java        | 114 +++++
 .../service/modules/template/FlowTemplate.java  |  39 +-
 .../template/HOCONInputStreamFlowTemplate.java  |  13 +-
 .../modules/template/JobTemplateDagFactory.java |  79 ----
 .../modules/template/StaticFlowTemplate.java    | 143 ++++---
 .../modules/template_catalog/FSFlowCatalog.java |  90 ++--
 .../FlowCatalogWithTemplates.java               |   9 +-
 .../modules/core/GitFlowGraphMonitorTest.java   | 314 ++++++++++++++
 .../modules/flow/FlowGraphPathFinderTest.java   | 417 +++++++++++++++++++
 .../flowgraph/BaseFlowEdgeFactoryTest.java      |  74 ++++
 .../modules/flowgraph/BaseFlowGraphTest.java    |  13 +-
 .../spec/JobExecutionPlanDagFactoryTest.java    | 116 ++++++
 .../template/JobTemplateDagFactoryTest.java     |  92 ----
 .../template_catalog/FSFlowCatalogTest.java     |  53 ++-
 .../src/test/resources/flow/flow.conf           |  24 ++
 .../datanodes/AdlsDataNode-1.properties         |   3 +
 .../datanodes/HdfsDataNode-1.properties         |   3 +
 .../datanodes/HdfsDataNode-2.properties         |   3 +
 .../datanodes/HdfsDataNode-3.properties         |   3 +
 .../datanodes/HdfsDataNode-4.properties         |   3 +
 .../datanodes/LocalFsDataNode-1.properties      |   3 +
 .../hdfs-1-to-hdfs-1-encrypt.properties         |   9 +
 .../flowedges/hdfs-1-to-hdfs-3.properties       |  10 +
 .../hdfs-2-to-hdfs-2-encrypt.properties         |   9 +
 .../flowedges/hdfs-2-to-hdfs-4.properties       |   9 +
 .../flowedges/hdfs-3-to-adls-1.properties       |  13 +
 .../flowedges/hdfs-4-to-adls-1.properties       |  13 +
 .../flowedges/local-to-hdfs-1.properties        |   9 +
 .../flowedges/local-to-hdfs-2.properties        |   9 +
 .../modules/core/GitFlowGraphMonitorTest.java   | 314 --------------
 .../flowgraph/BaseFlowEdgeFactoryTest.java      |  73 ----
 .../template_catalog/flowEdgeTemplate/flow.conf |  20 +
 .../flowEdgeTemplate/jobs/job1.job              |   1 +
 .../flowEdgeTemplate/jobs/job2.job              |   3 +
 .../flowEdgeTemplate/jobs/job3.job              |   2 +
 .../flowEdgeTemplate/jobs/job4.job              |   2 +
 .../hdfsConvertToJsonAndEncrypt/flow.conf       |  18 +
 .../jobs/hdfs-encrypt-avro-to-json.job          |   1 +
 .../flowEdgeTemplates/hdfsToAdl/flow.conf       |  18 +
 .../hdfsToAdl/jobs/distcp-hdfs-to-adl.job       |   1 +
 .../flowEdgeTemplates/hdfsToHdfs/flow.conf      |  15 +
 .../hdfsToHdfs/jobs/distcp-hdfs-to-hdfs.job     |   1 +
 .../flowEdgeTemplates/localToHdfs/flow.conf     |   9 +
 .../localToHdfs/jobs/distcp-local-to-hdfs.job   |   1 +
 .../distcp-push-hdfs-to-adl.template            |  65 +++
 .../multihop/jobTemplates/distcp.template       |  57 +++
 .../hdfs-convert-to-json-and-encrypt.template   |  42 ++
 .../template_catalog/templates/job1.template    |   2 +
 .../template_catalog/templates/job2.template    |   2 +
 .../template_catalog/templates/job3.template    |   2 +
 .../template_catalog/templates/job4.template    |   2 +
 .../template_catalog/test-template/flow.conf    |  30 +-
 .../test-template/jobs/job1.conf                |   2 -
 .../test-template/jobs/job1.job                 |   1 +
 .../test-template/jobs/job2.conf                |   3 -
 .../test-template/jobs/job2.job                 |   3 +
 .../test-template/jobs/job3.conf                |   3 -
 .../test-template/jobs/job3.job                 |   2 +
 .../test-template/jobs/job4.conf                |   3 -
 .../test-template/jobs/job4.job                 |   2 +
 89 files changed, 3082 insertions(+), 1082 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/HOCONInputStreamJobTemplate.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/HOCONInputStreamJobTemplate.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/HOCONInputStreamJobTemplate.java
index a1337fd..5e132fe 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/HOCONInputStreamJobTemplate.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/HOCONInputStreamJobTemplate.java
@@ -51,7 +51,7 @@ public class HOCONInputStreamJobTemplate extends StaticJobTemplate {
     this(ConfigFactory.parseReader(new InputStreamReader(inputStream, Charsets.UTF_8)), uri, catalog);
   }
 
-  private HOCONInputStreamJobTemplate(Config config, URI uri, JobCatalogWithTemplates catalog)
+  public HOCONInputStreamJobTemplate(Config config, URI uri, JobCatalogWithTemplates catalog)
       throws SpecNotFoundException, TemplateException {
     super(uri, config.hasPath(VERSION_KEY) ? config.getString(VERSION_KEY) : DEFAULT_VERSION,
         config.hasPath(ConfigurationKeys.JOB_DESCRIPTION_KEY) ? config.getString(ConfigurationKeys.JOB_DESCRIPTION_KEY) : "",

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
index 2361edc..c4d3656 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
@@ -107,7 +107,7 @@ public abstract class GitMonitoringService extends AbstractIdleService {
         ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("FetchGitConfExecutor")));
   }
 
-  synchronized void setActive(boolean isActive) {
+  public synchronized void setActive(boolean isActive) {
     if (this.isActive == isActive) {
       // No-op if already in correct state
       return;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseHdfsDatasetDescriptor.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseHdfsDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseHdfsDatasetDescriptor.java
deleted file mode 100644
index 7d7e2b4..0000000
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseHdfsDatasetDescriptor.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.dataset;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
-
-import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
-import org.apache.gobblin.util.ConfigUtils;
-
-import lombok.Getter;
-
-
-/**
- * An implementation of {@link HdfsDatasetDescriptor}.
- */
-@Alpha
-public class BaseHdfsDatasetDescriptor implements HdfsDatasetDescriptor {
-  @Getter
-  private final String path;
-  @Getter
-  private final String format;
-  @Getter
-  private final String description;
-  @Getter
-  private final String platform;
-
-  public BaseHdfsDatasetDescriptor(Config config) {
-    Preconditions.checkArgument(config.hasPath(DatasetDescriptorConfigKeys.PATH_KEY), String.format("Missing required property %s", DatasetDescriptorConfigKeys.PATH_KEY));
-    Preconditions.checkArgument(config.hasPath(DatasetDescriptorConfigKeys.FORMAT_KEY), String.format("Missing required property %s", DatasetDescriptorConfigKeys.FORMAT_KEY));
-
-    this.path = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.PATH_KEY, null);
-    this.format = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.FORMAT_KEY, null);
-    this.description = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.DESCRIPTION_KEY, "");
-    this.platform = "hdfs";
-  }
-
-  /**
-   * A {@link HdfsDatasetDescriptor} is compatible with another {@link DatasetDescriptor} iff they have identical
-   * platform, type, path, and format.
-   * TODO: Currently isCompatibleWith() only checks if HDFS paths described by the two {@link DatasetDescriptor}s
-   * being compared are identical. Need to enhance this for the case of where paths can contain glob patterns.
-   * e.g. paths described by the pattern /data/input/* are a subset of paths described by /data/* and hence, the
-   * two descriptors should be compatible.
-   * @return true if this {@link HdfsDatasetDescriptor} is compatibe with another {@link DatasetDescriptor}.
-   */
-  @Override
-  public boolean isCompatibleWith(DatasetDescriptor o) {
-    return this.equals(o);
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    HdfsDatasetDescriptor other = (HdfsDatasetDescriptor) o;
-    if(this.getPlatform() == null || other.getPlatform() == null) {
-      return false;
-    }
-    if(!this.getPlatform().equalsIgnoreCase(other.getPlatform()) || !(o instanceof HdfsDatasetDescriptor)) {
-      return false;
-    }
-
-    return this.getPath().equals(other.getPath()) && this.getFormat().equalsIgnoreCase(other.getFormat());
-  }
-
-  @Override
-  public String toString() {
-     return "(" + Joiner.on(",").join(this.getPlatform(),this.getPath(),this.getFormat()) + ")";
-  }
-
-  @Override
-  public int hashCode() {
-    return this.toString().hashCode();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java
index 4a322e6..e8474e3 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java
@@ -17,28 +17,54 @@
 
 package org.apache.gobblin.service.modules.dataset;
 
+import com.typesafe.config.Config;
+
 import org.apache.gobblin.annotation.Alpha;
 
 
 /**
- * The interface for dataset descriptors.
+ * The interface for dataset descriptors. Each dataset is described in terms of the following attributes:
+ *  <ul>
+ *    <li> platform (e.g. HDFS, ADLS, JDBC). </li>
+ *    <li> path, which describes the fully qualified name of the dataset. </li>
+ *    <li> a format descriptor, which encapsulates its representation (e.g. avro, csv), codec (e.g. gzip, deflate), and
+ *    encryption config (e.g. aes_rotating, gpg). </li>
+ *  </ul>
  */
 @Alpha
 public interface DatasetDescriptor {
   /**
-   * @return the dataset platform i.e. the storage backing the dataset (e.g. HDFS, JDBC, Espresso etc.)
+   * @return the dataset platform i.e. the storage system backing the dataset (e.g. HDFS, ADLS, JDBC etc.)
    */
   public String getPlatform();
 
   /**
+   * Returns the fully qualified name of a dataset. The fully qualified name is the absolute directory path of a dataset
+   * when the dataset is backed by a FileSystem. In the case of a database table, it is dbName.tableName.
+   * @return dataset path.
+   */
+  public String getPath();
+
+  /**
+   *
+   * @return storage format of the dataset.
+   */
+  public FormatConfig getFormatConfig();
+
+  /**
    * @return a human-readable description of the dataset.
    */
   public String getDescription();
 
   /**
-   * @return true if this {@link DatasetDescriptor} is compatible with the other {@link DatasetDescriptor} i.e. the
-   * datasets described by this {@link DatasetDescriptor} is a subset of the datasets described by the other {@link DatasetDescriptor}.
-   * This check is non-commutative.
+   * @return true if this {@link DatasetDescriptor} contains the other {@link DatasetDescriptor} i.e. the
+   * datasets described by the other {@link DatasetDescriptor} are a subset of the datasets described by this
+   * {@link DatasetDescriptor}. This operation is non-commutative.
+   */
+  public boolean contains(DatasetDescriptor other);
+
+  /**
+   * @return the raw config.
    */
-  public boolean isCompatibleWith(DatasetDescriptor other);
+  public Config getRawConfig();
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java
new file mode 100644
index 0000000..21c7c17
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.dataset;
+
+import com.google.common.base.Joiner;
+import com.typesafe.config.Config;
+
+import lombok.Getter;
+
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+public class EncryptionConfig {
+  @Getter
+  private final String encryptionAlgorithm;
+  @Getter
+  private final String keystoreType;
+  @Getter
+  private final String keystoreEncoding;
+
+  public EncryptionConfig(Config encryptionConfig) {
+    this.encryptionAlgorithm = ConfigUtils.getString(encryptionConfig, DatasetDescriptorConfigKeys.ENCRYPTION_ALGORITHM_KEY,
+        DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+    this.keystoreType = ConfigUtils.getString(encryptionConfig, DatasetDescriptorConfigKeys.ENCRYPTION_KEYSTORE_TYPE_KEY,
+        DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+    this.keystoreEncoding = ConfigUtils.getString(encryptionConfig, DatasetDescriptorConfigKeys.ENCRYPTION_KEYSTORE_ENCODING_KEY,
+        DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+  }
+
+  public boolean contains(EncryptionConfig other) {
+    if (other == null) {
+      return false;
+    }
+
+    String otherEncryptionAlgorithm = other.getEncryptionAlgorithm();
+    String otherKeystoreType = other.getKeystoreType();
+    String otherKeystoreEncoding = other.getKeystoreEncoding();
+
+    return (DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equals(this.getEncryptionAlgorithm())
+        || this.encryptionAlgorithm.equalsIgnoreCase(otherEncryptionAlgorithm))
+        && (DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equals(this.getKeystoreType())
+        || this.keystoreType.equalsIgnoreCase(otherKeystoreType))
+        && (DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equals(this.getKeystoreEncoding())
+        || this.keystoreEncoding.equalsIgnoreCase(otherKeystoreEncoding));
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (!(o instanceof EncryptionConfig)) {
+      return false;
+    }
+    EncryptionConfig other = (EncryptionConfig) o;
+    return this.getEncryptionAlgorithm().equalsIgnoreCase(other.getEncryptionAlgorithm()) && this.keystoreEncoding.equalsIgnoreCase(other.getKeystoreEncoding())
+        && this.getKeystoreType().equalsIgnoreCase(other.getKeystoreType());
+  }
+
+  @Override
+  public String toString() {
+    return "(" + Joiner.on(",").join(this.encryptionAlgorithm, this.keystoreType, this.keystoreEncoding) + ")";
+  }
+
+  @Override
+  public int hashCode() {
+    int result = 17;
+    result = 31 * result + encryptionAlgorithm.toLowerCase().hashCode();
+    result = 31 * result + keystoreType.toLowerCase().hashCode();
+    result = 31 * result + keystoreEncoding.toLowerCase().hashCode();
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java
new file mode 100644
index 0000000..a5cb717
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.dataset;
+
+import org.apache.hadoop.fs.GlobPattern;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+
+import lombok.Getter;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.PathUtils;
+
+
+/**
+ * An implementation of {@link DatasetDescriptor} with FS-based storage.
+ */
+@Alpha
+public class FSDatasetDescriptor implements DatasetDescriptor {
+  @Getter
+  private final String platform;
+  @Getter
+  private final String path;
+  @Getter
+  private final FormatConfig formatConfig;
+  @Getter
+  private final String description;
+  @Getter
+  private final Config rawConfig;
+
+  public FSDatasetDescriptor(Config config) {
+    Preconditions.checkArgument(config.hasPath(DatasetDescriptorConfigKeys.PLATFORM_KEY), "Dataset descriptor config must specify platform");
+    this.platform = config.getString(DatasetDescriptorConfigKeys.PLATFORM_KEY);
+    this.path = PathUtils.getPathWithoutSchemeAndAuthority(new Path(ConfigUtils.getString(config, DatasetDescriptorConfigKeys.PATH_KEY,
+        DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY))).toString();
+    this.formatConfig = new FormatConfig(config);
+    this.description = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.DESCRIPTION_KEY, "");
+    this.rawConfig = config;
+  }
+
+  /**
+   * A helper to determine if the path description of this {@link DatasetDescriptor} is a superset of paths
+   * accepted by the other {@link DatasetDescriptor}. If the path description of the other {@link DatasetDescriptor}
+   * is a glob pattern, we return false.
+   *
+   * @param otherPath the path of the other {@link DatasetDescriptor}; expected to be a concrete (non-glob) path.
+   * @return true if the glob pattern described by the path of this {@link DatasetDescriptor} matches otherPath.
+   */
+  public boolean isPathContaining(String otherPath) {
+    if (otherPath == null) {
+      return false;
+    }
+    if (DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equals(this.getPath())) {
+      return true;
+    }
+    if (PathUtils.isGlob(new Path(otherPath))) {
+      return false;
+    }
+    GlobPattern globPattern = new GlobPattern(this.getPath());
+    return globPattern.matches(otherPath);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean contains(DatasetDescriptor o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof FSDatasetDescriptor)) {
+      return false;
+    }
+    FSDatasetDescriptor other = (FSDatasetDescriptor) o;
+
+    if (this.getPlatform() == null || other.getPlatform() == null || !this.getPlatform().equalsIgnoreCase(other.getPlatform())) {
+      return false;
+    }
+
+    return getFormatConfig().contains(other.getFormatConfig()) && isPathContaining(other.getPath());
+  }
+
+  /**
+   *
+   * @param o the object to compare against.
+   * @return true iff o is an {@link FSDatasetDescriptor} with the same platform (ignoring case), path and
+   * {@link FormatConfig} as this dataset descriptor.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof FSDatasetDescriptor)) {
+      return false;
+    }
+    FSDatasetDescriptor other = (FSDatasetDescriptor) o;
+    if (this.getPlatform() == null || other.getPlatform() == null || !this.getPlatform().equalsIgnoreCase(other.getPlatform())) {
+      return false;
+    }
+    return this.getPath().equals(other.getPath()) && this.getFormatConfig().equals(other.getFormatConfig());
+  }
+
+  @Override
+  public String toString() {
+     return "(" + Joiner.on(",").join(this.getPlatform(), this.getPath(), this.getFormatConfig().toString()) + ")";
+  }
+
+  @Override
+  public int hashCode() {
+    int result = 17;
+    result = 31 * result + platform.toLowerCase().hashCode();
+    result = 31 * result + path.hashCode();
+    result = 31 * result + getFormatConfig().hashCode();
+    return result;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java
new file mode 100644
index 0000000..a36182c
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.dataset;
+
+import com.google.common.base.Joiner;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.Getter;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * A location-independent descriptor of a dataset, which describes a dataset in terms of its physical attributes.
+ * The physical attributes include:
+ *  <ul>
+ *    <li> Data format (e.g. Avro, CSV, JSON). </li>
+ *    <li> Data encoding type (e.g. Gzip, Bzip2, Base64, Deflate). </li>
+ *    <li> Encryption properties (e.g. aes_rotating, gpg). </li>
+ *  </ul>
+ */
+@Alpha
+public class FormatConfig {
+  @Getter
+  private final String format;
+  @Getter
+  private final String codecType;
+  @Getter
+  private final EncryptionConfig encryptionConfig;
+
+  public FormatConfig(Config config) {
+    this.format = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.FORMAT_KEY, DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+    this.codecType = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.CODEC_KEY, DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+    this.encryptionConfig = new EncryptionConfig(ConfigUtils.getConfig(config, DatasetDescriptorConfigKeys.ENCYPTION_PREFIX, ConfigFactory
+        .empty()));
+  }
+
+  public boolean contains(FormatConfig other) {
+    return containsFormat(other.getFormat()) && containsCodec(other.getCodecType())
+        && containsEncryptionConfig(other.getEncryptionConfig());
+  }
+
+  private boolean containsFormat(String otherFormat) {
+    return DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getFormat())
+        || (this.getFormat().equalsIgnoreCase(otherFormat));
+  }
+
+  private boolean containsCodec(String otherCodecType) {
+    return DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getCodecType())
+        || (this.getCodecType().equalsIgnoreCase(otherCodecType));
+  }
+
+  private boolean containsEncryptionConfig(EncryptionConfig otherEncryptionConfig) {
+    return this.getEncryptionConfig().contains(otherEncryptionConfig);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (!(o instanceof FormatConfig)) {
+      return false;
+    }
+    FormatConfig other = (FormatConfig) o;
+    return this.getFormat().equalsIgnoreCase(other.getFormat()) && this.getCodecType().equalsIgnoreCase(other.getCodecType())
+        && this.getEncryptionConfig().equals(other.getEncryptionConfig());
+  }
+
+  @Override
+  public String toString() {
+    return "(" + Joiner.on(",").join(this.getFormat(), this.getCodecType(), this.getEncryptionConfig().toString()) + ")";
+  }
+
+  @Override
+  public int hashCode() {
+    int result = 17;
+    result = 31 * result + codecType.toLowerCase().hashCode();
+    result = 31 * result + format.toLowerCase().hashCode();
+    result = 31 * result + encryptionConfig.hashCode();
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HdfsDatasetDescriptor.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HdfsDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HdfsDatasetDescriptor.java
deleted file mode 100644
index 6f1970c..0000000
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HdfsDatasetDescriptor.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.dataset;
-
-import org.apache.gobblin.annotation.Alpha;
-
-
-/**
- * A descriptor interface for HDFS datasets
- */
-@Alpha
-public interface HdfsDatasetDescriptor extends DatasetDescriptor {
-  /**
-   *
-   * @return dataset path.
-   */
-  public String getPath();
-
-  /**
-   *
-   * @return storage format of the dataset.
-   */
-  public String getFormat();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowEdgeContext.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowEdgeContext.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowEdgeContext.java
new file mode 100644
index 0000000..daff8ce
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowEdgeContext.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.flow;
+
+import com.typesafe.config.Config;
+
+import lombok.AllArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
+
+
+/**
+ * A helper class used to maintain additional context associated with each {@link FlowEdge} during path
+ * computation while the edge is explored for its eligibility. The additional context includes the input
+ * {@link DatasetDescriptor} of this edge which is compatible with the previous {@link FlowEdge}'s output
+ * {@link DatasetDescriptor} (where "previous" means the immediately preceding {@link FlowEdge} visited before
+ * the current {@link FlowEdge}), and the corresponding output dataset descriptor of the current {@link FlowEdge}.
+ *
+ * <p> Equality and hash code are based on the edge and the two dataset descriptors only; the merged config and
+ * the spec executor are excluded. </p>
+ */
+@AllArgsConstructor
+@EqualsAndHashCode(exclude = {"mergedConfig", "specExecutor"})
+@Getter
+public class FlowEdgeContext {
+  //The edge this context belongs to.
+  private FlowEdge edge;
+  //Input dataset descriptor of the edge.
+  private DatasetDescriptor inputDatasetDescriptor;
+  //Output dataset descriptor of the edge.
+  private DatasetDescriptor outputDatasetDescriptor;
+  //Config associated with the edge; not part of equals()/hashCode().
+  private Config mergedConfig;
+  //Executor associated with the edge; not part of equals()/hashCode().
+  private SpecExecutor specExecutor;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java
new file mode 100644
index 0000000..c642708
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flow;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
+import org.apache.gobblin.service.modules.template.FlowTemplate;
+
+
+/**
+ * An ordered sequence of {@link FlowEdgeContext}s, held together with the {@link FlowSpec} and the flow execution id
+ * it belongs to, which can be converted into a {@link Dag} of {@link JobExecutionPlan}s.
+ */
+public class FlowGraphPath {
+  private List<FlowEdgeContext> path;
+  private FlowSpec flowSpec;
+  private Long flowExecutionId;
+
+  /**
+   * @param path the hops of the path, in the order in which they are traversed.
+   * @param flowSpec the {@link FlowSpec} passed on to every {@link JobExecutionPlan} created from this path.
+   * @param flowExecutionId the flow execution id passed on to every {@link JobExecutionPlan} created from this path.
+   */
+  public FlowGraphPath(List<FlowEdgeContext> path, FlowSpec flowSpec, Long flowExecutionId) {
+    this.flowExecutionId = flowExecutionId;
+    this.flowSpec = flowSpec;
+    this.path = path;
+  }
+
+  /**
+   * Converts each hop of the path into a {@link Dag} and concatenates the results, in path order, onto an
+   * initially empty {@link Dag}.
+   * @return the concatenated {@link Dag} of {@link JobExecutionPlan}s.
+   */
+  public Dag<JobExecutionPlan> asDag()
+      throws IOException, SpecNotFoundException, JobTemplate.TemplateException, URISyntaxException {
+    Dag<JobExecutionPlan> dag = new Dag<>(new ArrayList<>());
+    for (FlowEdgeContext hop : path) {
+      dag = dag.concatenate(convertHopToDag(hop));
+    }
+    return dag;
+  }
+
+  /**
+   * Builds the {@link Dag} of {@link JobExecutionPlan}s for a single hop. The {@link FlowTemplate} of the hop's
+   * {@link FlowEdge} is resolved against the hop's merged config and dataset descriptors, and one
+   * {@link JobExecutionPlan} is created per resolved job config.
+   * @param flowEdgeContext an instance of {@link FlowEdgeContext}.
+   * @return a {@link Dag} of {@link JobExecutionPlan}s associated with the {@link FlowEdge}.
+   */
+  private Dag<JobExecutionPlan> convertHopToDag(FlowEdgeContext flowEdgeContext)
+      throws SpecNotFoundException, JobTemplate.TemplateException, URISyntaxException {
+    FlowTemplate template = flowEdgeContext.getEdge().getFlowTemplate();
+    DatasetDescriptor inputDescriptor = flowEdgeContext.getInputDatasetDescriptor();
+    DatasetDescriptor outputDescriptor = flowEdgeContext.getOutputDatasetDescriptor();
+    SpecExecutor executor = flowEdgeContext.getSpecExecutor();
+
+    //Resolve the job configs of the flow template.
+    List<Config> jobConfigs =
+        template.getResolvedJobConfigs(flowEdgeContext.getMergedConfig(), inputDescriptor, outputDescriptor);
+
+    //Create one JobExecutionPlan per resolved job config.
+    List<JobExecutionPlan> plans = new ArrayList<>();
+    for (Config jobConfig : jobConfigs) {
+      plans.add(new JobExecutionPlan.Factory().createPlan(flowSpec, jobConfig, executor, flowExecutionId));
+    }
+    return new JobExecutionPlanDagFactory().createDag(plans);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinder.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinder.java
new file mode 100644
index 0000000..2b4746c
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinder.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flow;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DataNode;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+@Alpha
+@Slf4j
+public class FlowGraphPathFinder {
+  private static final String SOURCE_PREFIX = "source";
+  private static final String DESTINATION_PREFIX = "destination";
+
+  private FlowGraph flowGraph;
+  private FlowSpec flowSpec;
+  private Config flowConfig;
+
+  private DataNode srcNode;
+  private DataNode destNode;
+
+  private DatasetDescriptor srcDatasetDescriptor;
+  private DatasetDescriptor destDatasetDescriptor;
+
+  //Maintain path of FlowEdges as parent-child map
+  private Map<FlowEdgeContext, FlowEdgeContext> pathMap;
+
+  //Flow Execution Id
+  private Long flowExecutionId;
+
+  /**
+   * Constructor. Looks up the source and destination {@link DataNode}s in the flow graph and instantiates the
+   * input and output {@link DatasetDescriptor}s of the flow, all based on the config of the given {@link FlowSpec}.
+   * @param flowGraph the {@link FlowGraph} on which paths are computed.
+   * @param flowSpec the {@link FlowSpec} whose config identifies the end points and the dataset descriptors.
+   * @throws IllegalArgumentException if the flow graph has no node with the configured source or destination id.
+   * @throws RuntimeException if a dataset descriptor cannot be instantiated.
+   */
+  public FlowGraphPathFinder(FlowGraph flowGraph, FlowSpec flowSpec) {
+    this.flowGraph = flowGraph;
+    this.flowSpec = flowSpec;
+    this.flowConfig = flowSpec.getConfig();
+
+    //Resolve the end points of the flow; a missing identifier defaults to the empty string.
+    String srcNodeId = ConfigUtils.getString(this.flowConfig, ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, "");
+    String destNodeId = ConfigUtils.getString(this.flowConfig, ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, "");
+    this.srcNode = this.flowGraph.getNode(srcNodeId);
+    Preconditions.checkArgument(this.srcNode != null, "Flowgraph does not have a node with id " + srcNodeId);
+    this.destNode = this.flowGraph.getNode(destNodeId);
+    Preconditions.checkArgument(this.destNode != null, "Flowgraph does not have a node with id " + destNodeId);
+
+    //Read both descriptor configs before instantiating either descriptor.
+    Config inputDescriptorConfig =
+        this.flowConfig.getConfig(DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX);
+    Config outputDescriptorConfig =
+        this.flowConfig.getConfig(DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX);
+    this.srcDatasetDescriptor = instantiateDatasetDescriptor(inputDescriptorConfig);
+    this.destDatasetDescriptor = instantiateDatasetDescriptor(outputDescriptorConfig);
+  }
+
+  /**
+   * Instantiates the {@link DatasetDescriptor} class named under {@link DatasetDescriptorConfigKeys#CLASS_KEY} in
+   * the given config, handing the config itself to the constructor.
+   * @param descriptorConfig config describing the dataset descriptor.
+   * @return the instantiated {@link DatasetDescriptor}.
+   * @throws RuntimeException wrapping any exception raised while loading or instantiating the class.
+   */
+  private static DatasetDescriptor instantiateDatasetDescriptor(Config descriptorConfig) {
+    try {
+      Class descriptorClass = Class.forName(descriptorConfig.getString(DatasetDescriptorConfigKeys.CLASS_KEY));
+      return (DatasetDescriptor) GobblinConstructorUtils.invokeLongestConstructor(descriptorClass, descriptorConfig);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * A simple path finding algorithm based on Breadth-First Search. At every step the algorithm adds the adjacent {@link FlowEdge}s
+   * to a queue. The {@link FlowEdge}s whose output {@link DatasetDescriptor} matches the destDatasetDescriptor are
+   * added first to the queue. This ensures that dataset transformations are always performed closest to the source.
+   * @return a path of {@link FlowEdgeContext}s starting at the srcNode and ending at the destNode; an empty path if
+   * the srcNode equals the destNode and the destination descriptor contains the source descriptor; or null if the
+   * srcNode or the destNode is inactive, or if no path is found.
+   * @throws PathFinderException wrapping any checked exception raised during path computation.
+   */
+  public FlowGraphPath findPath() throws PathFinderException {
+    try {
+      //Initialization of auxiliary data structures used for path computation. The path map points every explored
+      // edge to the edge it was reached from; the first edges of a path point to themselves.
+      this.pathMap = new HashMap<>();
+
+      // Generate flow execution id for this compilation
+      this.flowExecutionId = System.currentTimeMillis();
+
+      //Path computation must be thread-safe to guarantee read consistency. In other words, we prevent concurrent read/write access to the
+      // flow graph.
+      // TODO: we can easily improve the performance by using a ReentrantReadWriteLock associated with the FlowGraph. This will
+      // allow multiple concurrent readers to not be blocked on each other, as long as there are no writers.
+      synchronized (this.flowGraph) {
+        //Base condition 1: Source Node or Dest Node is inactive; return null
+        if (!srcNode.isActive() || !destNode.isActive()) {
+          log.warn("Either source node {} or destination node {} is inactive; skipping path computation.", this.srcNode.getId(),
+              this.destNode.getId());
+          return null;
+        }
+
+        //Base condition 2: Check if we are already at the target. If so, return an empty path.
+        if ((srcNode.equals(destNode)) && destDatasetDescriptor.contains(srcDatasetDescriptor)) {
+          return new FlowGraphPath(new ArrayList<>(), flowSpec, flowExecutionId);
+        }
+
+        //Seed the queue with the eligible edges of the source node; each of them is the first edge of a candidate
+        // path and is therefore mapped to itself in the path map.
+        LinkedList<FlowEdgeContext> edgeQueue = new LinkedList<>();
+        edgeQueue.addAll(getNextEdges(srcNode, srcDatasetDescriptor, destDatasetDescriptor));
+        for (FlowEdgeContext flowEdgeContext : edgeQueue) {
+          this.pathMap.put(flowEdgeContext, flowEdgeContext);
+        }
+
+        //At every step, pop an edge E from the edge queue and check whether it completes the path. If not, generate
+        // the list of eligible adjacent edges of E (see getNextEdges()). Every such edge E' that is not yet present
+        // in the path map is added to the edge queue for further consideration, and the path map records E as the
+        // parent of E'.
+        while (!edgeQueue.isEmpty()) {
+          FlowEdgeContext flowEdgeContext = edgeQueue.pop();
+
+          DataNode currentNode = this.flowGraph.getNode(flowEdgeContext.getEdge().getDest());
+          DatasetDescriptor currentOutputDatasetDescriptor = flowEdgeContext.getOutputDatasetDescriptor();
+
+          //Are we done?
+          if (isPathFound(currentNode, destNode, currentOutputDatasetDescriptor, destDatasetDescriptor)) {
+            return constructPath(flowEdgeContext);
+          }
+
+          //Expand the currentNode to its adjacent edges and add them to the queue.
+          List<FlowEdgeContext> nextEdges =
+              getNextEdges(currentNode, currentOutputDatasetDescriptor, destDatasetDescriptor);
+          for (FlowEdgeContext childFlowEdgeContext : nextEdges) {
+            //Add a pointer from the child edge to the parent edge, if the child edge has not been seen before
+            // (i.e. it is not yet a key of the path map).
+            if (!this.pathMap.containsKey(childFlowEdgeContext)) {
+              edgeQueue.add(childFlowEdgeContext);
+              this.pathMap.put(childFlowEdgeContext, flowEdgeContext);
+            }
+          }
+        }
+      }
+      //No path found. Return null.
+      return null;
+    } catch (SpecNotFoundException | JobTemplate.TemplateException | IOException | URISyntaxException e) {
+      throw new PathFinderException(
+          "Exception encountered when computing path from src: " + this.srcNode.getId() + " to dest: " + this.destNode.getId(), e);
+    }
+  }
+
+  /**
+   * Decides whether the path computation has reached its target.
+   * @param currentNode the {@link DataNode} reached so far.
+   * @param destNode the destination {@link DataNode}.
+   * @param currentDatasetDescriptor the {@link DatasetDescriptor} reached so far.
+   * @param destDatasetDescriptor the destination {@link DatasetDescriptor}.
+   * @return true if the current node equals the destination node and the current dataset descriptor equals the
+   * destination dataset descriptor.
+   */
+  private boolean isPathFound(DataNode currentNode, DataNode destNode, DatasetDescriptor currentDatasetDescriptor,
+      DatasetDescriptor destDatasetDescriptor) {
+    return currentNode.equals(destNode) && currentDatasetDescriptor.equals(destDatasetDescriptor);
+  }
+
+  /**
+   * A helper method that collects the eligible {@link FlowEdge}s which the flow graph returns for dataNode and
+   * orders them based on whether the edge has an output {@link DatasetDescriptor} whose format config is compatible
+   * with that of the destDatasetDescriptor. An edge is skipped if it is inactive, if its destination node is
+   * inactive, or if an exception is raised while it is examined. For the remaining edges, the executors are tried
+   * in order; the first executor for which the flow template resolves to at least one input
+   * {@link DatasetDescriptor} containing the currentDatasetDescriptor is used, and one {@link FlowEdgeContext} is
+   * created for each such input/output descriptor pair.
+   * @param dataNode the {@link DataNode} whose edges are examined.
+   * @param currentDatasetDescriptor Output {@link DatasetDescriptor} of the current edge.
+   * @param destDatasetDescriptor Target {@link DatasetDescriptor}.
+   * @return prioritized list of {@link FlowEdgeContext}s to be added to the edge queue for expansion.
+   */
+  private List<FlowEdgeContext> getNextEdges(DataNode dataNode, DatasetDescriptor currentDatasetDescriptor,
+      DatasetDescriptor destDatasetDescriptor) {
+    List<FlowEdgeContext> prioritizedEdgeList = new LinkedList<>();
+    for (FlowEdge flowEdge : this.flowGraph.getEdges(dataNode)) {
+      try {
+        DataNode edgeDestination = this.flowGraph.getNode(flowEdge.getDest());
+        //Base condition: Skip this FlowEdge, if it is inactive or if the destination of this edge is inactive.
+        if (!edgeDestination.isActive() || !flowEdge.isActive()) {
+          continue;
+        }
+
+        boolean foundExecutor = false;
+        //Iterate over all executors for this edge. Find the first one that resolves the underlying flow template.
+        for (SpecExecutor specExecutor: flowEdge.getExecutors()) {
+          Config mergedConfig = getMergedConfig(flowEdge, specExecutor);
+          List<Pair<DatasetDescriptor, DatasetDescriptor>> datasetDescriptorPairs =
+              flowEdge.getFlowTemplate().getResolvingDatasetDescriptors(mergedConfig);
+          for (Pair<DatasetDescriptor, DatasetDescriptor> datasetDescriptorPair : datasetDescriptorPairs) {
+            DatasetDescriptor inputDatasetDescriptor = datasetDescriptorPair.getLeft();
+            DatasetDescriptor outputDatasetDescriptor = datasetDescriptorPair.getRight();
+            //The edge is eligible only if its input descriptor accepts the dataset produced so far.
+            if (inputDatasetDescriptor.contains(currentDatasetDescriptor)) {
+              FlowEdgeContext flowEdgeContext;
+              if (outputDatasetDescriptor.contains(currentDatasetDescriptor)) {
+                //If datasets described by the currentDatasetDescriptor is a subset of the datasets described
+                // by the outputDatasetDescriptor (i.e. currentDatasetDescriptor is more "specific" than outputDatasetDescriptor, e.g.
+                // as in the case of a "distcp" edge), we propagate the more "specific" dataset descriptor forward.
+                flowEdgeContext = new FlowEdgeContext(flowEdge, currentDatasetDescriptor, currentDatasetDescriptor, mergedConfig, specExecutor);
+              } else {
+                //outputDatasetDescriptor is more specific (e.g. if it is a dataset transformation edge)
+                flowEdgeContext = new FlowEdgeContext(flowEdge, currentDatasetDescriptor, outputDatasetDescriptor, mergedConfig, specExecutor);
+              }
+              if (destDatasetDescriptor.getFormatConfig().contains(outputDatasetDescriptor.getFormatConfig())) {
+                //Add to the front of the edge list if platform-independent properties of the output descriptor is compatible
+                // with those of destination dataset descriptor.
+                // In other words, we prioritize edges that perform data transformations as close to the source as possible.
+                prioritizedEdgeList.add(0, flowEdgeContext);
+              } else {
+                prioritizedEdgeList.add(flowEdgeContext);
+              }
+              foundExecutor = true;
+            }
+          }
+          // Found a SpecExecutor. Proceed to the next FlowEdge.
+          // TODO: Choose the min-cost executor for the FlowEdge as opposed to the first one that resolves.
+          if (foundExecutor) {
+            break;
+          }
+        }
+      } catch (IOException | ReflectiveOperationException | InterruptedException | ExecutionException | SpecNotFoundException
+          | JobTemplate.TemplateException e) {
+        //Skip the edge; and continue
+        log.warn("Skipping edge {} with config {} due to exception: {}", flowEdge.getId(), flowConfig.toString(), e);
+      }
+    }
+    return prioritizedEdgeList;
+  }
+
+  /**
+   * Build the merged config for a {@link FlowEdge}. The merged config combines the following configs, listed from
+   * the highest to the lowest precedence as applied by this method:
+   * <ul>
+   *   <li> the user provided flow config </li>
+   *   <li> the config of the spec executor </li>
+   *   <li> the config of the edge </li>
+   *   <li> the raw config of the source node of the edge, placed under the "source" path </li>
+   *   <li> the raw config of the destination node of the edge, placed under the "destination" path </li>
+   * </ul>
+   * Each {@link JobTemplate}'s config will eventually be resolved against this merged config.
+   * @param flowEdge An instance of {@link FlowEdge}.
+   * @param specExecutor A {@link SpecExecutor}.
+   * @return the merged config derived as described above.
+   * @throws ExecutionException if the config of the spec executor cannot be obtained.
+   * @throws InterruptedException if interrupted while waiting for the config of the spec executor.
+   */
+  private Config getMergedConfig(FlowEdge flowEdge, SpecExecutor specExecutor)
+      throws ExecutionException, InterruptedException {
+    Config srcNodeConfig = this.flowGraph.getNode(flowEdge.getSrc()).getRawConfig().atPath(SOURCE_PREFIX);
+    Config destNodeConfig = this.flowGraph.getNode(flowEdge.getDest()).getRawConfig().atPath(DESTINATION_PREFIX);
+
+    //Every withFallback() call only contributes keys that are not already defined, so configs added earlier win.
+    Config merged = flowConfig.withFallback(specExecutor.getConfig().get());
+    merged = merged.withFallback(flowEdge.getConfig());
+    merged = merged.withFallback(srcNodeConfig);
+    merged = merged.withFallback(destNodeConfig);
+    return merged;
+  }
+
+  /**
+   * Reconstructs the path ending at the given edge by following the parent pointers of the path map back to the
+   * first edge of the path. The first edge of a path is the one that is mapped to itself in the path map.
+   * @param flowEdgeContext of the last {@link FlowEdge} in the path.
+   * @return a {@link FlowGraphPath} containing the {@link FlowEdgeContext}s from the first to the last edge of the
+   * path, each exactly once.
+   */
+  private FlowGraphPath constructPath(FlowEdgeContext flowEdgeContext)
+      throws IOException, SpecNotFoundException, JobTemplate.TemplateException, URISyntaxException {
+    //Backtrace from the last edge using the path map, prepending each parent edge to the path.
+    List<FlowEdgeContext> path = new LinkedList<>();
+    path.add(flowEdgeContext);
+    FlowEdgeContext currentFlowEdgeContext = flowEdgeContext;
+    FlowEdgeContext parentFlowEdgeContext = this.pathMap.get(currentFlowEdgeContext);
+    //Check for the first edge BEFORE prepending the parent: the first edge is its own parent, so prepending
+    // unconditionally would add it to the path twice when the path consists of a single edge.
+    while (!parentFlowEdgeContext.equals(currentFlowEdgeContext)) {
+      path.add(0, parentFlowEdgeContext);
+      currentFlowEdgeContext = parentFlowEdgeContext;
+      parentFlowEdgeContext = this.pathMap.get(currentFlowEdgeContext);
+    }
+    return new FlowGraphPath(path, flowSpec, flowExecutionId);
+  }
+
+  /**
+   * A checked exception used to report failures of path computation.
+   */
+  public static class PathFinderException extends Exception {
+    /**
+     * @param message description of the failure.
+     * @param cause the underlying exception.
+     */
+    public PathFinderException(String message, Throwable cause) {
+      super(message, cause);
+    }
+
+    /**
+     * @param message description of the failure.
+     */
+    public PathFinderException(String message) {
+      super(message);
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
new file mode 100644
index 0000000..8b14b10
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flow;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.core.GitFlowGraphMonitor;
+import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
+
+
+/**
+ * Takes in a logical {@link Spec} (i.e. a flow) and compiles it into the corresponding materialized job
+ * {@link Spec}s, together with their mapping to {@link SpecExecutor}s.
+ */
+@Alpha
+@Slf4j
+public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
+  @Getter
+  private FlowGraph flowGraph;
+  private GitFlowGraphMonitor gitFlowGraphMonitor;
+  @Getter
+  private boolean active;
+
+  public MultiHopFlowCompiler(Config config) {
+    this(config, true);
+  }
+
+  public MultiHopFlowCompiler(Config config, boolean instrumentationEnabled) {
+    this(config, Optional.<Logger>absent(), instrumentationEnabled);
+  }
+
+  public MultiHopFlowCompiler(Config config, Optional<Logger> log) {
+    this(config, log, true);
+  }
+
+  public MultiHopFlowCompiler(Config config, Optional<Logger> log, boolean instrumentationEnabled) {
+    super(config, log, instrumentationEnabled);
+    Config templateCatalogCfg = config
+        .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+            config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
+    FSFlowCatalog flowCatalog;
+    try {
+      flowCatalog = new FSFlowCatalog(templateCatalogCfg);
+    } catch (IOException e) {
+      throw new RuntimeException("Cannot instantiate " + getClass().getName(), e);
+    }
+    this.flowGraph = new BaseFlowGraph();
+    this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, flowCatalog, this.flowGraph);
+  }
+
+  /**
+   * Sets the active flag of this compiler and passes the same value on to the {@link GitFlowGraphMonitor}.
+   * @param active the new value of the active flag.
+   */
+  public void setActive(boolean active) {
+    this.active = active;
+    this.gitFlowGraphMonitor.setActive(active);
+  }
+
+  /**
+   * Compiles the given {@link FlowSpec} by computing a path from the source to the destination of the flow on the
+   * {@link FlowGraph}, and converting that path into a {@link Dag} of {@link JobExecutionPlan}s.
+   * TODO: We need to change signature of compileFlow to return a Dag instead of a HashMap to capture
+   * job dependencies.
+   * @param spec the flow to compile; must be a non-null {@link FlowSpec}.
+   * @return a map from the job {@link Spec} of every {@link JobExecutionPlan} in the compiled {@link Dag} to its
+   * {@link SpecExecutor}, in the order in which the {@link Dag} lists its nodes; or null if no path is found or if
+   * one of the exceptions caught by this method is raised during compilation.
+   */
+  @Override
+  public Map<Spec, SpecExecutor> compileFlow(Spec spec) {
+    Preconditions.checkNotNull(spec);
+    //The message names this class (it used to refer to a non-existent "MultiHopFlowToJobSpecCompiler").
+    Preconditions.checkArgument(spec instanceof FlowSpec, "MultiHopFlowCompiler only accepts FlowSpecs");
+
+    long startTime = System.nanoTime();
+    Map<Spec, SpecExecutor> specExecutorMap = Maps.newLinkedHashMap();
+
+    FlowSpec flowSpec = (FlowSpec) spec;
+    String source = flowSpec.getConfig().getString(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY);
+    String destination = flowSpec.getConfig().getString(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY);
+    log.info(String.format("Compiling flow for source: %s and destination: %s", source, destination));
+
+    FlowGraphPathFinder pathFinder = new FlowGraphPathFinder(this.flowGraph, flowSpec);
+    try {
+      //Compute the path from source to destination.
+      FlowGraphPath flowGraphPath = pathFinder.findPath();
+      if (flowGraphPath == null) {
+        Instrumented.markMeter(this.flowCompilationFailedMeter);
+        log.info(String.format("No path found from source: %s and destination: %s", source, destination));
+        return null;
+      }
+
+      //Convert the path into a Dag of JobExecutionPlans.
+      Dag<JobExecutionPlan> jobExecutionPlanDag = flowGraphPath.asDag();
+
+      //TODO: Just a dummy return value for now. compileFlow() signature needs to be modified to return a Dag instead
+      // of a Map. For now just add all specs into the map.
+      for (Dag.DagNode<JobExecutionPlan> node : jobExecutionPlanDag.getNodes()) {
+        JobExecutionPlan jobExecutionPlan = node.getValue();
+        specExecutorMap.put(jobExecutionPlan.getJobSpec(), jobExecutionPlan.getSpecExecutor());
+      }
+    } catch (FlowGraphPathFinder.PathFinderException | SpecNotFoundException | JobTemplate.TemplateException | IOException
+        | URISyntaxException e) {
+      Instrumented.markMeter(this.flowCompilationFailedMeter);
+      log.error(String.format("Exception encountered while compiling flow for source: %s and destination: %s", source, destination), e);
+      return null;
+    }
+
+    //Success: the timer covers everything from the start of this method.
+    Instrumented.markMeter(this.flowCompilationSuccessFulMeter);
+    Instrumented.updateTimer(this.flowCompilationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+
+    return specExecutorMap;
+  }
+
+  /**
+   * This implementation does not populate any templates; it only logs a warning.
+   */
+  @Override
+  protected void populateEdgeTemplateMap() {
+    log.warn("No population of templates based on edge happen in this implementation");
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java
index 731bc22..4fb9711 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java
@@ -19,9 +19,9 @@ package org.apache.gobblin.service.modules.flowgraph;
 
 import com.google.common.base.Preconditions;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 
 import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
 import org.apache.gobblin.util.ConfigUtils;
 
 import joptsimple.internal.Strings;
@@ -38,7 +38,7 @@ public class BaseDataNode implements DataNode {
   @Getter
   private String id;
   @Getter
-  private Config props;
+  private Config rawConfig;
   @Getter
   private boolean active = true;
 
@@ -50,8 +50,8 @@ public class BaseDataNode implements DataNode {
       if (nodeProps.hasPath(FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY)) {
         this.active = nodeProps.getBoolean(FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY);
       }
-      this.props = nodeProps;
-    } catch(Exception e) {
+      this.rawConfig = nodeProps;
+    } catch (Exception e) {
       throw new DataNodeCreationException(e);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
index fc82cc1..56f6c1b 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
@@ -44,7 +44,10 @@ import org.apache.gobblin.util.ConfigUtils;
 @Alpha
 public class BaseFlowEdge implements FlowEdge {
   @Getter
-  protected List<String> endPoints;
+  protected String src;
+
+  @Getter
+  protected String dest;
 
   @Getter
   protected FlowTemplate flowTemplate;
@@ -53,7 +56,7 @@ public class BaseFlowEdge implements FlowEdge {
   private List<SpecExecutor> executors;
 
   @Getter
-  private Config props;
+  private Config config;
 
   @Getter
   private String id;
@@ -63,11 +66,12 @@ public class BaseFlowEdge implements FlowEdge {
 
   //Constructor
   public BaseFlowEdge(List<String> endPoints, String edgeId, FlowTemplate flowTemplate, List<SpecExecutor> executors, Config properties, boolean active) {
-    this.endPoints = endPoints;
+    this.src = endPoints.get(0);
+    this.dest = endPoints.get(1);
     this.flowTemplate = flowTemplate;
     this.executors = executors;
     this.active = active;
-    this.props = properties;
+    this.config = properties;
     this.id = edgeId;
   }
 
@@ -91,7 +95,7 @@ public class BaseFlowEdge implements FlowEdge {
 
     FlowEdge that = (FlowEdge) o;
 
-    if (!(this.getEndPoints().get(0).equals(that.getEndPoints().get(0))) && ((this.getEndPoints().get(1)).equals(that.getEndPoints().get(1)))) {
+    if (!(this.getSrc().equals(that.getSrc()) && this.getDest().equals(that.getDest()))) {
       return false;
     }
 
@@ -140,14 +144,14 @@ public class BaseFlowEdge implements FlowEdge {
           specExecutorConfigList.add(edgeProps.getConfig(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + "." + i));
         }
 
-        String flowTemplateUri = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_URI_KEY, "");
+        String flowTemplateDirUri = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_DIR_URI_KEY, "");
 
         //Perform basic validation
         Preconditions.checkArgument(endPoints.size() == 2, "A FlowEdge must have 2 end points");
         Preconditions
             .checkArgument(specExecutorConfigList.size() > 0, "A FlowEdge must have at least one SpecExecutor");
         Preconditions
-            .checkArgument(!Strings.isNullOrEmpty(flowTemplateUri), "FlowTemplate URI must be not null or empty");
+            .checkArgument(!Strings.isNullOrEmpty(flowTemplateDirUri), "FlowTemplate URI must be not null or empty");
         boolean isActive = ConfigUtils.getBoolean(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY, true);
 
         //Build SpecExecutor from config
@@ -158,7 +162,7 @@ public class BaseFlowEdge implements FlowEdge {
           SpecExecutor executor = (SpecExecutor) GobblinConstructorUtils.invokeLongestConstructor(executorClass, specExecutorConfig);
           specExecutors.add(executor);
         }
-        FlowTemplate flowTemplate = flowCatalog.getFlowTemplate(new URI(flowTemplateUri));
+        FlowTemplate flowTemplate = flowCatalog.getFlowTemplate(new URI(flowTemplateDirUri));
         return new BaseFlowEdge(endPoints, edgeId, flowTemplate, specExecutors, edgeProps, isActive);
       } catch (RuntimeException e) {
         throw e;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
index 783f7ea..edf40cc 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
@@ -75,8 +75,8 @@ public class BaseFlowGraph implements FlowGraph {
    */
   @Override
   public synchronized boolean addFlowEdge(FlowEdge edge) {
-    String srcNode = edge.getEndPoints().get(0);
-    String dstNode = edge.getEndPoints().get(1);
+    String srcNode = edge.getSrc();
+    String dstNode = edge.getDest();
     if(!dataNodeMap.containsKey(srcNode) || !dataNodeMap.containsKey(dstNode)) {
       return false;
     }
@@ -153,10 +153,10 @@ public class BaseFlowGraph implements FlowGraph {
    * if the {@link FlowEdge} is not in the graph, return false.
    */
   public synchronized boolean deleteFlowEdge(FlowEdge edge) {
-    if(!dataNodeMap.containsKey(edge.getEndPoints().get(0))) {
+    if(!dataNodeMap.containsKey(edge.getSrc())) {
       return false;
     }
-    DataNode node = dataNodeMap.get(edge.getEndPoints().get(0));
+    DataNode node = dataNodeMap.get(edge.getSrc());
     if(!nodesToEdges.get(node).contains(edge)) {
       return false;
     }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
index 8ae0027..58bbb81 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
@@ -87,6 +87,10 @@ public class Dag<T> {
     return node.parentNodes;
   }
 
+  public boolean isEmpty() {  // true when this dag's node collection is empty
+    return this.nodes.isEmpty();
+  }
+
   /**
    * Concatenate two dags together. Join the "other" dag to "this" dag and return "this" dag.
    * The concatenate method ensures that all the jobs of "this" dag (which may have multiple end nodes)
@@ -97,9 +101,12 @@ public class Dag<T> {
    * @return the concatenated dag
    */
   public Dag<T> concatenate(Dag<T> other) throws IOException {
-    if (other == null) {
+    if (other == null || other.isEmpty()) {
       return this;
     }
+    if (this.isEmpty()) {
+      return other;
+    }
     for (DagNode node : this.endNodes) {
       this.parentChildMap.put(node, Lists.newArrayList());
       for (DagNode otherNode : other.startNodes) {
@@ -108,6 +115,7 @@ public class Dag<T> {
       }
       this.endNodes = other.endNodes;
     }
+    this.nodes.addAll(other.nodes);
     return this;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DataNode.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DataNode.java
index 4931685..b7a5274 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DataNode.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DataNode.java
@@ -20,7 +20,6 @@ package org.apache.gobblin.service.modules.flowgraph;
 import com.typesafe.config.Config;
 
 import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
 
 
 /**
@@ -37,7 +36,7 @@ public interface DataNode {
    * @return the attributes of a {@link DataNode}. It also includes properties for resolving a {@link org.apache.gobblin.runtime.api.JobTemplate}
    * e.g. "source.fs.uri" for an HDFS node, "jdbc.publisher.url" for JDBC node.
    */
-  Config getProps();
+  Config getRawConfig();
 
   /**
    * @return true if the {@link DataNode} is active

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
index e98337d..23e20c8 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
@@ -22,7 +22,25 @@ package org.apache.gobblin.service.modules.flowgraph;
  */
 public class DatasetDescriptorConfigKeys {
   //Gobblin Service Dataset Descriptor related keys
+  public static final String FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX = "gobblin.flow.input.dataset.descriptor";
+  public static final String FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX = "gobblin.flow.output.dataset.descriptor";
+
+  public static final String FLOW_EDGE_INPUT_DATASET_DESCRIPTOR_PREFIX = "gobblin.flow.edge.input.dataset.descriptor";
+  public static final String FLOW_EDGE_OUTPUT_DATASET_DESCRIPTOR_PREFIX = "gobblin.flow.edge.output.dataset.descriptor";
+
+  public static final String CLASS_KEY = "class";
+  public static final String PLATFORM_KEY = "platform";
   public static final String PATH_KEY = "path";
   public static final String FORMAT_KEY = "format";
+  public static final String CODEC_KEY = "codec";
   public static final String DESCRIPTION_KEY = "description";
+
+  // Dataset encryption related keys. NOTE(review): ENCYPTION_PREFIX misspells "ENCRYPTION"; rename only together with all references.
+  public static final String ENCYPTION_PREFIX = "encrypt";
+  public static final String ENCRYPTION_ALGORITHM_KEY = "algorithm";
+  public static final String ENCRYPTION_KEYSTORE_TYPE_KEY = "keystore_type";
+  public static final String ENCRYPTION_KEYSTORE_ENCODING_KEY = "keystore_encoding";
+
+  public static final String DATASET_DESCRIPTOR_CONFIG_ANY = "any";
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FileSystemDataNode.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FileSystemDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FileSystemDataNode.java
deleted file mode 100644
index 5899645..0000000
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FileSystemDataNode.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.flowgraph;
-
-import java.io.IOException;
-import java.net.URI;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
-
-import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.util.ConfigUtils;
-
-import joptsimple.internal.Strings;
-import lombok.Getter;
-
-
-/**
- * An abstract {@link FileSystemDataNode} implementation. In addition to the required properties of a {@link BaseDataNode}, an {@link FileSystemDataNode}
- * must have a FS URI specified. Example implementations of {@link FileSystemDataNode} include {@link HdfsDataNode}, {@link LocalFSDataNode}.
- */
-@Alpha
-public abstract class FileSystemDataNode extends BaseDataNode {
-  public static final String FS_URI_KEY = "fs.uri";
-  @Getter
-  private String fsUri;
-
-  /**
-   * Constructor. An HDFS DataNode must have fs.uri property specified in addition to a node Id.
-   */
-  public FileSystemDataNode(Config nodeProps) throws DataNodeCreationException {
-    super(nodeProps);
-    try {
-      this.fsUri = ConfigUtils.getString(nodeProps, FS_URI_KEY, "");
-      Preconditions.checkArgument(!Strings.isNullOrEmpty(this.fsUri), "FS URI cannot be null or empty for an HDFSDataNode");
-      URI uri = new URI(this.fsUri);
-      if(!isUriValid(uri)) {
-        throw new IOException("Invalid FS URI " + this.fsUri);
-      }
-    } catch(Exception e) {
-      throw new DataNodeCreationException(e);
-    }
-  }
-
-  public abstract boolean isUriValid(URI fsUri);
-  /**
-   * Two HDFS DataNodes are the same if they have the same id and the same fsUri.
-   */
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-
-    FileSystemDataNode that = (FileSystemDataNode) o;
-
-    return this.getId().equals(that.getId()) && fsUri.equals(that.getFsUri());
-  }
-
-  @Override
-  public int hashCode() {
-    return Joiner.on("-").join(this.getId(), this.fsUri).hashCode();
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdge.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdge.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdge.java
index fb60d67..497bd5b 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdge.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdge.java
@@ -41,9 +41,15 @@ import org.apache.gobblin.service.modules.template.FlowTemplate;
 public interface FlowEdge {
   /**
    *
-   * @return the {@link DataNode} ids that are the end points of the edge.
+   * @return the source {@link DataNode} id of the edge.
    */
-  List<String> getEndPoints();
+  String getSrc();
+
+  /**
+   *
+   * @return the destination {@link DataNode} id of the edge.
+   */
+  String getDest();
 
   /**
    *
@@ -62,7 +68,7 @@ public interface FlowEdge {
    * is instantiated. It also includes properties needed for resolving a {@link org.apache.gobblin.runtime.api.JobTemplate}.
    * @return the properties of this edge as a {@link Config} object.
    */
-  Config getProps();
+  Config getConfig();
 
   /**
    * A string uniquely identifying the edge.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java
index 23f5793..b4aa7bf 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java
@@ -32,6 +32,13 @@ import org.apache.gobblin.annotation.Alpha;
 public interface FlowGraph {
 
   /**
+   * Get a {@link DataNode} from the node identifier
+   * @param nodeId {@link DataNode} identifier.
+   * @return the {@link DataNode} object if the node is present in the {@link FlowGraph}.
+   */
+  public DataNode getNode(String nodeId);
+
+  /**
    * Add a {@link DataNode} to the {@link FlowGraph}
    * @param node {@link DataNode} to be added
    * @return true if {@link DataNode} is added to the {@link FlowGraph} successfully.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java
index cd4876a..8a49ec0 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java
@@ -22,7 +22,7 @@ public class FlowGraphConfigurationKeys {
   public static final String FLOW_EDGE_PREFIX = "flow.edge.";
 
   /**
-   *   {@link DataNode} configuration keys.
+   *   {@link DataNode} related configuration keys.
    */
   public static final String DATA_NODE_CLASS = DATA_NODE_PREFIX + "class";
   public static final String DEFAULT_DATA_NODE_CLASS = "org.apache.gobblin.service.modules.flowgraph.BaseDataNode";
@@ -30,7 +30,7 @@ public class FlowGraphConfigurationKeys {
   public static final String DATA_NODE_IS_ACTIVE_KEY = DATA_NODE_PREFIX + "isActive";
 
   /**
-   * {@link FlowEdge} configuration keys.
+   * {@link FlowEdge} related configuration keys.
    */
   public static final String FLOW_EDGE_FACTORY_CLASS = FLOW_EDGE_PREFIX + "factory.class";
   public static final String DEFAULT_FLOW_EDGE_FACTORY_CLASS = "org.apache.gobblin.service.modules.flowgraph.BaseFlowEdge$Factory";
@@ -39,7 +39,7 @@ public class FlowGraphConfigurationKeys {
   public static final String FLOW_EDGE_ID_KEY = FLOW_EDGE_PREFIX + "id";
   public static final String FLOW_EDGE_NAME_KEY = FLOW_EDGE_PREFIX + "name";
   public static final String FLOW_EDGE_IS_ACTIVE_KEY = FLOW_EDGE_PREFIX + "isActive";
-  public static final String FLOW_EDGE_TEMPLATE_URI_KEY = FLOW_EDGE_PREFIX + "flowTemplateUri";
+  public static final String FLOW_EDGE_TEMPLATE_DIR_URI_KEY = FLOW_EDGE_PREFIX + "flowTemplateDirUri";
   public static final String FLOW_EDGE_SPEC_EXECUTORS_KEY = FLOW_EDGE_PREFIX + "specExecutors";
-  public static final String FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY = "specExecutorClass";
+  public static final String FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY = "specExecInstance.class";
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/HdfsDataNode.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/HdfsDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/HdfsDataNode.java
deleted file mode 100644
index 7bcc18d..0000000
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/HdfsDataNode.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.flowgraph;
-
-import java.net.URI;
-
-import com.typesafe.config.Config;
-
-import org.apache.gobblin.annotation.Alpha;
-
-import joptsimple.internal.Strings;
-
-
-/**
- * An implementation of {@link HdfsDataNode}. All the properties specific to a HDFS based data node (e.g. fs.uri)
- * are validated here.
- */
-@Alpha
-public class HdfsDataNode extends FileSystemDataNode {
-  public static final String HDFS_SCHEME = "hdfs";
-
-  public HdfsDataNode(Config nodeProps) throws DataNodeCreationException {
-    super(nodeProps);
-  }
-
-  /**
-   *
-   * @param fsUri FileSystem URI
-   * @return true if the scheme is "hdfs" and authority is not empty.
-   */
-  @Override
-  public boolean isUriValid(URI fsUri) {
-    String scheme = fsUri.getScheme();
-    //Check that the scheme is "hdfs"
-    if(!scheme.equals(HDFS_SCHEME)) {
-      return false;
-    }
-    //Ensure that the authority is not empty
-    if(Strings.isNullOrEmpty(fsUri.getAuthority())) {
-      return false;
-    }
-    return true;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/LocalFSDataNode.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/LocalFSDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/LocalFSDataNode.java
deleted file mode 100644
index 6dc1aa3..0000000
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/LocalFSDataNode.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.flowgraph;
-
-import java.net.URI;
-
-import org.apache.gobblin.annotation.Alpha;
-
-import com.typesafe.config.Config;
-
-/**
- * An implementation of {@link LocalFSDataNode}. All the properties specific to a LocalFS based data node (e.g. fs.uri)
- * are validated here.
- */
-@Alpha
-public class LocalFSDataNode extends FileSystemDataNode {
-  public static final String LOCAL_FS_SCHEME = "file";
-
-  public LocalFSDataNode(Config nodeProps) throws DataNodeCreationException {
-    super(nodeProps);
-  }
-
-  /**
-   *
-   * @param fsUri FileSystem URI
-   * @return true if the scheme of fsUri equals "file"
-   */
-  @Override
-  public boolean isUriValid(URI fsUri) {
-    String scheme = fsUri.getScheme();
-    if(scheme.equals(LOCAL_FS_SCHEME)) {
-      return true;
-    }
-    return false;
-  }
-}