You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/08/18 06:22:52 UTC
incubator-gobblin git commit: [GOBBLIN-559] Implement FlowGraph as a
concurrent data structure.
Repository: incubator-gobblin
Updated Branches:
refs/heads/master eb2c128dc -> 5a6bfea9f
[GOBBLIN-559] Implement FlowGraph as a concurrent data structure.
Closes #2423 from sv2000/multithread
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/5a6bfea9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/5a6bfea9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/5a6bfea9
Branch: refs/heads/master
Commit: 5a6bfea9f689360ffb7c5a5c4fef95487b5b4e4c
Parents: eb2c128
Author: sv2000 <su...@gmail.com>
Authored: Fri Aug 17 23:22:54 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Fri Aug 17 23:22:54 2018 -0700
----------------------------------------------------------------------
.../modules/flow/FlowGraphPathFinder.java | 345 -------------------
.../modules/flow/MultiHopFlowCompiler.java | 38 +-
.../modules/flowgraph/BaseFlowGraph.java | 146 +++++---
.../service/modules/flowgraph/FlowGraph.java | 19 +
.../flowgraph/FlowGraphConfigurationKeys.java | 8 +-
.../pathfinder/AbstractPathFinder.java | 259 ++++++++++++++
.../flowgraph/pathfinder/BFSPathFinder.java | 138 ++++++++
.../flowgraph/pathfinder/PathFinder.java | 44 +++
8 files changed, 587 insertions(+), 410 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5a6bfea9/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinder.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinder.java
deleted file mode 100644
index 59c831d..0000000
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinder.java
+++ /dev/null
@@ -1,345 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.flow;
-
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-
-import org.apache.commons.lang3.tuple.Pair;
-
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
-
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.runtime.api.FlowSpec;
-import org.apache.gobblin.runtime.api.JobTemplate;
-import org.apache.gobblin.runtime.api.SpecExecutor;
-import org.apache.gobblin.runtime.api.SpecNotFoundException;
-import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
-import org.apache.gobblin.service.modules.flowgraph.Dag;
-import org.apache.gobblin.service.modules.flowgraph.DataNode;
-import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
-import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
-import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
-import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
-
-
-@Alpha
-@Slf4j
-public class FlowGraphPathFinder {
- private static final String SOURCE_PREFIX = "source";
- private static final String DESTINATION_PREFIX = "destination";
-
- private FlowGraph flowGraph;
- private FlowSpec flowSpec;
- private Config flowConfig;
-
- private DataNode srcNode;
- private List<DataNode> destNodes;
-
- private DatasetDescriptor srcDatasetDescriptor;
- private DatasetDescriptor destDatasetDescriptor;
-
- //Maintain path of FlowEdges as parent-child map
- private Map<FlowEdgeContext, FlowEdgeContext> pathMap;
-
- //Flow Execution Id
- private Long flowExecutionId;
-
- /**
- * Constructor.
- * @param flowGraph
- */
- public FlowGraphPathFinder(FlowGraph flowGraph, FlowSpec flowSpec) {
- this.flowGraph = flowGraph;
- this.flowSpec = flowSpec;
- this.flowConfig = flowSpec.getConfig();
-
- //Get src/dest DataNodes from the flow config
- String srcNodeId = ConfigUtils.getString(flowConfig, ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, "");
-
- List<String> destNodeIds = ConfigUtils.getStringList(flowConfig, ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY);
- this.srcNode = this.flowGraph.getNode(srcNodeId);
- Preconditions.checkArgument(srcNode != null, "Flowgraph does not have a node with id " + srcNodeId);
- for (String destNodeId : destNodeIds) {
- DataNode destNode = this.flowGraph.getNode(destNodeId);
- Preconditions.checkArgument(destNode != null, "Flowgraph does not have a node with id " + destNodeId);
- if (this.destNodes == null) {
- this.destNodes = new ArrayList<>();
- }
- this.destNodes.add(destNode);
- }
- //Get src/dest dataset descriptors from the flow config
- Config srcDatasetDescriptorConfig =
- flowConfig.getConfig(DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX);
- Config destDatasetDescriptorConfig =
- flowConfig.getConfig(DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX);
-
- try {
- Class srcdatasetDescriptorClass =
- Class.forName(srcDatasetDescriptorConfig.getString(DatasetDescriptorConfigKeys.CLASS_KEY));
- this.srcDatasetDescriptor = (DatasetDescriptor) GobblinConstructorUtils
- .invokeLongestConstructor(srcdatasetDescriptorClass, srcDatasetDescriptorConfig);
- Class destDatasetDescriptorClass =
- Class.forName(destDatasetDescriptorConfig.getString(DatasetDescriptorConfigKeys.CLASS_KEY));
- this.destDatasetDescriptor = (DatasetDescriptor) GobblinConstructorUtils
- .invokeLongestConstructor(destDatasetDescriptorClass, destDatasetDescriptorConfig);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- public FlowGraphPath findPath() throws PathFinderException {
- // Generate flow execution id for this compilation
- this.flowExecutionId = System.currentTimeMillis();
-
- FlowGraphPath flowGraphPath = new FlowGraphPath(flowSpec, flowExecutionId);
- //Path computation must be thread-safe to guarantee read consistency. In other words, we prevent concurrent read/write access to the
- // flow graph.
- // TODO: we can easily improve the performance by using a ReentrantReadWriteLock associated with the FlowGraph. This will
- // allow multiple concurrent readers to not be blocked on each other, as long as there are no writers.
- synchronized (this.flowGraph) {
- for (DataNode destNode : this.destNodes) {
- List<FlowEdgeContext> path = findPathBFS(destNode);
- if (path != null) {
- flowGraphPath.addPath(path);
- } else {
- //No path to at least one of the destination nodes.
- return null;
- }
- }
- }
- return flowGraphPath;
- }
-
- /**
- * A simple path finding algorithm based on Breadth-First Search. At every step the algorithm adds the adjacent {@link FlowEdge}s
- * to a queue. The {@link FlowEdge}s whose output {@link DatasetDescriptor} matches the destDatasetDescriptor are
- * added first to the queue. This ensures that dataset transformations are always performed closest to the source.
- * @return a path of {@link FlowEdgeContext}s starting at the srcNode and ending at the destNode.
- */
- private List<FlowEdgeContext> findPathBFS(DataNode destNode)
- throws PathFinderException {
- try {
- //Initialization of auxiliary data structures used for path computation
- this.pathMap = new HashMap<>();
-
- //Base condition 1: Source Node or Dest Node is inactive; return null
- if (!srcNode.isActive() || !destNode.isActive()) {
- log.warn("Either source node {} or destination node {} is inactive; skipping path computation.",
- this.srcNode.getId(), destNode.getId());
- return null;
- }
-
- //Base condition 2: Check if we are already at the target. If so, return an empty path.
- if ((srcNode.equals(destNode)) && destDatasetDescriptor.contains(srcDatasetDescriptor)) {
- return new ArrayList<>();
- }
-
- LinkedList<FlowEdgeContext> edgeQueue = new LinkedList<>();
- edgeQueue.addAll(getNextEdges(srcNode, srcDatasetDescriptor, destDatasetDescriptor));
- for (FlowEdgeContext flowEdgeContext : edgeQueue) {
- this.pathMap.put(flowEdgeContext, flowEdgeContext);
- }
-
- //At every step, pop an edge E from the edge queue. Mark the edge E as visited. Generate the list of adjacent edges
- // to the edge E. For each adjacent edge E', do the following:
- // 1. check if the FlowTemplate described by E' is resolvable using the flowConfig, and
- // 2. check if the output dataset descriptor of edge E is compatible with the input dataset descriptor of the
- // edge E'. If yes, add the edge E' to the edge queue.
- // If the edge E' satisfies 1 and 2, add it to the edge queue for further consideration.
- while (!edgeQueue.isEmpty()) {
- FlowEdgeContext flowEdgeContext = edgeQueue.pop();
-
- DataNode currentNode = this.flowGraph.getNode(flowEdgeContext.getEdge().getDest());
- DatasetDescriptor currentOutputDatasetDescriptor = flowEdgeContext.getOutputDatasetDescriptor();
-
- //Are we done?
- if (isPathFound(currentNode, destNode, currentOutputDatasetDescriptor, destDatasetDescriptor)) {
- return constructPath(flowEdgeContext);
- }
-
- //Expand the currentNode to its adjacent edges and add them to the queue.
- List<FlowEdgeContext> nextEdges =
- getNextEdges(currentNode, currentOutputDatasetDescriptor, destDatasetDescriptor);
- for (FlowEdgeContext childFlowEdgeContext : nextEdges) {
- //Add a pointer from the child edge to the parent edge, if the child edge is not already in the
- // queue.
- if (!this.pathMap.containsKey(childFlowEdgeContext)) {
- edgeQueue.add(childFlowEdgeContext);
- this.pathMap.put(childFlowEdgeContext, flowEdgeContext);
- }
- }
- }
- //No path found. Return null.
- return null;
- } catch (SpecNotFoundException | JobTemplate.TemplateException | IOException | URISyntaxException e) {
- throw new PathFinderException(
- "Exception encountered when computing path from src: " + this.srcNode.getId() + " to dest: " + destNode
- .getId(), e);
- }
- }
-
- private boolean isPathFound(DataNode currentNode, DataNode destNode, DatasetDescriptor currentDatasetDescriptor,
- DatasetDescriptor destDatasetDescriptor) {
- if ((currentNode.equals(destNode)) && (currentDatasetDescriptor.equals(destDatasetDescriptor))) {
- return true;
- }
- return false;
- }
-
- /**
- * A helper method that sorts the {@link FlowEdge}s incident on srcNode based on whether the FlowEdge has an
- * output {@link DatasetDescriptor} that is compatible with the targetDatasetDescriptor.
- * @param dataNode
- * @param currentDatasetDescriptor Output {@link DatasetDescriptor} of the current edge.
- * @param destDatasetDescriptor Target {@link DatasetDescriptor}.
- * @return prioritized list of {@link FlowEdge}s to be added to the edge queue for expansion.
- */
- private List<FlowEdgeContext> getNextEdges(DataNode dataNode, DatasetDescriptor currentDatasetDescriptor,
- DatasetDescriptor destDatasetDescriptor) {
- List<FlowEdgeContext> prioritizedEdgeList = new LinkedList<>();
- for (FlowEdge flowEdge : this.flowGraph.getEdges(dataNode)) {
- try {
- DataNode edgeDestination = this.flowGraph.getNode(flowEdge.getDest());
- //Base condition: Skip this FlowEdge, if it is inactive or if the destination of this edge is inactive.
- if (!edgeDestination.isActive() || !flowEdge.isActive()) {
- continue;
- }
-
- boolean foundExecutor = false;
- //Iterate over all executors for this edge. Find the first one that resolves the underlying flow template.
- for (SpecExecutor specExecutor : flowEdge.getExecutors()) {
- Config mergedConfig = getMergedConfig(flowEdge, specExecutor);
- List<Pair<DatasetDescriptor, DatasetDescriptor>> datasetDescriptorPairs =
- flowEdge.getFlowTemplate().getResolvingDatasetDescriptors(mergedConfig);
- for (Pair<DatasetDescriptor, DatasetDescriptor> datasetDescriptorPair : datasetDescriptorPairs) {
- DatasetDescriptor inputDatasetDescriptor = datasetDescriptorPair.getLeft();
- DatasetDescriptor outputDatasetDescriptor = datasetDescriptorPair.getRight();
- if (inputDatasetDescriptor.contains(currentDatasetDescriptor)) {
- FlowEdgeContext flowEdgeContext;
- if (outputDatasetDescriptor.contains(currentDatasetDescriptor)) {
- //If the datasets described by the currentDatasetDescriptor are a subset of the datasets described
- // by the outputDatasetDescriptor (i.e. currentDatasetDescriptor is more "specific" than outputDatasetDescriptor, e.g.
- // as in the case of a "distcp" edge), we propagate the more "specific" dataset descriptor forward.
- flowEdgeContext =
- new FlowEdgeContext(flowEdge, currentDatasetDescriptor, currentDatasetDescriptor, mergedConfig,
- specExecutor);
- } else {
- //outputDatasetDescriptor is more specific (e.g. if it is a dataset transformation edge)
- flowEdgeContext =
- new FlowEdgeContext(flowEdge, currentDatasetDescriptor, outputDatasetDescriptor, mergedConfig,
- specExecutor);
- }
- if (destDatasetDescriptor.getFormatConfig().contains(outputDatasetDescriptor.getFormatConfig())) {
- //Add to the front of the edge list if platform-independent properties of the output descriptor are compatible
- // with those of destination dataset descriptor.
- // In other words, we prioritize edges that perform data transformations as close to the source as possible.
- prioritizedEdgeList.add(0, flowEdgeContext);
- } else {
- prioritizedEdgeList.add(flowEdgeContext);
- }
- foundExecutor = true;
- }
- }
- // Found a SpecExecutor. Proceed to the next FlowEdge.
- // TODO: Choose the min-cost executor for the FlowEdge as opposed to the first one that resolves.
- if (foundExecutor) {
- break;
- }
- }
- } catch (IOException | ReflectiveOperationException | InterruptedException | ExecutionException | SpecNotFoundException
- | JobTemplate.TemplateException e) {
- //Skip the edge; and continue
- log.warn("Skipping edge {} with config {} due to exception: {}", flowEdge.getId(), flowConfig.toString(), e);
- }
- }
- return prioritizedEdgeList;
- }
-
- /**
- * Build the merged config for each {@link FlowEdge}, which is a combination of (in the precedence described below):
- * <ul>
- * <p> the user provided flow config </p>
- * <p> edge specific properties/overrides </p>
- * <p> spec executor config/overrides </p>
- * <p> source node config </p>
- * <p> destination node config </p>
- * </ul>
- * Each {@link JobTemplate}'s config will eventually be resolved against this merged config.
- * @param flowEdge An instance of {@link FlowEdge}.
- * @param specExecutor A {@link SpecExecutor}.
- * @return the merged config derived as described above.
- */
- private Config getMergedConfig(FlowEdge flowEdge, SpecExecutor specExecutor)
- throws ExecutionException, InterruptedException {
- Config srcNodeConfig = this.flowGraph.getNode(flowEdge.getSrc()).getRawConfig().atPath(SOURCE_PREFIX);
- Config destNodeConfig = this.flowGraph.getNode(flowEdge.getDest()).getRawConfig().atPath(DESTINATION_PREFIX);
- Config mergedConfig = flowConfig.withFallback(specExecutor.getConfig().get()).withFallback(flowEdge.getConfig())
- .withFallback(srcNodeConfig).withFallback(destNodeConfig);
- return mergedConfig;
- }
-
- /**
- *
- * @param flowEdgeContext of the last {@link FlowEdge} in the path.
- * @return a {@link Dag} of {@link JobExecutionPlan}s for the input {@link FlowSpec}.
- * @throws IOException
- * @throws SpecNotFoundException
- * @throws JobTemplate.TemplateException
- * @throws URISyntaxException
- */
- private List<FlowEdgeContext> constructPath(FlowEdgeContext flowEdgeContext)
- throws IOException, SpecNotFoundException, JobTemplate.TemplateException, URISyntaxException {
- //Backtrace from the last edge using the path map and push each edge into a LIFO data structure.
- List<FlowEdgeContext> path = new LinkedList<>();
- path.add(flowEdgeContext);
- FlowEdgeContext currentFlowEdgeContext = flowEdgeContext;
- while (true) {
- path.add(0, this.pathMap.get(currentFlowEdgeContext));
- currentFlowEdgeContext = this.pathMap.get(currentFlowEdgeContext);
- //Are we at the first edge in the path?
- if (this.pathMap.get(currentFlowEdgeContext).equals(currentFlowEdgeContext)) {
- break;
- }
- }
- return path;
- }
-
- public static class PathFinderException extends Exception {
- public PathFinderException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public PathFinderException(String message) {
- super(message);
- }
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5a6bfea9/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index 83951a0..1ab8312 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -20,7 +20,6 @@ package org.apache.gobblin.service.modules.flow;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
-import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
@@ -28,7 +27,6 @@ import org.slf4j.Logger;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import lombok.Getter;
@@ -42,15 +40,16 @@ import org.apache.gobblin.runtime.api.JobTemplate;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
-import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.core.GitFlowGraphMonitor;
import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
+import org.apache.gobblin.service.modules.flowgraph.pathfinder.PathFinder;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
+import org.apache.gobblin.util.ConfigUtils;
/***
@@ -60,8 +59,9 @@ import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
@Alpha
@Slf4j
public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
+
@Getter
- private FlowGraph flowGraph;
+ private final FlowGraph flowGraph;
private GitFlowGraphMonitor gitFlowGraphMonitor;
@Getter
private boolean active;
@@ -80,9 +80,8 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
public MultiHopFlowCompiler(Config config, Optional<Logger> log, boolean instrumentationEnabled) {
super(config, log, instrumentationEnabled);
- Config templateCatalogCfg = config
- .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
- config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
+ Config templateCatalogCfg = config.withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+ config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
FSFlowCatalog flowCatalog;
try {
flowCatalog = new FSFlowCatalog(templateCatalogCfg);
@@ -118,32 +117,32 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
long startTime = System.nanoTime();
FlowSpec flowSpec = (FlowSpec) spec;
- String source = flowSpec.getConfig().getString(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY);
- String destination = flowSpec.getConfig().getString(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY);
+ String source = ConfigUtils.getString(flowSpec.getConfig(), ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, "");
+ String destination =
+ ConfigUtils.getString(flowSpec.getConfig(), ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, "");
log.info(String.format("Compiling flow for source: %s and destination: %s", source, destination));
Dag<JobExecutionPlan> jobExecutionPlanDag;
- FlowGraphPathFinder pathFinder = new FlowGraphPathFinder(this.flowGraph, flowSpec);
try {
//Compute the path from source to destination.
- FlowGraphPath flowGraphPath = pathFinder.findPath();
-
+ FlowGraphPath flowGraphPath = flowGraph.findPath(flowSpec);
//Convert the path into a Dag of JobExecutionPlans.
if (flowGraphPath != null) {
jobExecutionPlanDag = flowGraphPath.asDag();
} else {
- Instrumented.markMeter(this.flowCompilationFailedMeter);
+ Instrumented.markMeter(flowCompilationFailedMeter);
log.info(String.format("No path found from source: %s and destination: %s", source, destination));
return new JobExecutionPlanDagFactory().createDag(new ArrayList<>());
}
- } catch (FlowGraphPathFinder.PathFinderException | SpecNotFoundException | JobTemplate.TemplateException | URISyntaxException e) {
- Instrumented.markMeter(this.flowCompilationFailedMeter);
- log.error(String.format("Exception encountered while compiling flow for source: %s and destination: %s", source, destination), e);
+ } catch (PathFinder.PathFinderException | SpecNotFoundException | JobTemplate.TemplateException | URISyntaxException | ReflectiveOperationException e) {
+ Instrumented.markMeter(flowCompilationFailedMeter);
+ log.error(String
+ .format("Exception encountered while compiling flow for source: %s and destination: %s", source, destination),
+ e);
return null;
}
- Instrumented.markMeter(this.flowCompilationSuccessFulMeter);
- Instrumented.updateTimer(this.flowCompilationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
-
+ Instrumented.markMeter(flowCompilationSuccessFulMeter);
+ Instrumented.updateTimer(flowCompilationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
return jobExecutionPlanDag;
}
@@ -152,5 +151,4 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
log.warn("No population of templates based on edge happen in this implementation");
return;
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5a6bfea9/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
index edf40cc..e2b256f 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
@@ -21,8 +21,15 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.service.modules.flow.FlowGraphPath;
+import org.apache.gobblin.service.modules.flowgraph.pathfinder.PathFinder;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import lombok.extern.slf4j.Slf4j;
@@ -32,10 +39,14 @@ import lombok.extern.slf4j.Slf4j;
* <p>dataNodeMap - the mapping from a node identifier to the {@link DataNode} instance</p>
* <p>nodesToEdges - the mapping from each {@link DataNode} to its outgoing {@link FlowEdge}s</p>
* <p>flowEdgeMap - the mapping from a edge label to the {@link FlowEdge} instance</p>
+ *
+ * Read/Write Access to the {@link FlowGraph} is synchronized via a {@link ReentrantReadWriteLock}.
*/
@Alpha
@Slf4j
public class BaseFlowGraph implements FlowGraph {
+ private final ReadWriteLock rwLock = new ReentrantReadWriteLock(true);
+
private Map<DataNode, Set<FlowEdge>> nodesToEdges = new HashMap<>();
private Map<String, DataNode> dataNodeMap = new HashMap<>();
private Map<String, FlowEdge> flowEdgeMap = new HashMap<>();
@@ -58,11 +69,16 @@ public class BaseFlowGraph implements FlowGraph {
* @return true if node is successfully added to the {@link FlowGraph}.
*/
@Override
- public synchronized boolean addDataNode(DataNode node) {
- //Get edges adjacent to the node if it already exists
- Set<FlowEdge> edges = this.nodesToEdges.getOrDefault(node, new HashSet<>());
- this.nodesToEdges.put(node, edges);
- this.dataNodeMap.put(node.getId(), node);
+ public boolean addDataNode(DataNode node) {
+ try {
+ rwLock.writeLock().lock();
+ //Get edges adjacent to the node if it already exists
+ Set<FlowEdge> edges = this.nodesToEdges.getOrDefault(node, new HashSet<>());
+ this.nodesToEdges.put(node, edges);
+ this.dataNodeMap.put(node.getId(), node);
+ } finally {
+ rwLock.writeLock().unlock();
+ }
return true;
}
@@ -74,16 +90,20 @@ public class BaseFlowGraph implements FlowGraph {
* @return true if addition of {@link FlowEdge} is successful.
*/
@Override
- public synchronized boolean addFlowEdge(FlowEdge edge) {
- String srcNode = edge.getSrc();
- String dstNode = edge.getDest();
- if(!dataNodeMap.containsKey(srcNode) || !dataNodeMap.containsKey(dstNode)) {
- return false;
- }
- DataNode dataNode = getNode(srcNode);
- if(dataNode != null) {
+ public boolean addFlowEdge(FlowEdge edge) {
+ try {
+ rwLock.writeLock().lock();
+ String srcNode = edge.getSrc();
+ String dstNode = edge.getDest();
+ if (!dataNodeMap.containsKey(srcNode) || !dataNodeMap.containsKey(dstNode)) {
+ return false;
+ }
+ DataNode dataNode = getNode(srcNode);
+ if (dataNode == null) {
+ return false;
+ }
Set<FlowEdge> adjacentEdges = this.nodesToEdges.get(dataNode);
- if(!adjacentEdges.add(edge)) {
+ if (!adjacentEdges.add(edge)) {
adjacentEdges.remove(edge);
adjacentEdges.add(edge);
}
@@ -91,8 +111,8 @@ public class BaseFlowGraph implements FlowGraph {
String edgeId = edge.getId();
this.flowEdgeMap.put(edgeId, edge);
return true;
- } else {
- return false;
+ } finally {
+ rwLock.writeLock().unlock();
}
}
@@ -102,11 +122,12 @@ public class BaseFlowGraph implements FlowGraph {
* @return true if {@link DataNode} is successfully deleted.
*/
@Override
- public synchronized boolean deleteDataNode(String nodeId) {
- if(this.dataNodeMap.containsKey(nodeId) && deleteDataNode(this.dataNodeMap.get(nodeId))) {
- return true;
- } else {
- return false;
+ public boolean deleteDataNode(String nodeId) {
+ try {
+ rwLock.writeLock().lock();
+ return this.dataNodeMap.containsKey(nodeId) && deleteDataNode(this.dataNodeMap.get(nodeId));
+ } finally {
+ rwLock.writeLock().unlock();
}
}
@@ -115,20 +136,25 @@ public class BaseFlowGraph implements FlowGraph {
* @param node to be deleted.
* @return true if {@link DataNode} is successfully deleted.
*/
- public synchronized boolean deleteDataNode(DataNode node) {
- if(dataNodeMap.containsKey(node.getId())) {
+ public boolean deleteDataNode(DataNode node) {
+ try {
+ rwLock.writeLock().lock();
+ if (!dataNodeMap.containsKey(node.getId())) {
+ return false;
+ }
//Delete node from dataNodeMap
dataNodeMap.remove(node.getId());
//Delete all the edges adjacent to the node. First, delete edges from flowEdgeMap and next, remove the edges
// from nodesToEdges
- for(FlowEdge edge: nodesToEdges.get(node)) {
+ for (FlowEdge edge : nodesToEdges.get(node)) {
flowEdgeMap.remove(edge.getId());
}
nodesToEdges.remove(node);
return true;
- } else {
- return false;
+
+ } finally {
+ rwLock.writeLock().unlock();
}
}
@@ -138,11 +164,12 @@ public class BaseFlowGraph implements FlowGraph {
* @return true if {@link FlowEdge} is successfully deleted.
*/
@Override
- public synchronized boolean deleteFlowEdge(String edgeId) {
- if(flowEdgeMap.containsKey(edgeId) && deleteFlowEdge(flowEdgeMap.get(edgeId))) {
- return true;
- } else {
- return false;
+ public boolean deleteFlowEdge(String edgeId) {
+ try {
+ rwLock.writeLock().lock();
+ return flowEdgeMap.containsKey(edgeId) && deleteFlowEdge(flowEdgeMap.get(edgeId));
+ } finally {
+ rwLock.writeLock().unlock();
}
}
@@ -152,17 +179,22 @@ public class BaseFlowGraph implements FlowGraph {
* @return true if {@link FlowEdge} is successfully deleted. If the source of a {@link FlowEdge} does not exist or
* if the {@link FlowEdge} is not in the graph, return false.
*/
- public synchronized boolean deleteFlowEdge(FlowEdge edge) {
- if(!dataNodeMap.containsKey(edge.getSrc())) {
- return false;
- }
- DataNode node = dataNodeMap.get(edge.getSrc());
- if(!nodesToEdges.get(node).contains(edge)) {
- return false;
+ public boolean deleteFlowEdge(FlowEdge edge) {
+ try {
+ rwLock.writeLock().lock();
+ if (!dataNodeMap.containsKey(edge.getSrc())) {
+ return false;
+ }
+ DataNode node = dataNodeMap.get(edge.getSrc());
+ if (!nodesToEdges.get(node).contains(edge)) {
+ return false;
+ }
+ this.nodesToEdges.get(node).remove(edge);
+ this.flowEdgeMap.remove(edge.getId());
+ return true;
+ } finally {
+ rwLock.writeLock().unlock();
}
- this.nodesToEdges.get(node).remove(edge);
- this.flowEdgeMap.remove(edge.getId());
- return true;
}
/**
@@ -172,8 +204,13 @@ public class BaseFlowGraph implements FlowGraph {
*/
@Override
public Set<FlowEdge> getEdges(String nodeId) {
- DataNode dataNode = this.dataNodeMap.getOrDefault(nodeId, null);
- return getEdges(dataNode);
+ try {
+ rwLock.readLock().lock();
+ DataNode dataNode = this.dataNodeMap.getOrDefault(nodeId, null);
+ return getEdges(dataNode);
+ } finally {
+ rwLock.readLock().unlock();
+ }
}
/**
@@ -183,7 +220,28 @@ public class BaseFlowGraph implements FlowGraph {
*/
@Override
public Set<FlowEdge> getEdges(DataNode node) {
- return (node != null)? this.nodesToEdges.getOrDefault(node, null) : null;
+ try {
+ rwLock.readLock().lock();
+ return (node != null) ? this.nodesToEdges.getOrDefault(node, null) : null;
+ } finally {
+ rwLock.readLock().unlock();
+ }
}
+ /**{@inheritDoc}**/
+ @Override
+ public FlowGraphPath findPath(FlowSpec flowSpec) throws PathFinder.PathFinderException, ReflectiveOperationException {
+ try {
+ rwLock.readLock().lock();
+ //Instantiate a PathFinder.
+ Class pathFinderClass = Class.forName(ConfigUtils
+ .getString(flowSpec.getConfig(), FlowGraphConfigurationKeys.FLOW_GRAPH_PATH_FINDER_CLASS,
+ FlowGraphConfigurationKeys.DEFAULT_FLOW_GRAPH_PATH_FINDER_CLASS));
+ PathFinder pathFinder =
+ (PathFinder) GobblinConstructorUtils.invokeLongestConstructor(pathFinderClass, this, flowSpec);
+ return pathFinder.findPath();
+ } finally {
+ rwLock.readLock().unlock();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5a6bfea9/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java
index b4aa7bf..1d8eb23 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java
@@ -19,7 +19,13 @@ package org.apache.gobblin.service.modules.flowgraph;
import java.util.Collection;
+import com.typesafe.config.Config;
+
import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.flow.FlowGraphPath;
+import org.apache.gobblin.service.modules.flowgraph.pathfinder.PathFinder;
/**
@@ -80,4 +86,17 @@ public interface FlowGraph {
* @return a collection of edges adjacent to the {@link DataNode}
*/
public Collection<FlowEdge> getEdges(DataNode node);
+
+ /**
+ * A method that takes a {@link FlowSpec} containing the source and destination {@link DataNode}s, as well as the
+ * source and target {@link org.apache.gobblin.service.modules.dataset.DatasetDescriptor}s, and computes a path that
+ * moves the source dataset from the source datanode, performs any necessary transformations and lands the dataset
+ * at the destination node in the format described by the target descriptor.
+ *
+ * @param flowSpec a {@link org.apache.gobblin.runtime.api.Spec} containing a high-level description of input flow.
+ * @return a {@link FlowGraphPath} that encapsulates a sequence of {@link org.apache.gobblin.runtime.api.JobSpec}s satisfying flowSpec.
+ * @throws PathFinder.PathFinderException if the {@link PathFinder} fails while computing the path.
+ * @throws ReflectiveOperationException if the {@link PathFinder} implementation cannot be loaded or instantiated.
+ */
+ public FlowGraphPath findPath(FlowSpec flowSpec) throws PathFinder.PathFinderException, ReflectiveOperationException;
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5a6bfea9/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java
index 8a49ec0..5a43a83 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java
@@ -20,7 +20,7 @@ package org.apache.gobblin.service.modules.flowgraph;
public class FlowGraphConfigurationKeys {
public static final String DATA_NODE_PREFIX = "data.node.";
public static final String FLOW_EDGE_PREFIX = "flow.edge.";
-
+ public static final String FLOW_GRAPH_PREFIX = "flow.graph.";
/**
* {@link DataNode} related configuration keys.
*/
@@ -42,4 +42,10 @@ public class FlowGraphConfigurationKeys {
public static final String FLOW_EDGE_TEMPLATE_DIR_URI_KEY = FLOW_EDGE_PREFIX + "flowTemplateDirUri";
public static final String FLOW_EDGE_SPEC_EXECUTORS_KEY = FLOW_EDGE_PREFIX + "specExecutors";
public static final String FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY = "specExecInstance.class";
+
+ /**
+ * {@link org.apache.gobblin.service.modules.flowgraph.pathfinder.PathFinder} related configuration keys.
+ */
+ public static final String FLOW_GRAPH_PATH_FINDER_CLASS = FLOW_GRAPH_PREFIX + "pathfinder.class";
+ public static final String DEFAULT_FLOW_GRAPH_PATH_FINDER_CLASS = "org.apache.gobblin.service.modules.flowgraph.pathfinder.BFSPathFinder";
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5a6bfea9/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
new file mode 100644
index 0000000..9901c2f
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph.pathfinder;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
+import org.apache.gobblin.service.modules.flow.FlowEdgeContext;
+import org.apache.gobblin.service.modules.flow.FlowGraphPath;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DataNode;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+@Alpha
+@Slf4j
+public abstract class AbstractPathFinder implements PathFinder {
+ private static final String SOURCE_PREFIX = "source";
+ private static final String DESTINATION_PREFIX = "destination";
+
+ protected FlowGraph flowGraph;
+ protected FlowSpec flowSpec;
+ protected Config flowConfig;
+
+ protected DataNode srcNode;
+ protected List<DataNode> destNodes;
+
+ protected DatasetDescriptor srcDatasetDescriptor;
+ protected DatasetDescriptor destDatasetDescriptor;
+
+ //Maintain path of FlowEdges as parent-child map
+ protected Map<FlowEdgeContext, FlowEdgeContext> pathMap;
+
+ //Flow Execution Id
+ protected Long flowExecutionId;
+
+ public AbstractPathFinder(FlowGraph flowGraph, FlowSpec flowSpec)
+ throws ReflectiveOperationException {
+ this.flowGraph = flowGraph;
+ this.flowSpec = flowSpec;
+ this.flowConfig = flowSpec.getConfig();
+
+ //Look up the src/dest DataNodes named in the flow config; every id must resolve to a node in the flow graph.
+ String srcNodeId = ConfigUtils.getString(flowConfig, ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, "");
+
+ List<String> destNodeIds = ConfigUtils.getStringList(flowConfig, ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY);
+ this.srcNode = this.flowGraph.getNode(srcNodeId);
+ Preconditions.checkArgument(srcNode != null, "Flowgraph does not have a node with id " + srcNodeId);
+ for (String destNodeId : destNodeIds) {
+ DataNode destNode = this.flowGraph.getNode(destNodeId);
+ Preconditions.checkArgument(destNode != null, "Flowgraph does not have a node with id " + destNodeId);
+ if (this.destNodes == null) { //Created lazily: destNodes stays null when destNodeIds is empty.
+ this.destNodes = new ArrayList<>();
+ }
+ this.destNodes.add(destNode);
+ }
+ //Get src/dest dataset descriptor configs from the flow config and instantiate each descriptor from its configured class name.
+ Config srcDatasetDescriptorConfig =
+ flowConfig.getConfig(DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX);
+ Config destDatasetDescriptorConfig =
+ flowConfig.getConfig(DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX);
+
+ Class srcdatasetDescriptorClass =
+ Class.forName(srcDatasetDescriptorConfig.getString(DatasetDescriptorConfigKeys.CLASS_KEY));
+ this.srcDatasetDescriptor = (DatasetDescriptor) GobblinConstructorUtils
+ .invokeLongestConstructor(srcdatasetDescriptorClass, srcDatasetDescriptorConfig);
+ Class destDatasetDescriptorClass =
+ Class.forName(destDatasetDescriptorConfig.getString(DatasetDescriptorConfigKeys.CLASS_KEY));
+ this.destDatasetDescriptor = (DatasetDescriptor) GobblinConstructorUtils
+ .invokeLongestConstructor(destDatasetDescriptorClass, destDatasetDescriptorConfig);
+ }
+
+ protected boolean isPathFound(DataNode currentNode, DataNode destNode, DatasetDescriptor currentDatasetDescriptor,
+ DatasetDescriptor destDatasetDescriptor) {
+ //The search is complete only when both the node and the dataset descriptor equal the destination's.
+ boolean reachedDestNode = currentNode.equals(destNode);
+ //Short-circuit: the descriptors are compared only once the destination node has been reached.
+ return reachedDestNode && currentDatasetDescriptor.equals(destDatasetDescriptor);
+ }
+
+ /**
+ * A helper method that lists the active {@link FlowEdge}s the flow graph returns for dataNode and that accept
+ * currentDatasetDescriptor as input, placing edges whose output format is compatible with destDatasetDescriptor first.
+ * @param dataNode the {@link DataNode} whose {@link FlowEdge}s are examined.
+ * @param currentDatasetDescriptor Output {@link DatasetDescriptor} of the current edge.
+ * @param destDatasetDescriptor Target {@link DatasetDescriptor}.
+ * @return prioritized list of {@link FlowEdgeContext}s to be added to the edge queue for expansion.
+ */
+ protected List<FlowEdgeContext> getNextEdges(DataNode dataNode, DatasetDescriptor currentDatasetDescriptor,
+ DatasetDescriptor destDatasetDescriptor) {
+ List<FlowEdgeContext> prioritizedEdgeList = new LinkedList<>();
+ for (FlowEdge flowEdge : this.flowGraph.getEdges(dataNode)) {
+ try {
+ DataNode edgeDestination = this.flowGraph.getNode(flowEdge.getDest());
+ //Base condition: Skip this FlowEdge, if it is inactive or if the destination of this edge is inactive.
+ if (!edgeDestination.isActive() || !flowEdge.isActive()) {
+ continue;
+ }
+
+ boolean foundExecutor = false;
+ //Iterate over all executors for this edge. Find the first one that resolves the underlying flow template.
+ for (SpecExecutor specExecutor : flowEdge.getExecutors()) {
+ Config mergedConfig = getMergedConfig(flowEdge, specExecutor);
+ List<Pair<DatasetDescriptor, DatasetDescriptor>> datasetDescriptorPairs =
+ flowEdge.getFlowTemplate().getResolvingDatasetDescriptors(mergedConfig);
+ for (Pair<DatasetDescriptor, DatasetDescriptor> datasetDescriptorPair : datasetDescriptorPairs) {
+ DatasetDescriptor inputDatasetDescriptor = datasetDescriptorPair.getLeft();
+ DatasetDescriptor outputDatasetDescriptor = datasetDescriptorPair.getRight();
+ if (inputDatasetDescriptor.contains(currentDatasetDescriptor)) {
+ FlowEdgeContext flowEdgeContext;
+ if (outputDatasetDescriptor.contains(currentDatasetDescriptor)) {
+ //If datasets described by the currentDatasetDescriptor is a subset of the datasets described
+ // by the outputDatasetDescriptor (i.e. currentDatasetDescriptor is more "specific" than outputDatasetDescriptor, e.g.
+ // as in the case of a "distcp" edge), we propagate the more "specific" dataset descriptor forward.
+ flowEdgeContext =
+ new FlowEdgeContext(flowEdge, currentDatasetDescriptor, currentDatasetDescriptor, mergedConfig,
+ specExecutor);
+ } else {
+ //outputDatasetDescriptor is more specific (e.g. if it is a dataset transformation edge)
+ flowEdgeContext =
+ new FlowEdgeContext(flowEdge, currentDatasetDescriptor, outputDatasetDescriptor, mergedConfig,
+ specExecutor);
+ }
+ if (destDatasetDescriptor.getFormatConfig().contains(outputDatasetDescriptor.getFormatConfig())) {
+ //Add to the front of the edge list if platform-independent properties of the output descriptor is compatible
+ // with those of destination dataset descriptor.
+ // In other words, we prioritize edges that perform data transformations as close to the source as possible.
+ prioritizedEdgeList.add(0, flowEdgeContext);
+ } else {
+ prioritizedEdgeList.add(flowEdgeContext);
+ }
+ foundExecutor = true;
+ }
+ }
+ // Found a SpecExecutor. Proceed to the next FlowEdge.
+ // TODO: Choose the min-cost executor for the FlowEdge as opposed to the first one that resolves.
+ if (foundExecutor) {
+ break;
+ }
+ }
+ } catch (IOException | ReflectiveOperationException | InterruptedException | ExecutionException | SpecNotFoundException
+ | JobTemplate.TemplateException e) {
+ //Skip the edge and continue. The exception is the trailing argument with no placeholder, so SLF4J logs its stack trace.
+ log.warn("Skipping edge {} with config {} due to exception", flowEdge.getId(), flowConfig.toString(), e);
+ }
+ }
+ return prioritizedEdgeList;
+ }
+
+ /**
+ * Build the merged config for a {@link FlowEdge}; on a key conflict the earlier entry below wins (withFallback order):
+ * <ol>
+ * <li> the user provided flow config </li>
+ * <li> spec executor config/overrides </li>
+ * <li> edge specific properties/overrides </li>
+ * <li> source node config, placed under the "source" path </li>
+ * <li> destination node config, placed under the "destination" path </li>
+ * </ol>
+ * Each {@link JobTemplate}'s config will eventually be resolved against this merged config.
+ * @param flowEdge An instance of {@link FlowEdge}.
+ * @param specExecutor A {@link SpecExecutor}.
+ * @return the merged config derived as described above.
+ */
+ private Config getMergedConfig(FlowEdge flowEdge, SpecExecutor specExecutor)
+ throws ExecutionException, InterruptedException {
+ Config sourceScoped = this.flowGraph.getNode(flowEdge.getSrc()).getRawConfig().atPath(SOURCE_PREFIX);
+ Config destinationScoped = this.flowGraph.getNode(flowEdge.getDest()).getRawConfig().atPath(DESTINATION_PREFIX);
+ Config executorConfig = specExecutor.getConfig().get();
+ //NOTE(review): spec executor config currently outranks edge config - confirm that this ordering is intended.
+ return flowConfig.withFallback(executorConfig).withFallback(flowEdge.getConfig()).withFallback(sourceScoped).withFallback(destinationScoped);
+ }
+
+ /**
+ * Reconstructs the path that ends at the given edge by following the parent pointers recorded in the path map.
+ * @param flowEdgeContext of the last {@link FlowEdge} in the path.
+ * @return the {@link FlowEdgeContext}s on the path, ordered from the first edge to flowEdgeContext.
+ * @throws IOException not thrown by this implementation; declared so existing callers' catch clauses keep compiling.
+ * @throws SpecNotFoundException not thrown by this implementation.
+ * @throws JobTemplate.TemplateException not thrown by this implementation.
+ * @throws URISyntaxException not thrown by this implementation.
+ */
+ protected List<FlowEdgeContext> constructPath(FlowEdgeContext flowEdgeContext)
+ throws IOException, SpecNotFoundException, JobTemplate.TemplateException, URISyntaxException {
+ //Backtrace from the last edge using the path map, prepending each parent so the path ends up in source-first order.
+ List<FlowEdgeContext> path = new LinkedList<>();
+ path.add(flowEdgeContext);
+ FlowEdgeContext currentFlowEdgeContext = flowEdgeContext;
+ FlowEdgeContext parentFlowEdgeContext = this.pathMap.get(currentFlowEdgeContext);
+ //The first edge in the path is its own parent in the path map. Testing for it before prepending keeps a
+ // single-edge path from containing the same edge twice.
+ while (!parentFlowEdgeContext.equals(currentFlowEdgeContext)) {
+ path.add(0, parentFlowEdgeContext);
+ currentFlowEdgeContext = parentFlowEdgeContext;
+ parentFlowEdgeContext = this.pathMap.get(currentFlowEdgeContext);
+ }
+ return path;
+ }
+
+ @Override
+ public FlowGraphPath findPath() throws PathFinderException {
+ //Stamp this compilation with a flow execution id taken from the current wall-clock time.
+ this.flowExecutionId = System.currentTimeMillis();
+
+ FlowGraphPath result = new FlowGraphPath(flowSpec, flowExecutionId);
+ //Compute one unicast path per destination node; the overall result is all-or-nothing.
+ //NOTE(review): this.destNodes is only assigned when the flow config lists at least one destination id -
+ // confirm that callers guarantee one.
+ for (DataNode destination : this.destNodes) {
+ List<FlowEdgeContext> unicastPath = findPathUnicast(destination);
+ if (unicastPath == null) {
+ //No path to at least one of the destination nodes.
+ return null;
+ }
+ result.addPath(unicastPath);
+ }
+ return result;
+ }
+ /** Computes a path from srcNode to the given destNode; {@link #findPath()} treats a null return as "no path". */
+ public abstract List<FlowEdgeContext> findPathUnicast(DataNode destNode) throws PathFinderException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5a6bfea9/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java
new file mode 100644
index 0000000..02dd67b
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph.pathfinder;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
+import org.apache.gobblin.service.modules.flow.FlowEdgeContext;
+import org.apache.gobblin.service.modules.flowgraph.DataNode;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
+
+
+/**
+ * An implementation of {@link PathFinder} that assumes an unweighted {@link FlowGraph} and computes the
+ * shortest path using a variant of the BFS path-finding algorithm. This implementation has two key differences from the
+ * traditional BFS implementations:
+ * <ul>
+ * <p> the input graph is a multi-graph i.e. there could be multiple edges between each pair of nodes, and </p>
+ * <p> each edge has a label associated with it. In our case, the label corresponds to the set of input/output
+ * dataset descriptors that are accepted by the edge.</p>
+ * </ul>
+ * Given these differences, we maintain:
+ * <p> a {@link HashMap} of list of visited edges, as opposed to list of visited
+ * vertices as in the case of traditional BFS, and </p>
+ * <p> for each edge, we maintain additional state that includes the input/output dataset descriptor
+ * associated with the particular visitation of that edge. </p>
+ * This additional information allows us to accurately mark edges as visited and guarantee termination of the algorithm.
+ */
+@Alpha
+@Slf4j
+public class BFSPathFinder extends AbstractPathFinder {
+ /** Constructor. All initialization is delegated to {@link AbstractPathFinder}.
+ * @param flowGraph the {@link FlowGraph} to search.
+ * @param flowSpec the {@link FlowSpec} for which a path is computed.
+ */
+ public BFSPathFinder(FlowGraph flowGraph, FlowSpec flowSpec) throws ReflectiveOperationException {
+ super(flowGraph, flowSpec);
+ }
+
+ /**
+ * Breadth-first search over {@link FlowEdge}s; getNextEdges() lists edges whose output format is compatible with the destination first.
+ * @param destNode the {@link DataNode} to reach from srcNode.
+ * @return the {@link FlowEdgeContext}s from srcNode to destNode; an empty list if the source already satisfies the destination; null if no path is found.
+ * @throws PathFinderException if path construction fails with a checked exception, which is attached as the cause.
+ */
+ public List<FlowEdgeContext> findPathUnicast(DataNode destNode) throws PathFinderException {
+ try {
+ //Initialization of auxiliary data structures used for path computation
+ this.pathMap = new HashMap<>();
+
+ // Generate flow execution id for this compilation
+ this.flowExecutionId = System.currentTimeMillis();
+
+ //No locking happens in this method. NOTE(review): consistent reads of the flow graph presumably rely on the
+ // caller holding the flow graph's lock for the duration of the search - confirm.
+ //Base condition 1: Source Node or Dest Node is inactive; return null
+ if (!srcNode.isActive() || !destNode.isActive()) {
+ log.warn("Either source node {} or destination node {} is inactive; skipping path computation.", this.srcNode.getId(),
+ destNode.getId());
+ return null;
+ }
+
+ //Base condition 2: Check if we are already at the target. If so, return an empty path.
+ if ((srcNode.equals(destNode)) && destDatasetDescriptor.contains(srcDatasetDescriptor)) {
+ return new ArrayList<>();
+ }
+
+ LinkedList<FlowEdgeContext> edgeQueue = new LinkedList<>();
+ edgeQueue.addAll(getNextEdges(srcNode, srcDatasetDescriptor, destDatasetDescriptor));
+ for (FlowEdgeContext flowEdgeContext : edgeQueue) {
+ this.pathMap.put(flowEdgeContext, flowEdgeContext);
+ }
+
+ //At every step, pop an edge E from the front of the edge queue and let N be the node E leads to. If N is the
+ // destination and E's output dataset descriptor equals the destination descriptor, the path is complete.
+ // Otherwise ask getNextEdges() for the edges E' at N; it keeps only edges that
+ // 1. resolve their FlowTemplate against the merged config, and
+ // 2. accept the output dataset descriptor of E as input.
+ // Each such E' that has not been recorded in pathMap yet is queued with E as its parent.
+ while (!edgeQueue.isEmpty()) {
+ FlowEdgeContext flowEdgeContext = edgeQueue.pop();
+
+ DataNode currentNode = this.flowGraph.getNode(flowEdgeContext.getEdge().getDest());
+ DatasetDescriptor currentOutputDatasetDescriptor = flowEdgeContext.getOutputDatasetDescriptor();
+
+ //Are we done?
+ if (isPathFound(currentNode, destNode, currentOutputDatasetDescriptor, destDatasetDescriptor)) {
+ return constructPath(flowEdgeContext);
+ }
+
+ //Expand the currentNode to its adjacent edges and add them to the queue.
+ List<FlowEdgeContext> nextEdges =
+ getNextEdges(currentNode, currentOutputDatasetDescriptor, destDatasetDescriptor);
+ for (FlowEdgeContext childFlowEdgeContext : nextEdges) {
+ //Add a pointer from the child edge to the parent edge, if the child edge has not been recorded in the
+ // path map yet; pathMap doubles as the visited set, which is what guarantees termination.
+ if (!this.pathMap.containsKey(childFlowEdgeContext)) {
+ edgeQueue.add(childFlowEdgeContext);
+ this.pathMap.put(childFlowEdgeContext, flowEdgeContext);
+ }
+ }
+ }
+ //No path found. Return null.
+ return null;
+ } catch (SpecNotFoundException | JobTemplate.TemplateException | IOException | URISyntaxException e) {
+ throw new PathFinder.PathFinderException(
+ "Exception encountered when computing path from src: " + this.srcNode.getId() + " to dest: " + destNode.getId(), e);
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5a6bfea9/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/PathFinder.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/PathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/PathFinder.java
new file mode 100644
index 0000000..982d7f6
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/PathFinder.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph.pathfinder;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.modules.flow.FlowGraphPath;
+
+
+/**
+ * An interface for computing a path in a {@link org.apache.gobblin.service.modules.flowgraph.FlowGraph}. Each
+ * implementation of {@link PathFinder} implements a specific path finding algorithm such as Breadth-First Search (BFS),
+ * Dijkstra's shortest-path algorithm etc.
+ */
+@Alpha
+public interface PathFinder {
+ /** Computes the path; may be null ({@link AbstractPathFinder} returns null when some destination has no path). */
+ public FlowGraphPath findPath() throws PathFinderException;
+ /** Checked exception declared by {@link #findPath()}. */
+ public static class PathFinderException extends Exception {
+ public PathFinderException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public PathFinderException(String message) {
+ super(message);
+ }
+ }
+
+}