You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/07/30 16:57:40 UTC
[4/4] incubator-gobblin git commit: [GOBBLIN-528] Multihop Flow
Compiler for Gobblin-as-a-Service (GaaS).
[GOBBLIN-528] Multihop Flow Compiler for Gobblin-as-a-Service (GaaS).
Closes #2393 from sv2000/multiHopCompiler
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/22a951f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/22a951f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/22a951f0
Branch: refs/heads/master
Commit: 22a951f0a4ac0c963e99cd2a15989c62a08c81cf
Parents: 33d4fea
Author: suvasude <su...@linkedin.biz>
Authored: Mon Jul 30 09:57:31 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Mon Jul 30 09:57:31 2018 -0700
----------------------------------------------------------------------
.../template/HOCONInputStreamJobTemplate.java | 2 +-
.../modules/core/GitMonitoringService.java | 2 +-
.../dataset/BaseHdfsDatasetDescriptor.java | 98 -----
.../modules/dataset/DatasetDescriptor.java | 38 +-
.../modules/dataset/EncryptionConfig.java | 90 ++++
.../modules/dataset/FSDatasetDescriptor.java | 138 ++++++
.../service/modules/dataset/FormatConfig.java | 102 +++++
.../modules/dataset/HdfsDatasetDescriptor.java | 40 --
.../service/modules/flow/FlowEdgeContext.java | 46 ++
.../service/modules/flow/FlowGraphPath.java | 90 ++++
.../modules/flow/FlowGraphPathFinder.java | 320 ++++++++++++++
.../modules/flow/MultiHopFlowCompiler.java | 157 +++++++
.../service/modules/flowgraph/BaseDataNode.java | 8 +-
.../service/modules/flowgraph/BaseFlowEdge.java | 20 +-
.../modules/flowgraph/BaseFlowGraph.java | 8 +-
.../gobblin/service/modules/flowgraph/Dag.java | 10 +-
.../service/modules/flowgraph/DataNode.java | 3 +-
.../flowgraph/DatasetDescriptorConfigKeys.java | 18 +
.../modules/flowgraph/FileSystemDataNode.java | 83 ----
.../service/modules/flowgraph/FlowEdge.java | 12 +-
.../service/modules/flowgraph/FlowGraph.java | 7 +
.../flowgraph/FlowGraphConfigurationKeys.java | 8 +-
.../service/modules/flowgraph/HdfsDataNode.java | 59 ---
.../modules/flowgraph/LocalFSDataNode.java | 51 ---
.../flowgraph/datanodes/fs/AdlsDataNode.java | 52 +++
.../datanodes/fs/FileSystemDataNode.java | 87 ++++
.../flowgraph/datanodes/fs/HdfsDataNode.java | 58 +++
.../flowgraph/datanodes/fs/LocalFSDataNode.java | 51 +++
.../service/modules/spec/JobExecutionPlan.java | 117 ++++++
.../spec/JobExecutionPlanDagFactory.java | 114 +++++
.../service/modules/template/FlowTemplate.java | 39 +-
.../template/HOCONInputStreamFlowTemplate.java | 13 +-
.../modules/template/JobTemplateDagFactory.java | 79 ----
.../modules/template/StaticFlowTemplate.java | 143 ++++---
.../modules/template_catalog/FSFlowCatalog.java | 90 ++--
.../FlowCatalogWithTemplates.java | 9 +-
.../modules/core/GitFlowGraphMonitorTest.java | 314 ++++++++++++++
.../modules/flow/FlowGraphPathFinderTest.java | 417 +++++++++++++++++++
.../flowgraph/BaseFlowEdgeFactoryTest.java | 74 ++++
.../modules/flowgraph/BaseFlowGraphTest.java | 13 +-
.../spec/JobExecutionPlanDagFactoryTest.java | 116 ++++++
.../template/JobTemplateDagFactoryTest.java | 92 ----
.../template_catalog/FSFlowCatalogTest.java | 53 ++-
.../src/test/resources/flow/flow.conf | 24 ++
.../datanodes/AdlsDataNode-1.properties | 3 +
.../datanodes/HdfsDataNode-1.properties | 3 +
.../datanodes/HdfsDataNode-2.properties | 3 +
.../datanodes/HdfsDataNode-3.properties | 3 +
.../datanodes/HdfsDataNode-4.properties | 3 +
.../datanodes/LocalFsDataNode-1.properties | 3 +
.../hdfs-1-to-hdfs-1-encrypt.properties | 9 +
.../flowedges/hdfs-1-to-hdfs-3.properties | 10 +
.../hdfs-2-to-hdfs-2-encrypt.properties | 9 +
.../flowedges/hdfs-2-to-hdfs-4.properties | 9 +
.../flowedges/hdfs-3-to-adls-1.properties | 13 +
.../flowedges/hdfs-4-to-adls-1.properties | 13 +
.../flowedges/local-to-hdfs-1.properties | 9 +
.../flowedges/local-to-hdfs-2.properties | 9 +
.../modules/core/GitFlowGraphMonitorTest.java | 314 --------------
.../flowgraph/BaseFlowEdgeFactoryTest.java | 73 ----
.../template_catalog/flowEdgeTemplate/flow.conf | 20 +
.../flowEdgeTemplate/jobs/job1.job | 1 +
.../flowEdgeTemplate/jobs/job2.job | 3 +
.../flowEdgeTemplate/jobs/job3.job | 2 +
.../flowEdgeTemplate/jobs/job4.job | 2 +
.../hdfsConvertToJsonAndEncrypt/flow.conf | 18 +
.../jobs/hdfs-encrypt-avro-to-json.job | 1 +
.../flowEdgeTemplates/hdfsToAdl/flow.conf | 18 +
.../hdfsToAdl/jobs/distcp-hdfs-to-adl.job | 1 +
.../flowEdgeTemplates/hdfsToHdfs/flow.conf | 15 +
.../hdfsToHdfs/jobs/distcp-hdfs-to-hdfs.job | 1 +
.../flowEdgeTemplates/localToHdfs/flow.conf | 9 +
.../localToHdfs/jobs/distcp-local-to-hdfs.job | 1 +
.../distcp-push-hdfs-to-adl.template | 65 +++
.../multihop/jobTemplates/distcp.template | 57 +++
.../hdfs-convert-to-json-and-encrypt.template | 42 ++
.../template_catalog/templates/job1.template | 2 +
.../template_catalog/templates/job2.template | 2 +
.../template_catalog/templates/job3.template | 2 +
.../template_catalog/templates/job4.template | 2 +
.../template_catalog/test-template/flow.conf | 30 +-
.../test-template/jobs/job1.conf | 2 -
.../test-template/jobs/job1.job | 1 +
.../test-template/jobs/job2.conf | 3 -
.../test-template/jobs/job2.job | 3 +
.../test-template/jobs/job3.conf | 3 -
.../test-template/jobs/job3.job | 2 +
.../test-template/jobs/job4.conf | 3 -
.../test-template/jobs/job4.job | 2 +
89 files changed, 3082 insertions(+), 1082 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/HOCONInputStreamJobTemplate.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/HOCONInputStreamJobTemplate.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/HOCONInputStreamJobTemplate.java
index a1337fd..5e132fe 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/HOCONInputStreamJobTemplate.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/HOCONInputStreamJobTemplate.java
@@ -51,7 +51,7 @@ public class HOCONInputStreamJobTemplate extends StaticJobTemplate {
this(ConfigFactory.parseReader(new InputStreamReader(inputStream, Charsets.UTF_8)), uri, catalog);
}
- private HOCONInputStreamJobTemplate(Config config, URI uri, JobCatalogWithTemplates catalog)
+ public HOCONInputStreamJobTemplate(Config config, URI uri, JobCatalogWithTemplates catalog)
throws SpecNotFoundException, TemplateException {
super(uri, config.hasPath(VERSION_KEY) ? config.getString(VERSION_KEY) : DEFAULT_VERSION,
config.hasPath(ConfigurationKeys.JOB_DESCRIPTION_KEY) ? config.getString(ConfigurationKeys.JOB_DESCRIPTION_KEY) : "",
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
index 2361edc..c4d3656 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
@@ -107,7 +107,7 @@ public abstract class GitMonitoringService extends AbstractIdleService {
ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("FetchGitConfExecutor")));
}
- synchronized void setActive(boolean isActive) {
+ public synchronized void setActive(boolean isActive) {
if (this.isActive == isActive) {
// No-op if already in correct state
return;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseHdfsDatasetDescriptor.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseHdfsDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseHdfsDatasetDescriptor.java
deleted file mode 100644
index 7d7e2b4..0000000
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseHdfsDatasetDescriptor.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.dataset;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
-
-import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
-import org.apache.gobblin.util.ConfigUtils;
-
-import lombok.Getter;
-
-
-/**
- * An implementation of {@link HdfsDatasetDescriptor}.
- */
-@Alpha
-public class BaseHdfsDatasetDescriptor implements HdfsDatasetDescriptor {
- @Getter
- private final String path;
- @Getter
- private final String format;
- @Getter
- private final String description;
- @Getter
- private final String platform;
-
- public BaseHdfsDatasetDescriptor(Config config) {
- Preconditions.checkArgument(config.hasPath(DatasetDescriptorConfigKeys.PATH_KEY), String.format("Missing required property %s", DatasetDescriptorConfigKeys.PATH_KEY));
- Preconditions.checkArgument(config.hasPath(DatasetDescriptorConfigKeys.FORMAT_KEY), String.format("Missing required property %s", DatasetDescriptorConfigKeys.FORMAT_KEY));
-
- this.path = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.PATH_KEY, null);
- this.format = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.FORMAT_KEY, null);
- this.description = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.DESCRIPTION_KEY, "");
- this.platform = "hdfs";
- }
-
- /**
- * A {@link HdfsDatasetDescriptor} is compatible with another {@link DatasetDescriptor} iff they have identical
- * platform, type, path, and format.
- * TODO: Currently isCompatibleWith() only checks if HDFS paths described by the two {@link DatasetDescriptor}s
- * being compared are identical. Need to enhance this for the case of where paths can contain glob patterns.
- * e.g. paths described by the pattern /data/input/* are a subset of paths described by /data/* and hence, the
- * two descriptors should be compatible.
- * @return true if this {@link HdfsDatasetDescriptor} is compatibe with another {@link DatasetDescriptor}.
- */
- @Override
- public boolean isCompatibleWith(DatasetDescriptor o) {
- return this.equals(o);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- HdfsDatasetDescriptor other = (HdfsDatasetDescriptor) o;
- if(this.getPlatform() == null || other.getPlatform() == null) {
- return false;
- }
- if(!this.getPlatform().equalsIgnoreCase(other.getPlatform()) || !(o instanceof HdfsDatasetDescriptor)) {
- return false;
- }
-
- return this.getPath().equals(other.getPath()) && this.getFormat().equalsIgnoreCase(other.getFormat());
- }
-
- @Override
- public String toString() {
- return "(" + Joiner.on(",").join(this.getPlatform(),this.getPath(),this.getFormat()) + ")";
- }
-
- @Override
- public int hashCode() {
- return this.toString().hashCode();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java
index 4a322e6..e8474e3 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java
@@ -17,28 +17,54 @@
package org.apache.gobblin.service.modules.dataset;
+import com.typesafe.config.Config;
+
import org.apache.gobblin.annotation.Alpha;
/**
- * The interface for dataset descriptors.
+ * The interface for dataset descriptors. Each dataset is described in terms of the following attributes:
+ * <ul>
+ * <li> platform (e.g. HDFS, ADLS, JDBC). </li>
+ * <li> path, which describes the fully qualified name of the dataset. </li>
+ * <li> a format descriptor, which encapsulates its representation (e.g. avro, csv), codec (e.g. gzip, deflate), and
+ * encryption config (e.g. aes_rotating, gpg). </li>
+ * </ul>
*/
@Alpha
public interface DatasetDescriptor {
/**
- * @return the dataset platform i.e. the storage backing the dataset (e.g. HDFS, JDBC, Espresso etc.)
+ * @return the dataset platform i.e. the storage system backing the dataset (e.g. HDFS, ADLS, JDBC etc.)
*/
public String getPlatform();
/**
+ * Returns the fully qualified name of a dataset. The fully qualified name is the absolute directory path of a dataset
+ * when the dataset is backed by a FileSystem. In the case of a database table, it is dbName.tableName.
+ * @return dataset path.
+ */
+ public String getPath();
+
+ /**
+ *
+ * @return storage format of the dataset.
+ */
+ public FormatConfig getFormatConfig();
+
+ /**
* @return a human-readable description of the dataset.
*/
public String getDescription();
/**
- * @return true if this {@link DatasetDescriptor} is compatible with the other {@link DatasetDescriptor} i.e. the
- * datasets described by this {@link DatasetDescriptor} is a subset of the datasets described by the other {@link DatasetDescriptor}.
- * This check is non-commutative.
+ * @return true if this {@link DatasetDescriptor} contains the other {@link DatasetDescriptor} i.e. the
+ * datasets described by the other {@link DatasetDescriptor} are a subset of the datasets described by this
+ * {@link DatasetDescriptor}. This operation is non-commutative.
+ */
+ public boolean contains(DatasetDescriptor other);
+
+ /**
+ * @return the raw config.
*/
- public boolean isCompatibleWith(DatasetDescriptor other);
+ public Config getRawConfig();
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java
new file mode 100644
index 0000000..21c7c17
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.dataset;
+
+import com.google.common.base.Joiner;
+import com.typesafe.config.Config;
+
+import lombok.Getter;
+
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+public class EncryptionConfig {
+ @Getter
+ private final String encryptionAlgorithm;
+ @Getter
+ private final String keystoreType;
+ @Getter
+ private final String keystoreEncoding;
+
+ public EncryptionConfig(Config encryptionConfig) {
+ this.encryptionAlgorithm = ConfigUtils.getString(encryptionConfig, DatasetDescriptorConfigKeys.ENCRYPTION_ALGORITHM_KEY,
+ DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+ this.keystoreType = ConfigUtils.getString(encryptionConfig, DatasetDescriptorConfigKeys.ENCRYPTION_KEYSTORE_TYPE_KEY,
+ DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+ this.keystoreEncoding = ConfigUtils.getString(encryptionConfig, DatasetDescriptorConfigKeys.ENCRYPTION_KEYSTORE_ENCODING_KEY,
+ DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+ }
+
+ public boolean contains(EncryptionConfig other) {
+ if (other == null) {
+ return false;
+ }
+
+ String otherEncryptionAlgorithm = other.getEncryptionAlgorithm();
+ String otherKeystoreType = other.getKeystoreType();
+ String otherKeystoreEncoding = other.getKeystoreEncoding();
+
+ return (DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equals(this.getEncryptionAlgorithm())
+ || this.encryptionAlgorithm.equalsIgnoreCase(otherEncryptionAlgorithm))
+ && (DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equals(this.getKeystoreType())
+ || this.keystoreType.equalsIgnoreCase(otherKeystoreType))
+ && (DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equals(this.getKeystoreEncoding())
+ || this.keystoreEncoding.equalsIgnoreCase(otherKeystoreEncoding));
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (!(o instanceof EncryptionConfig)) {
+ return false;
+ }
+ EncryptionConfig other = (EncryptionConfig) o;
+ return this.getEncryptionAlgorithm().equalsIgnoreCase(other.getEncryptionAlgorithm()) && this.keystoreEncoding.equalsIgnoreCase(other.getKeystoreEncoding())
+ && this.getKeystoreType().equalsIgnoreCase(other.getKeystoreType());
+ }
+
+ @Override
+ public String toString() {
+ return "(" + Joiner.on(",").join(this.encryptionAlgorithm, this.keystoreType, this.keystoreEncoding) + ")";
+ }
+
+ @Override
+ public int hashCode() {
+ int result = 17;
+ result = 31 * result + encryptionAlgorithm.toLowerCase().hashCode();
+ result = 31 * result + keystoreType.toLowerCase().hashCode();
+ result = 31 * result + keystoreEncoding.toLowerCase().hashCode();
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java
new file mode 100644
index 0000000..a5cb717
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.dataset;
+
+import org.apache.hadoop.fs.GlobPattern;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+
+import lombok.Getter;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.PathUtils;
+
+
+/**
+ * An implementation of {@link DatasetDescriptor} with FS-based storage.
+ */
+@Alpha
+public class FSDatasetDescriptor implements DatasetDescriptor {
+ @Getter
+ private final String platform;
+ @Getter
+ private final String path;
+ @Getter
+ private final FormatConfig formatConfig;
+ @Getter
+ private final String description;
+ @Getter
+ private final Config rawConfig;
+
+ public FSDatasetDescriptor(Config config) {
+ Preconditions.checkArgument(config.hasPath(DatasetDescriptorConfigKeys.PLATFORM_KEY), "Dataset descriptor config must specify platform");
+ this.platform = config.getString(DatasetDescriptorConfigKeys.PLATFORM_KEY);
+ this.path = PathUtils.getPathWithoutSchemeAndAuthority(new Path(ConfigUtils.getString(config, DatasetDescriptorConfigKeys.PATH_KEY,
+ DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY))).toString();
+ this.formatConfig = new FormatConfig(config);
+ this.description = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.DESCRIPTION_KEY, "");
+ this.rawConfig = config;
+ }
+
+ /**
+ * A helper to determine if the path description of this {@link DatasetDescriptor} is a superset of paths
+ * accepted by the other {@link DatasetDescriptor}. A path of DATASET_DESCRIPTOR_CONFIG_ANY in this descriptor accepts
+ * every non-null path; otherwise, if the other path is itself a glob pattern, we return false.
+ *
+ * @param otherPath the path of the other {@link DatasetDescriptor}; expected to be a concrete (non-glob) path.
+ * @return true if the path of this {@link DatasetDescriptor}, treated as a glob pattern, matches otherPath.
+ */
+ public boolean isPathContaining(String otherPath) {
+ if (otherPath == null) {
+ return false;
+ }
+ if (DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equals(this.getPath())) {
+ return true;
+ }
+ if (PathUtils.isGlob(new Path(otherPath))) {
+ return false;
+ }
+ GlobPattern globPattern = new GlobPattern(this.getPath());
+ return globPattern.matches(otherPath);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean contains(DatasetDescriptor o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof FSDatasetDescriptor)) {
+ return false;
+ }
+ FSDatasetDescriptor other = (FSDatasetDescriptor) o;
+
+ if (this.getPlatform() == null || other.getPlatform() == null || !this.getPlatform().equalsIgnoreCase(other.getPlatform())) {
+ return false;
+ }
+
+ return getFormatConfig().contains(other.getFormatConfig()) && isPathContaining(other.getPath());
+ }
+
+ /**
+ * Compares descriptors by platform (ignoring case), path and format config.
+ * @param o the object to compare against.
+ * @return true iff o is an {@link FSDatasetDescriptor} with the same platform (ignoring case), an identical path
+ * and an equal {@link FormatConfig}.
+ */
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof FSDatasetDescriptor)) {
+ return false;
+ }
+ FSDatasetDescriptor other = (FSDatasetDescriptor) o;
+ if (this.getPlatform() == null || other.getPlatform() == null || !this.getPlatform().equalsIgnoreCase(other.getPlatform())) {
+ return false;
+ }
+ return this.getPath().equals(other.getPath()) && this.getFormatConfig().equals(other.getFormatConfig());
+ }
+
+ @Override
+ public String toString() {
+ return "(" + Joiner.on(",").join(this.getPlatform(), this.getPath(), this.getFormatConfig().toString()) + ")";
+ }
+
+ @Override
+ public int hashCode() {
+ int result = 17;
+ result = 31 * result + platform.toLowerCase().hashCode();
+ result = 31 * result + path.hashCode();
+ result = 31 * result + getFormatConfig().hashCode();
+ return result;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java
new file mode 100644
index 0000000..a36182c
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.dataset;
+
+import com.google.common.base.Joiner;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.Getter;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * A location-independent descriptor of a dataset, which describes a dataset in terms of its physical attributes.
+ * The physical attributes include:
+ * <ul>
+ * <li> Data format (e.g. Avro, CSV, JSON). </li>
+ * <li> Data encoding type (e.g. Gzip, Bzip2, Base64, Deflate). </li>
+ * <li> Encryption properties (e.g. aes_rotating, gpg). </li>
+ * </ul>
+ */
+@Alpha
+public class FormatConfig {
+ @Getter
+ private final String format;
+ @Getter
+ private final String codecType;
+ @Getter
+ private final EncryptionConfig encryptionConfig;
+
+ public FormatConfig(Config config) {
+ this.format = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.FORMAT_KEY, DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+ this.codecType = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.CODEC_KEY, DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+ this.encryptionConfig = new EncryptionConfig(ConfigUtils.getConfig(config, DatasetDescriptorConfigKeys.ENCYPTION_PREFIX, ConfigFactory
+ .empty()));
+ }
+
+ public boolean contains(FormatConfig other) {
+ return containsFormat(other.getFormat()) && containsCodec(other.getCodecType())
+ && containsEncryptionConfig(other.getEncryptionConfig());
+ }
+
+ private boolean containsFormat(String otherFormat) {
+ return DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getFormat())
+ || (this.getFormat().equalsIgnoreCase(otherFormat));
+ }
+
+ private boolean containsCodec(String otherCodecType) {
+ return DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getCodecType())
+ || (this.getCodecType().equalsIgnoreCase(otherCodecType));
+ }
+
+ private boolean containsEncryptionConfig(EncryptionConfig otherEncryptionConfig) {
+ return this.getEncryptionConfig().contains(otherEncryptionConfig);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (!(o instanceof FormatConfig)) {
+ return false;
+ }
+ FormatConfig other = (FormatConfig) o;
+ return this.getFormat().equalsIgnoreCase(other.getFormat()) && this.getCodecType().equalsIgnoreCase(other.getCodecType())
+ && this.getEncryptionConfig().equals(other.getEncryptionConfig());
+ }
+
+ @Override
+ public String toString() {
+ return "(" + Joiner.on(",").join(this.getFormat(), this.getCodecType(), this.getEncryptionConfig().toString()) + ")";
+ }
+
+ @Override
+ public int hashCode() {
+ int result = 17;
+ result = 31 * result + codecType.toLowerCase().hashCode();
+ result = 31 * result + format.toLowerCase().hashCode();
+ result = 31 * result + encryptionConfig.hashCode();
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HdfsDatasetDescriptor.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HdfsDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HdfsDatasetDescriptor.java
deleted file mode 100644
index 6f1970c..0000000
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HdfsDatasetDescriptor.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.dataset;
-
-import org.apache.gobblin.annotation.Alpha;
-
-
-/**
- * A descriptor interface for HDFS datasets
- */
-@Alpha
-public interface HdfsDatasetDescriptor extends DatasetDescriptor {
- /**
- *
- * @return dataset path.
- */
- public String getPath();
-
- /**
- *
- * @return storage format of the dataset.
- */
- public String getFormat();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowEdgeContext.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowEdgeContext.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowEdgeContext.java
new file mode 100644
index 0000000..daff8ce
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowEdgeContext.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.flow;
+
+import com.typesafe.config.Config;
+
+import lombok.AllArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
+
+
+/**
+ * Immutable per-edge bookkeeping kept for a {@link FlowEdge} while a path through the flow graph is computed.
+ * It records:
+ * <ul>
+ *   <li>the edge itself,</li>
+ *   <li>the input {@link DatasetDescriptor} of the edge, i.e. the descriptor handed over by the immediately
+ *   preceding {@link FlowEdge} on the path (or by the flow source),</li>
+ *   <li>the output {@link DatasetDescriptor} produced by the edge,</li>
+ *   <li>the merged {@link Config} and the {@link SpecExecutor} selected for the edge.</li>
+ * </ul>
+ * Equality and hash code are based on the edge and the two dataset descriptors only; the merged config and the
+ * spec executor are excluded. The fields are final because instances are used as keys of the parent map kept by
+ * {@link FlowGraphPathFinder}, and a key must not change after insertion.
+ */
+@AllArgsConstructor
+@EqualsAndHashCode(exclude = {"mergedConfig", "specExecutor"})
+@Getter
+public class FlowEdgeContext {
+  private final FlowEdge edge;
+  private final DatasetDescriptor inputDatasetDescriptor;
+  private final DatasetDescriptor outputDatasetDescriptor;
+  private final Config mergedConfig;
+  private final SpecExecutor specExecutor;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java
new file mode 100644
index 0000000..c642708
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flow;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
+import org.apache.gobblin.service.modules.template.FlowTemplate;
+
+
+/**
+ * An ordered sequence of hops through the flow graph, held as a {@link List} of {@link FlowEdgeContext}s,
+ * together with the {@link FlowSpec} and flow execution id it was computed for. The path can be converted
+ * into a {@link Dag} of {@link JobExecutionPlan}s via {@link #asDag()}.
+ */
+public class FlowGraphPath {
+  private List<FlowEdgeContext> path;
+  private FlowSpec flowSpec;
+  private Long flowExecutionId;
+
+  /**
+   * @param path the hops of the path, first hop first.
+   * @param flowSpec the {@link FlowSpec} passed to every {@link JobExecutionPlan} created for this path.
+   * @param flowExecutionId the execution id passed to every {@link JobExecutionPlan} created for this path.
+   */
+  public FlowGraphPath(List<FlowEdgeContext> path, FlowSpec flowSpec, Long flowExecutionId) {
+    this.path = path;
+    this.flowSpec = flowSpec;
+    this.flowExecutionId = flowExecutionId;
+  }
+
+  /**
+   * Converts every hop of the path into its own {@link Dag}, in path order, and concatenates the results
+   * onto an initially empty {@link Dag}.
+   * @return the concatenated {@link Dag} of {@link JobExecutionPlan}s; built from an empty node list if the path has no hops.
+   */
+  public Dag<JobExecutionPlan> asDag()
+      throws IOException, SpecNotFoundException, JobTemplate.TemplateException, URISyntaxException {
+    Dag<JobExecutionPlan> combinedDag = new Dag<>(new ArrayList<>());
+    for (Iterator<FlowEdgeContext> hops = path.iterator(); hops.hasNext(); ) {
+      Dag<JobExecutionPlan> hopDag = convertHopToDag(hops.next());
+      combinedDag = combinedDag.concatenate(hopDag);
+    }
+    return combinedDag;
+  }
+
+  /**
+   * Builds the {@link Dag} of {@link JobExecutionPlan}s for a single hop: the job configs of the edge's
+   * {@link FlowTemplate} are resolved against the hop's merged config and dataset descriptors, and one
+   * {@link JobExecutionPlan} is created per resolved job config.
+   * @param flowEdgeContext the hop to convert.
+   * @return a {@link Dag} of {@link JobExecutionPlan}s associated with the {@link FlowEdge} of the hop.
+   */
+  private Dag<JobExecutionPlan> convertHopToDag(FlowEdgeContext flowEdgeContext)
+      throws SpecNotFoundException, JobTemplate.TemplateException, URISyntaxException {
+    FlowTemplate template = flowEdgeContext.getEdge().getFlowTemplate();
+    DatasetDescriptor hopInput = flowEdgeContext.getInputDatasetDescriptor();
+    DatasetDescriptor hopOutput = flowEdgeContext.getOutputDatasetDescriptor();
+    Config hopConfig = flowEdgeContext.getMergedConfig();
+    SpecExecutor executor = flowEdgeContext.getSpecExecutor();
+
+    //Resolve the job configs of the flow template, then wrap each one in a JobExecutionPlan.
+    List<Config> jobConfigs = template.getResolvedJobConfigs(hopConfig, hopInput, hopOutput);
+    List<JobExecutionPlan> plans = new ArrayList<>();
+    for (Config jobConfig : jobConfigs) {
+      JobExecutionPlan plan = new JobExecutionPlan.Factory().createPlan(flowSpec, jobConfig, executor, flowExecutionId);
+      plans.add(plan);
+    }
+    return new JobExecutionPlanDagFactory().createDag(plans);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinder.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinder.java
new file mode 100644
index 0000000..2b4746c
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinder.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flow;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DataNode;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+@Alpha
+@Slf4j
+public class FlowGraphPathFinder {
+ private static final String SOURCE_PREFIX = "source";
+ private static final String DESTINATION_PREFIX = "destination";
+
+ private FlowGraph flowGraph;
+ private FlowSpec flowSpec;
+ private Config flowConfig;
+
+ private DataNode srcNode;
+ private DataNode destNode;
+
+ private DatasetDescriptor srcDatasetDescriptor;
+ private DatasetDescriptor destDatasetDescriptor;
+
+ //Maintain path of FlowEdges as parent-child map
+ private Map<FlowEdgeContext, FlowEdgeContext> pathMap;
+
+ //Flow Execution Id
+ private Long flowExecutionId;
+
+  /**
+   * Reads the source/destination node ids and the input/output dataset descriptor configs from the config of
+   * the given {@link FlowSpec}, looks the nodes up in the {@link FlowGraph}, and instantiates both dataset descriptors.
+   * @param flowGraph the {@link FlowGraph} in which a path is searched.
+   * @param flowSpec the {@link FlowSpec} whose config names the source, the destination and the dataset descriptors.
+   * @throws IllegalArgumentException if the flow graph has no node with the configured source or destination id.
+   * @throws RuntimeException if a dataset descriptor cannot be instantiated; the original failure is the cause.
+   */
+  public FlowGraphPathFinder(FlowGraph flowGraph, FlowSpec flowSpec) {
+    this.flowGraph = flowGraph;
+    this.flowSpec = flowSpec;
+    this.flowConfig = flowSpec.getConfig();
+
+    //Get src/dest DataNodes from the flow config
+    String srcNodeId = ConfigUtils.getString(flowConfig, ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, "");
+    String destNodeId = ConfigUtils.getString(flowConfig, ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, "");
+    this.srcNode = this.flowGraph.getNode(srcNodeId);
+    Preconditions.checkArgument(srcNode != null, "Flowgraph does not have a node with id " + srcNodeId);
+    this.destNode = this.flowGraph.getNode(destNodeId);
+    Preconditions.checkArgument(destNode != null, "Flowgraph does not have a node with id " + destNodeId);
+
+    //Get src/dest dataset descriptors from the flow config. Both sub-configs are fetched before either
+    // descriptor is instantiated, so a missing sub-config is reported before any reflection is attempted.
+    Config srcDatasetDescriptorConfig =
+        flowConfig.getConfig(DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX);
+    Config destDatasetDescriptorConfig =
+        flowConfig.getConfig(DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX);
+
+    this.srcDatasetDescriptor = createDatasetDescriptor(srcDatasetDescriptorConfig,
+        DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX);
+    this.destDatasetDescriptor = createDatasetDescriptor(destDatasetDescriptorConfig,
+        DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX);
+  }
+
+  /**
+   * Instantiates the {@link DatasetDescriptor} whose class name is stored under
+   * {@link DatasetDescriptorConfigKeys#CLASS_KEY} in the given config, passing the config to the constructor.
+   * @param datasetDescriptorConfig the config describing the dataset.
+   * @param configPrefix the flow config prefix the config was read from; only used in the error message.
+   * @return the instantiated {@link DatasetDescriptor}.
+   * @throws RuntimeException wrapping any failure to load or instantiate the descriptor class.
+   */
+  private static DatasetDescriptor createDatasetDescriptor(Config datasetDescriptorConfig, String configPrefix) {
+    try {
+      Class<?> datasetDescriptorClass =
+          Class.forName(datasetDescriptorConfig.getString(DatasetDescriptorConfigKeys.CLASS_KEY));
+      return (DatasetDescriptor) GobblinConstructorUtils
+          .invokeLongestConstructor(datasetDescriptorClass, datasetDescriptorConfig);
+    } catch (Exception e) {
+      throw new RuntimeException("Cannot instantiate the dataset descriptor configured under " + configPrefix, e);
+    }
+  }
+
+  /**
+   * Searches a path from the source node to the destination node using Breadth-First Search over
+   * {@link FlowEdge}s. Every dequeued edge is expanded into its eligible adjacent edges (see
+   * {@link #getNextEdges}), which are appended to the queue unless they have been seen before. Within one
+   * expansion, edges whose output format is compatible with the destination's format are ordered first, so
+   * that dataset transformations are performed as close to the source as possible.
+   * @return a path of {@link FlowEdgeContext}s starting at the srcNode and ending at the destNode; an empty
+   * path if the source already satisfies the destination; null if either end node is inactive or no path exists.
+   * @throws PathFinderException if a checked exception is raised during path computation.
+   */
+  public FlowGraphPath findPath() throws PathFinderException {
+    try {
+      //Reset the per-invocation state: the child -> parent edge map and the flow execution id.
+      this.pathMap = new HashMap<>();
+      this.flowExecutionId = System.currentTimeMillis();
+
+      //Path computation must be thread-safe to guarantee read consistency. In other words, we prevent concurrent read/write access to the
+      // flow graph.
+      // TODO: we can easily improve the performance by using a ReentrantReadWriteLock associated with the FlowGraph. This will
+      // allow multiple concurrent readers to not be blocked on each other, as long as there are no writers.
+      synchronized (this.flowGraph) {
+        //Base condition 1: both end nodes must be active, otherwise there is nothing to compute.
+        if (!(srcNode.isActive() && destNode.isActive())) {
+          log.warn("Either source node {} or destination node {} is inactive; skipping path computation.", this.srcNode.getId(),
+              this.destNode.getId());
+          return null;
+        }
+
+        //Base condition 2: already at the target, so the path has no hops.
+        boolean alreadyAtTarget = srcNode.equals(destNode) && destDatasetDescriptor.contains(srcDatasetDescriptor);
+        if (alreadyAtTarget) {
+          return new FlowGraphPath(new ArrayList<>(), flowSpec, flowExecutionId);
+        }
+
+        //Seed the queue with the edges leaving the source. A first edge is recorded as its own parent, which
+        // is how the start of a path is recognized when backtracking.
+        LinkedList<FlowEdgeContext> frontier = new LinkedList<>();
+        frontier.addAll(getNextEdges(srcNode, srcDatasetDescriptor, destDatasetDescriptor));
+        for (FlowEdgeContext firstEdge : frontier) {
+          this.pathMap.put(firstEdge, firstEdge);
+        }
+
+        while (!frontier.isEmpty()) {
+          FlowEdgeContext parentEdge = frontier.removeFirst();
+          DataNode reachedNode = this.flowGraph.getNode(parentEdge.getEdge().getDest());
+          DatasetDescriptor reachedDescriptor = parentEdge.getOutputDatasetDescriptor();
+
+          //Are we done?
+          if (isPathFound(reachedNode, destNode, reachedDescriptor, destDatasetDescriptor)) {
+            return constructPath(parentEdge);
+          }
+
+          //Expand the reached node; only edges that were never enqueued before get a parent pointer and a
+          // place in the queue.
+          for (FlowEdgeContext childEdge : getNextEdges(reachedNode, reachedDescriptor, destDatasetDescriptor)) {
+            if (this.pathMap.containsKey(childEdge)) {
+              continue;
+            }
+            frontier.add(childEdge);
+            this.pathMap.put(childEdge, parentEdge);
+          }
+        }
+      }
+      //The queue ran dry without reaching the destination: no path found.
+      return null;
+    } catch (SpecNotFoundException | JobTemplate.TemplateException | IOException | URISyntaxException e) {
+      throw new PathFinderException(
+          "Exception encountered when computing path from src: " + this.srcNode.getId() + " to dest: " + this.destNode.getId(), e);
+    }
+  }
+
+  /**
+   * Tells whether the search has reached its goal.
+   * @return true iff currentNode equals destNode and currentDatasetDescriptor equals destDatasetDescriptor.
+   */
+  private boolean isPathFound(DataNode currentNode, DataNode destNode, DatasetDescriptor currentDatasetDescriptor,
+      DatasetDescriptor destDatasetDescriptor) {
+    return currentNode.equals(destNode) && currentDatasetDescriptor.equals(destDatasetDescriptor);
+  }
+
+  /**
+   * Collects the {@link FlowEdge}s leaving dataNode that can consume the current dataset, ordered so that
+   * edges whose template output format is compatible with the destination's format come first.
+   * <p>An edge is skipped if it is inactive, if its destination node is inactive or no longer present in the
+   * flow graph, or if an exception is raised while resolving it. For every remaining edge the executors are
+   * tried in order and the first one yielding at least one compatible dataset descriptor pair is used; one
+   * {@link FlowEdgeContext} is emitted per compatible pair.</p>
+   * @param dataNode the node whose outgoing edges are expanded.
+   * @param currentDatasetDescriptor Output {@link DatasetDescriptor} of the current edge.
+   * @param destDatasetDescriptor Target {@link DatasetDescriptor}.
+   * @return prioritized list of {@link FlowEdgeContext}s to be added to the edge queue for expansion.
+   */
+  private List<FlowEdgeContext> getNextEdges(DataNode dataNode, DatasetDescriptor currentDatasetDescriptor,
+      DatasetDescriptor destDatasetDescriptor) {
+    List<FlowEdgeContext> prioritizedEdgeList = new LinkedList<>();
+    for (FlowEdge flowEdge : this.flowGraph.getEdges(dataNode)) {
+      try {
+        DataNode edgeDestination = this.flowGraph.getNode(flowEdge.getDest());
+        //Base condition: Skip this FlowEdge, if it is inactive or if the destination of this edge is missing
+        // or inactive. The null check keeps a dangling edge from aborting the whole search with an NPE.
+        if (edgeDestination == null || !edgeDestination.isActive() || !flowEdge.isActive()) {
+          continue;
+        }
+
+        boolean foundExecutor = false;
+        //Iterate over all executors for this edge. Find the first one that resolves the underlying flow template.
+        for (SpecExecutor specExecutor : flowEdge.getExecutors()) {
+          Config mergedConfig = getMergedConfig(flowEdge, specExecutor);
+          List<Pair<DatasetDescriptor, DatasetDescriptor>> datasetDescriptorPairs =
+              flowEdge.getFlowTemplate().getResolvingDatasetDescriptors(mergedConfig);
+          for (Pair<DatasetDescriptor, DatasetDescriptor> datasetDescriptorPair : datasetDescriptorPairs) {
+            DatasetDescriptor inputDatasetDescriptor = datasetDescriptorPair.getLeft();
+            DatasetDescriptor outputDatasetDescriptor = datasetDescriptorPair.getRight();
+            if (inputDatasetDescriptor.contains(currentDatasetDescriptor)) {
+              FlowEdgeContext flowEdgeContext;
+              if (outputDatasetDescriptor.contains(currentDatasetDescriptor)) {
+                //If datasets described by the currentDatasetDescriptor is a subset of the datasets described
+                // by the outputDatasetDescriptor (i.e. currentDatasetDescriptor is more "specific" than
+                // outputDatasetDescriptor, e.g. as in the case of a "distcp" edge), we propagate the more
+                // "specific" dataset descriptor forward.
+                flowEdgeContext = new FlowEdgeContext(flowEdge, currentDatasetDescriptor, currentDatasetDescriptor,
+                    mergedConfig, specExecutor);
+              } else {
+                //outputDatasetDescriptor is more specific (e.g. if it is a dataset transformation edge)
+                flowEdgeContext = new FlowEdgeContext(flowEdge, currentDatasetDescriptor, outputDatasetDescriptor,
+                    mergedConfig, specExecutor);
+              }
+              if (destDatasetDescriptor.getFormatConfig().contains(outputDatasetDescriptor.getFormatConfig())) {
+                //Add to the front of the edge list if platform-independent properties of the output descriptor is compatible
+                // with those of destination dataset descriptor.
+                // In other words, we prioritize edges that perform data transformations as close to the source as possible.
+                prioritizedEdgeList.add(0, flowEdgeContext);
+              } else {
+                prioritizedEdgeList.add(flowEdgeContext);
+              }
+              foundExecutor = true;
+            }
+          }
+          // Found a SpecExecutor. Proceed to the next FlowEdge.
+          // TODO: Choose the min-cost executor for the FlowEdge as opposed to the first one that resolves.
+          if (foundExecutor) {
+            break;
+          }
+        }
+      } catch (InterruptedException e) {
+        //Catching InterruptedException clears the thread's interrupt status; restore it so that callers can
+        // still observe the interruption. The edge itself is skipped like any other failing edge.
+        Thread.currentThread().interrupt();
+        log.warn("Skipping edge {} with config {} due to exception", flowEdge.getId(), flowConfig.toString(), e);
+      } catch (IOException | ReflectiveOperationException | ExecutionException | SpecNotFoundException
+          | JobTemplate.TemplateException e) {
+        //Skip the edge; and continue. The exception is passed as the trailing argument without a placeholder
+        // of its own, so that it is logged as a throwable (with stack trace).
+        log.warn("Skipping edge {} with config {} due to exception", flowEdge.getId(), flowConfig.toString(), e);
+      }
+    }
+    return prioritizedEdgeList;
+  }
+
+  /**
+   * Build the merged config for a {@link FlowEdge}. Each config listed below is applied as a fallback of the
+   * ones listed before it, so earlier entries take precedence over later ones:
+   * <ol>
+   *   <li> the user provided flow config </li>
+   *   <li> spec executor config/overrides </li>
+   *   <li> edge specific properties/overrides </li>
+   *   <li> source node config, placed under the "source" path </li>
+   *   <li> destination node config, placed under the "destination" path </li>
+   * </ol>
+   * Each {@link JobTemplate}'s config will eventually be resolved against this merged config.
+   * @param flowEdge An instance of {@link FlowEdge}.
+   * @param specExecutor A {@link SpecExecutor}.
+   * @return the merged config derived as described above.
+   * @throws ExecutionException if the config of the spec executor cannot be obtained.
+   * @throws InterruptedException if interrupted while waiting for the config of the spec executor.
+   */
+  private Config getMergedConfig(FlowEdge flowEdge, SpecExecutor specExecutor)
+      throws ExecutionException, InterruptedException {
+    DataNode edgeSource = this.flowGraph.getNode(flowEdge.getSrc());
+    Config sourceScopedConfig = edgeSource.getRawConfig().atPath(SOURCE_PREFIX);
+    DataNode edgeDestination = this.flowGraph.getNode(flowEdge.getDest());
+    Config destinationScopedConfig = edgeDestination.getRawConfig().atPath(DESTINATION_PREFIX);
+
+    Config merged = flowConfig.withFallback(specExecutor.getConfig().get());
+    merged = merged.withFallback(flowEdge.getConfig());
+    merged = merged.withFallback(sourceScopedConfig);
+    return merged.withFallback(destinationScopedConfig);
+  }
+
+  /**
+   * Reconstructs the path ending at the given edge by following the child -> parent pointers of the path map
+   * back to the first edge of the path, which is the edge recorded as its own parent.
+   * @param flowEdgeContext of the last {@link FlowEdge} in the path.
+   * @return the {@link FlowGraphPath} from the first edge to the given last edge, for the input {@link FlowSpec}.
+   * @throws IOException
+   * @throws SpecNotFoundException
+   * @throws JobTemplate.TemplateException
+   * @throws URISyntaxException
+   */
+  private FlowGraphPath constructPath(FlowEdgeContext flowEdgeContext)
+      throws IOException, SpecNotFoundException, JobTemplate.TemplateException, URISyntaxException {
+    //Backtrace from the last edge using the path map, prepending each parent so the path ends up in order.
+    List<FlowEdgeContext> path = new LinkedList<>();
+    path.add(flowEdgeContext);
+    FlowEdgeContext currentFlowEdgeContext = flowEdgeContext;
+    FlowEdgeContext parentFlowEdgeContext = this.pathMap.get(currentFlowEdgeContext);
+    //The root check must come BEFORE prepending: the first edge of a path is its own parent, so prepending
+    // unconditionally would duplicate the edge of a single-hop path.
+    while (!parentFlowEdgeContext.equals(currentFlowEdgeContext)) {
+      path.add(0, parentFlowEdgeContext);
+      currentFlowEdgeContext = parentFlowEdgeContext;
+      parentFlowEdgeContext = this.pathMap.get(currentFlowEdgeContext);
+    }
+    return new FlowGraphPath(path, flowSpec, flowExecutionId);
+  }
+
+  /**
+   * Checked exception signalling that path computation failed.
+   */
+  public static class PathFinderException extends Exception {
+    /**
+     * @param message description of the failure.
+     */
+    public PathFinderException(String message) {
+      super(message);
+    }
+
+    /**
+     * @param message description of the failure.
+     * @param cause the underlying exception.
+     */
+    public PathFinderException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
new file mode 100644
index 0000000..8b14b10
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flow;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.core.GitFlowGraphMonitor;
+import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
+
+
+/***
+ * Take in a logical {@link Spec} ie flow and compile corresponding materialized job {@link Spec}
+ * and its mapping to {@link SpecExecutor}.
+ */
+@Alpha
+@Slf4j
+public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
+ @Getter
+ private FlowGraph flowGraph;
+ private GitFlowGraphMonitor gitFlowGraphMonitor;
+ @Getter
+ private boolean active;
+
+ /**
+ * Creates a compiler with instrumentation enabled and no explicit logger.
+ * @param config the compiler config.
+ */
+ public MultiHopFlowCompiler(Config config) {
+ this(config, true);
+ }
+
+ /**
+ * Creates a compiler with no explicit logger.
+ * @param config the compiler config.
+ * @param instrumentationEnabled forwarded unchanged to the four-argument constructor.
+ */
+ public MultiHopFlowCompiler(Config config, boolean instrumentationEnabled) {
+ this(config, Optional.<Logger>absent(), instrumentationEnabled);
+ }
+
+ /**
+ * Creates a compiler with instrumentation enabled.
+ * @param config the compiler config.
+ * @param log an optional logger, forwarded unchanged to the four-argument constructor.
+ */
+ public MultiHopFlowCompiler(Config config, Optional<Logger> log) {
+ this(config, log, true);
+ }
+
+  /**
+   * Creates the compiler together with an empty {@link BaseFlowGraph} and a {@link GitFlowGraphMonitor}
+   * that is handed that graph and an {@link FSFlowCatalog} of flow templates.
+   * @param config the compiler config; must contain {@link ServiceConfigKeys#TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY}.
+   * @param log an optional logger, passed to the super class.
+   * @param instrumentationEnabled passed to the super class.
+   * @throws RuntimeException if the template catalog cannot be instantiated.
+   */
+  public MultiHopFlowCompiler(Config config, Optional<Logger> log, boolean instrumentationEnabled) {
+    super(config, log, instrumentationEnabled);
+
+    //The catalog reads its location from the general job config path key, so copy the template catalog
+    // path under that key.
+    Config catalogConfig = config.withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+        config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
+
+    FSFlowCatalog templateCatalog;
+    try {
+      templateCatalog = new FSFlowCatalog(catalogConfig);
+    } catch (IOException e) {
+      throw new RuntimeException("Cannot instantiate " + getClass().getName(), e);
+    }
+
+    this.flowGraph = new BaseFlowGraph();
+    this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, templateCatalog, this.flowGraph);
+  }
+
+ /**
+ * Records the active flag of this compiler and forwards it to the {@link GitFlowGraphMonitor}.
+ * @param active the new active state.
+ */
+ public void setActive(boolean active) {
+ this.active = active;
+ this.gitFlowGraphMonitor.setActive(active);
+ }
+
+  /**
+   * Compiles a {@link FlowSpec} by computing a path from its source to its destination in the flow graph,
+   * converting the path into a {@link Dag} of {@link JobExecutionPlan}s, and flattening the Dag into a map.
+   * TODO: We need to change signature of compileFlow to return a Dag instead of a HashMap to capture
+   * job dependencies.
+   * @param spec the flow to compile; must be a {@link FlowSpec}.
+   * @return an insertion-ordered map from each job spec of the Dag to its {@link SpecExecutor}, or null if
+   * no path was found or a checked exception was raised during compilation.
+   */
+  @Override
+  public Map<Spec, SpecExecutor> compileFlow(Spec spec) {
+    Preconditions.checkNotNull(spec);
+    Preconditions.checkArgument(spec instanceof FlowSpec, "MultiHopFlowCompiler only accepts FlowSpecs");
+
+    long startTime = System.nanoTime();
+    Map<Spec, SpecExecutor> specExecutorMap = Maps.newLinkedHashMap();
+
+    FlowSpec flowSpec = (FlowSpec) spec;
+    String source = flowSpec.getConfig().getString(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY);
+    String destination = flowSpec.getConfig().getString(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY);
+    log.info(String.format("Compiling flow for source: %s and destination: %s", source, destination));
+
+    FlowGraphPathFinder pathFinder = new FlowGraphPathFinder(this.flowGraph, flowSpec);
+    try {
+      //Compute the path from source to destination.
+      FlowGraphPath flowGraphPath = pathFinder.findPath();
+      if (flowGraphPath == null) {
+        //No path: count the compilation as failed and signal it to the caller with null.
+        Instrumented.markMeter(this.flowCompilationFailedMeter);
+        log.info(String.format("No path found from source: %s to destination: %s", source, destination));
+        return null;
+      }
+
+      //Convert the path into a Dag of JobExecutionPlans.
+      Dag<JobExecutionPlan> jobExecutionPlanDag = flowGraphPath.asDag();
+
+      //TODO: Just a dummy return value for now. compileFlow() signature needs to be modified to return a Dag instead
+      // of a Map. For now just add all specs into the map.
+      for (Dag.DagNode<JobExecutionPlan> node : jobExecutionPlanDag.getNodes()) {
+        JobExecutionPlan jobExecutionPlan = node.getValue();
+        specExecutorMap.put(jobExecutionPlan.getJobSpec(), jobExecutionPlan.getSpecExecutor());
+      }
+    } catch (FlowGraphPathFinder.PathFinderException | SpecNotFoundException | JobTemplate.TemplateException | IOException
+        | URISyntaxException e) {
+      Instrumented.markMeter(this.flowCompilationFailedMeter);
+      log.error(String.format("Exception encountered while compiling flow for source: %s and destination: %s", source, destination), e);
+      return null;
+    }
+    //The success meter and the timer are only updated for successful compilations.
+    Instrumented.markMeter(this.flowCompilationSuccessFulMeter);
+    Instrumented.updateTimer(this.flowCompilationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+
+    return specExecutorMap;
+  }
+
+  /**
+   * Intentionally does nothing besides logging a warning: this implementation does not populate
+   * templates based on edges.
+   */
+  @Override
+  protected void populateEdgeTemplateMap() {
+    log.warn("No population of templates based on edge happen in this implementation");
+  }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java
index 731bc22..4fb9711 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java
@@ -19,9 +19,9 @@ package org.apache.gobblin.service.modules.flowgraph;
import com.google.common.base.Preconditions;
import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
import org.apache.gobblin.util.ConfigUtils;
import joptsimple.internal.Strings;
@@ -38,7 +38,7 @@ public class BaseDataNode implements DataNode {
@Getter
private String id;
@Getter
- private Config props;
+ private Config rawConfig;
@Getter
private boolean active = true;
@@ -50,8 +50,8 @@ public class BaseDataNode implements DataNode {
if (nodeProps.hasPath(FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY)) {
this.active = nodeProps.getBoolean(FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY);
}
- this.props = nodeProps;
- } catch(Exception e) {
+ this.rawConfig = nodeProps;
+ } catch (Exception e) {
throw new DataNodeCreationException(e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
index fc82cc1..56f6c1b 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
@@ -44,7 +44,10 @@ import org.apache.gobblin.util.ConfigUtils;
@Alpha
public class BaseFlowEdge implements FlowEdge {
@Getter
- protected List<String> endPoints;
+ protected String src;
+
+ @Getter
+ protected String dest;
@Getter
protected FlowTemplate flowTemplate;
@@ -53,7 +56,7 @@ public class BaseFlowEdge implements FlowEdge {
private List<SpecExecutor> executors;
@Getter
- private Config props;
+ private Config config;
@Getter
private String id;
@@ -63,11 +66,12 @@ public class BaseFlowEdge implements FlowEdge {
//Constructor
public BaseFlowEdge(List<String> endPoints, String edgeId, FlowTemplate flowTemplate, List<SpecExecutor> executors, Config properties, boolean active) {
- this.endPoints = endPoints;
+ this.src = endPoints.get(0);
+ this.dest = endPoints.get(1);
this.flowTemplate = flowTemplate;
this.executors = executors;
this.active = active;
- this.props = properties;
+ this.config = properties;
this.id = edgeId;
}
@@ -91,7 +95,7 @@ public class BaseFlowEdge implements FlowEdge {
FlowEdge that = (FlowEdge) o;
- if (!(this.getEndPoints().get(0).equals(that.getEndPoints().get(0))) && ((this.getEndPoints().get(1)).equals(that.getEndPoints().get(1)))) {
+ if (!(this.getSrc().equals(that.getSrc()) && this.getDest().equals(that.getDest()))) { // edges are unequal when either end point differs
return false;
}
@@ -140,14 +144,14 @@ public class BaseFlowEdge implements FlowEdge {
specExecutorConfigList.add(edgeProps.getConfig(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + "." + i));
}
- String flowTemplateUri = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_URI_KEY, "");
+ String flowTemplateDirUri = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_DIR_URI_KEY, "");
//Perform basic validation
Preconditions.checkArgument(endPoints.size() == 2, "A FlowEdge must have 2 end points");
Preconditions
.checkArgument(specExecutorConfigList.size() > 0, "A FlowEdge must have at least one SpecExecutor");
Preconditions
- .checkArgument(!Strings.isNullOrEmpty(flowTemplateUri), "FlowTemplate URI must be not null or empty");
+ .checkArgument(!Strings.isNullOrEmpty(flowTemplateDirUri), "FlowTemplate URI must be not null or empty");
boolean isActive = ConfigUtils.getBoolean(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY, true);
//Build SpecExecutor from config
@@ -158,7 +162,7 @@ public class BaseFlowEdge implements FlowEdge {
SpecExecutor executor = (SpecExecutor) GobblinConstructorUtils.invokeLongestConstructor(executorClass, specExecutorConfig);
specExecutors.add(executor);
}
- FlowTemplate flowTemplate = flowCatalog.getFlowTemplate(new URI(flowTemplateUri));
+ FlowTemplate flowTemplate = flowCatalog.getFlowTemplate(new URI(flowTemplateDirUri));
return new BaseFlowEdge(endPoints, edgeId, flowTemplate, specExecutors, edgeProps, isActive);
} catch (RuntimeException e) {
throw e;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
index 783f7ea..edf40cc 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
@@ -75,8 +75,8 @@ public class BaseFlowGraph implements FlowGraph {
*/
@Override
public synchronized boolean addFlowEdge(FlowEdge edge) {
- String srcNode = edge.getEndPoints().get(0);
- String dstNode = edge.getEndPoints().get(1);
+ String srcNode = edge.getSrc();
+ String dstNode = edge.getDest();
if(!dataNodeMap.containsKey(srcNode) || !dataNodeMap.containsKey(dstNode)) {
return false;
}
@@ -153,10 +153,10 @@ public class BaseFlowGraph implements FlowGraph {
* if the {@link FlowEdge} is not in the graph, return false.
*/
public synchronized boolean deleteFlowEdge(FlowEdge edge) {
- if(!dataNodeMap.containsKey(edge.getEndPoints().get(0))) {
+ if(!dataNodeMap.containsKey(edge.getSrc())) {
return false;
}
- DataNode node = dataNodeMap.get(edge.getEndPoints().get(0));
+ DataNode node = dataNodeMap.get(edge.getSrc());
if(!nodesToEdges.get(node).contains(edge)) {
return false;
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
index 8ae0027..58bbb81 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
@@ -87,6 +87,10 @@ public class Dag<T> {
return node.parentNodes;
}
+ public boolean isEmpty() {
+ return this.nodes.isEmpty();
+ }
+
/**
* Concatenate two dags together. Join the "other" dag to "this" dag and return "this" dag; if "this" dag is empty, the "other" dag is returned unchanged instead.
* The concatenate method ensures that all the jobs of "this" dag (which may have multiple end nodes)
@@ -97,9 +101,12 @@ public class Dag<T> {
* @return the concatenated dag
*/
public Dag<T> concatenate(Dag<T> other) throws IOException {
- if (other == null) {
+ if (other == null || other.isEmpty()) {
return this;
}
+ if (this.isEmpty()) {
+ return other;
+ }
for (DagNode node : this.endNodes) {
this.parentChildMap.put(node, Lists.newArrayList());
for (DagNode otherNode : other.startNodes) {
@@ -108,6 +115,7 @@ public class Dag<T> {
}
this.endNodes = other.endNodes;
}
+ this.nodes.addAll(other.nodes);
return this;
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DataNode.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DataNode.java
index 4931685..b7a5274 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DataNode.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DataNode.java
@@ -20,7 +20,6 @@ package org.apache.gobblin.service.modules.flowgraph;
import com.typesafe.config.Config;
import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
/**
@@ -37,7 +36,7 @@ public interface DataNode {
* @return the attributes of a {@link DataNode}. It also includes properties for resolving a {@link org.apache.gobblin.runtime.api.JobTemplate}
* e.g. "source.fs.uri" for an HDFS node, "jdbc.publisher.url" for JDBC node.
*/
- Config getProps();
+ Config getRawConfig();
/**
* @return true if the {@link DataNode} is active
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
index e98337d..23e20c8 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
@@ -22,7 +22,25 @@ package org.apache.gobblin.service.modules.flowgraph;
*/
public class DatasetDescriptorConfigKeys {
//Gobblin Service Dataset Descriptor related keys
+ public static final String FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX = "gobblin.flow.input.dataset.descriptor";
+ public static final String FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX = "gobblin.flow.output.dataset.descriptor";
+
+ public static final String FLOW_EDGE_INPUT_DATASET_DESCRIPTOR_PREFIX = "gobblin.flow.edge.input.dataset.descriptor";
+ public static final String FLOW_EDGE_OUTPUT_DATASET_DESCRIPTOR_PREFIX = "gobblin.flow.edge.output.dataset.descriptor";
+
+ public static final String CLASS_KEY = "class";
+ public static final String PLATFORM_KEY = "platform";
public static final String PATH_KEY = "path";
public static final String FORMAT_KEY = "format";
+ public static final String CODEC_KEY = "codec";
public static final String DESCRIPTION_KEY = "description";
+
+ //Dataset encryption related keys
+ public static final String ENCYPTION_PREFIX = "encrypt";
+ public static final String ENCRYPTION_ALGORITHM_KEY = "algorithm";
+ public static final String ENCRYPTION_KEYSTORE_TYPE_KEY = "keystore_type";
+ public static final String ENCRYPTION_KEYSTORE_ENCODING_KEY = "keystore_encoding";
+
+ public static final String DATASET_DESCRIPTOR_CONFIG_ANY = "any";
+
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FileSystemDataNode.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FileSystemDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FileSystemDataNode.java
deleted file mode 100644
index 5899645..0000000
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FileSystemDataNode.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.flowgraph;
-
-import java.io.IOException;
-import java.net.URI;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
-
-import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.util.ConfigUtils;
-
-import joptsimple.internal.Strings;
-import lombok.Getter;
-
-
-/**
- * An abstract {@link FileSystemDataNode} implementation. In addition to the required properties of a {@link BaseDataNode}, an {@link FileSystemDataNode}
- * must have a FS URI specified. Example implementations of {@link FileSystemDataNode} include {@link HdfsDataNode}, {@link LocalFSDataNode}.
- */
-@Alpha
-public abstract class FileSystemDataNode extends BaseDataNode {
- public static final String FS_URI_KEY = "fs.uri";
- @Getter
- private String fsUri;
-
- /**
- * Constructor. An HDFS DataNode must have fs.uri property specified in addition to a node Id.
- */
- public FileSystemDataNode(Config nodeProps) throws DataNodeCreationException {
- super(nodeProps);
- try {
- this.fsUri = ConfigUtils.getString(nodeProps, FS_URI_KEY, "");
- Preconditions.checkArgument(!Strings.isNullOrEmpty(this.fsUri), "FS URI cannot be null or empty for an HDFSDataNode");
- URI uri = new URI(this.fsUri);
- if(!isUriValid(uri)) {
- throw new IOException("Invalid FS URI " + this.fsUri);
- }
- } catch(Exception e) {
- throw new DataNodeCreationException(e);
- }
- }
-
- public abstract boolean isUriValid(URI fsUri);
- /**
- * Two HDFS DataNodes are the same if they have the same id and the same fsUri.
- */
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- FileSystemDataNode that = (FileSystemDataNode) o;
-
- return this.getId().equals(that.getId()) && fsUri.equals(that.getFsUri());
- }
-
- @Override
- public int hashCode() {
- return Joiner.on("-").join(this.getId(), this.fsUri).hashCode();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdge.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdge.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdge.java
index fb60d67..497bd5b 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdge.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdge.java
@@ -41,9 +41,15 @@ import org.apache.gobblin.service.modules.template.FlowTemplate;
public interface FlowEdge {
/**
*
- * @return the {@link DataNode} ids that are the end points of the edge.
+ * @return the source {@link DataNode} id of the edge.
*/
- List<String> getEndPoints();
+ String getSrc();
+
+ /**
+ *
+ * @return the destination {@link DataNode} id of the edge.
+ */
+ String getDest();
/**
*
@@ -62,7 +68,7 @@ public interface FlowEdge {
* is instantiated. It also includes properties needed for resolving a {@link org.apache.gobblin.runtime.api.JobTemplate}.
* @return the properties of this edge as a {@link Config} object.
*/
- Config getProps();
+ Config getConfig();
/**
* A string uniquely identifying the edge.
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java
index 23f5793..b4aa7bf 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java
@@ -32,6 +32,13 @@ import org.apache.gobblin.annotation.Alpha;
public interface FlowGraph {
/**
+ * Get a {@link DataNode} from the node identifier
+ * @param nodeId {@link DataNode} identifier.
+ * @return the {@link DataNode} object if the node is present in the {@link FlowGraph}.
+ */
+ public DataNode getNode(String nodeId);
+
+ /**
* Add a {@link DataNode} to the {@link FlowGraph}
* @param node {@link DataNode} to be added
* @return true if {@link DataNode} is added to the {@link FlowGraph} successfully.
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java
index cd4876a..8a49ec0 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java
@@ -22,7 +22,7 @@ public class FlowGraphConfigurationKeys {
public static final String FLOW_EDGE_PREFIX = "flow.edge.";
/**
- * {@link DataNode} configuration keys.
+ * {@link DataNode} related configuration keys.
*/
public static final String DATA_NODE_CLASS = DATA_NODE_PREFIX + "class";
public static final String DEFAULT_DATA_NODE_CLASS = "org.apache.gobblin.service.modules.flowgraph.BaseDataNode";
@@ -30,7 +30,7 @@ public class FlowGraphConfigurationKeys {
public static final String DATA_NODE_IS_ACTIVE_KEY = DATA_NODE_PREFIX + "isActive";
/**
- * {@link FlowEdge} configuration keys.
+ * {@link FlowEdge} related configuration keys.
*/
public static final String FLOW_EDGE_FACTORY_CLASS = FLOW_EDGE_PREFIX + "factory.class";
public static final String DEFAULT_FLOW_EDGE_FACTORY_CLASS = "org.apache.gobblin.service.modules.flowgraph.BaseFlowEdge$Factory";
@@ -39,7 +39,7 @@ public class FlowGraphConfigurationKeys {
public static final String FLOW_EDGE_ID_KEY = FLOW_EDGE_PREFIX + "id";
public static final String FLOW_EDGE_NAME_KEY = FLOW_EDGE_PREFIX + "name";
public static final String FLOW_EDGE_IS_ACTIVE_KEY = FLOW_EDGE_PREFIX + "isActive";
- public static final String FLOW_EDGE_TEMPLATE_URI_KEY = FLOW_EDGE_PREFIX + "flowTemplateUri";
+ public static final String FLOW_EDGE_TEMPLATE_DIR_URI_KEY = FLOW_EDGE_PREFIX + "flowTemplateDirUri";
public static final String FLOW_EDGE_SPEC_EXECUTORS_KEY = FLOW_EDGE_PREFIX + "specExecutors";
- public static final String FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY = "specExecutorClass";
+ public static final String FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY = "specExecInstance.class";
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/HdfsDataNode.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/HdfsDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/HdfsDataNode.java
deleted file mode 100644
index 7bcc18d..0000000
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/HdfsDataNode.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.flowgraph;
-
-import java.net.URI;
-
-import com.typesafe.config.Config;
-
-import org.apache.gobblin.annotation.Alpha;
-
-import joptsimple.internal.Strings;
-
-
-/**
- * An implementation of {@link HdfsDataNode}. All the properties specific to a HDFS based data node (e.g. fs.uri)
- * are validated here.
- */
-@Alpha
-public class HdfsDataNode extends FileSystemDataNode {
- public static final String HDFS_SCHEME = "hdfs";
-
- public HdfsDataNode(Config nodeProps) throws DataNodeCreationException {
- super(nodeProps);
- }
-
- /**
- *
- * @param fsUri FileSystem URI
- * @return true if the scheme is "hdfs" and authority is not empty.
- */
- @Override
- public boolean isUriValid(URI fsUri) {
- String scheme = fsUri.getScheme();
- //Check that the scheme is "hdfs"
- if(!scheme.equals(HDFS_SCHEME)) {
- return false;
- }
- //Ensure that the authority is not empty
- if(Strings.isNullOrEmpty(fsUri.getAuthority())) {
- return false;
- }
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/LocalFSDataNode.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/LocalFSDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/LocalFSDataNode.java
deleted file mode 100644
index 6dc1aa3..0000000
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/LocalFSDataNode.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.flowgraph;
-
-import java.net.URI;
-
-import org.apache.gobblin.annotation.Alpha;
-
-import com.typesafe.config.Config;
-
-/**
- * An implementation of {@link LocalFSDataNode}. All the properties specific to a LocalFS based data node (e.g. fs.uri)
- * are validated here.
- */
-@Alpha
-public class LocalFSDataNode extends FileSystemDataNode {
- public static final String LOCAL_FS_SCHEME = "file";
-
- public LocalFSDataNode(Config nodeProps) throws DataNodeCreationException {
- super(nodeProps);
- }
-
- /**
- *
- * @param fsUri FileSystem URI
- * @return true if the scheme of fsUri equals "file"
- */
- @Override
- public boolean isUriValid(URI fsUri) {
- String scheme = fsUri.getScheme();
- if(scheme.equals(LOCAL_FS_SCHEME)) {
- return true;
- }
- return false;
- }
-}