You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/04/04 01:24:43 UTC
samza git commit: Samza 1186: Rename Processor to Job
Repository: samza
Updated Branches:
refs/heads/master 311f9d12e -> b70ee983b
Samza 1186: Rename Processor to Job
Now that we have the top-level Samza application, and each stage is called a job, the previously introduced "processor" naming should be renamed to "job". This includes renaming ProcessorGraph to JobGraph, ProcessorNode to JobNode, etc.
Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Reviewers: Jake Maes <jm...@apache.org>
Closes #109 from xinyuiscool/SAMZA-1186
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b70ee983
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b70ee983
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b70ee983
Branch: refs/heads/master
Commit: b70ee983b7af4f5d47e1f7368d5f42e08937d16f
Parents: 311f9d1
Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Authored: Mon Apr 3 18:24:33 2017 -0700
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Mon Apr 3 18:24:33 2017 -0700
----------------------------------------------------------------------
.../apache/samza/runtime/ApplicationRunner.java | 1 +
.../samza/execution/ExecutionPlanner.java | 68 ++--
.../org/apache/samza/execution/JobGraph.java | 368 +++++++++++++++++++
.../org/apache/samza/execution/JobNode.java | 132 +++++++
.../apache/samza/execution/ProcessorGraph.java | 359 ------------------
.../apache/samza/execution/ProcessorNode.java | 116 ------
.../org/apache/samza/execution/StreamEdge.java | 18 +-
.../samza/operators/spec/OperatorSpec.java | 6 +
.../samza/runtime/RemoteApplicationRunner.java | 38 +-
.../samza/execution/TestExecutionPlanner.java | 44 +--
.../apache/samza/execution/TestJobGraph.java | 273 ++++++++++++++
.../samza/execution/TestProcessorGraph.java | 230 ------------
12 files changed, 865 insertions(+), 788 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/b70ee983/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
index d4f5b00..e4e24b4 100644
--- a/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
+++ b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
@@ -98,6 +98,7 @@ public abstract class ApplicationRunner {
* Returns {@link ApplicationStatus#Running} if any of the jobs are running.
*
* @param streamApp the user-defined {@link StreamApplication} object
+ * @return the status of the application
*/
public abstract ApplicationStatus status(StreamApplication streamApp);
http://git-wip-us.apache.org/repos/asf/samza/blob/b70ee983/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
index f9e44cf..47deecd 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
@@ -57,26 +57,23 @@ public class ExecutionPlanner {
this.streamManager = streamManager;
}
- public ProcessorGraph plan(StreamGraph streamGraph) throws Exception {
- // create physical processors based on stream graph
- ProcessorGraph processorGraph = createProcessorGraph(streamGraph);
+ public JobGraph plan(StreamGraph streamGraph) throws Exception {
+ // create physical job graph based on stream graph
+ JobGraph jobGraph = createJobGraph(streamGraph);
- if (!processorGraph.getIntermediateStreams().isEmpty()) {
+ if (!jobGraph.getIntermediateStreams().isEmpty()) {
// figure out the partitions for internal streams
- calculatePartitions(streamGraph, processorGraph);
+ calculatePartitions(streamGraph, jobGraph);
}
- return processorGraph;
+ return jobGraph;
}
/**
* Create the physical graph from StreamGraph
*/
- /* package private */ ProcessorGraph createProcessorGraph(StreamGraph streamGraph) {
- // For this phase, we are going to create a processor for the whole dag
- String processorId = config.get(JobConfig.JOB_NAME()); // only one processor, use the job name
-
- ProcessorGraph processorGraph = new ProcessorGraph(config);
+ /* package private */ JobGraph createJobGraph(StreamGraph streamGraph) {
+ JobGraph jobGraph = new JobGraph(streamGraph, config);
Set<StreamSpec> sourceStreams = new HashSet<>(streamGraph.getInStreams().keySet());
Set<StreamSpec> sinkStreams = new HashSet<>(streamGraph.getOutStreams().keySet());
Set<StreamSpec> intStreams = new HashSet<>(sourceStreams);
@@ -84,46 +81,51 @@ public class ExecutionPlanner {
sourceStreams.removeAll(intStreams);
sinkStreams.removeAll(intStreams);
+ // For this phase, we have a single job node for the whole dag
+ String jobName = config.get(JobConfig.JOB_NAME());
+ String jobId = config.get(JobConfig.JOB_ID(), "1");
+ JobNode node = jobGraph.getOrCreateNode(jobName, jobId);
+
// add sources
- sourceStreams.forEach(spec -> processorGraph.addSource(spec, processorId));
+ sourceStreams.forEach(spec -> jobGraph.addSource(spec, node));
// add sinks
- sinkStreams.forEach(spec -> processorGraph.addSink(spec, processorId));
+ sinkStreams.forEach(spec -> jobGraph.addSink(spec, node));
// add intermediate streams
- intStreams.forEach(spec -> processorGraph.addIntermediateStream(spec, processorId, processorId));
+ intStreams.forEach(spec -> jobGraph.addIntermediateStream(spec, node, node));
- processorGraph.validate();
+ jobGraph.validate();
- return processorGraph;
+ return jobGraph;
}
/**
* Figure out the number of partitions of all streams
*/
- /* package private */ void calculatePartitions(StreamGraph streamGraph, ProcessorGraph processorGraph) {
+ /* package private */ void calculatePartitions(StreamGraph streamGraph, JobGraph jobGraph) {
// fetch the external streams partition info
- updateExistingPartitions(processorGraph, streamManager);
+ updateExistingPartitions(jobGraph, streamManager);
// calculate the partitions for the input streams of join operators
- calculateJoinInputPartitions(streamGraph, processorGraph);
+ calculateJoinInputPartitions(streamGraph, jobGraph);
// calculate the partitions for the rest of intermediate streams
- calculateIntStreamPartitions(processorGraph, config);
+ calculateIntStreamPartitions(jobGraph, config);
// validate all the partitions are assigned
- validatePartitions(processorGraph);
+ validatePartitions(jobGraph);
}
/**
* Fetch the partitions of source/sink streams and update the StreamEdges.
- * @param processorGraph ProcessorGraph
+ * @param jobGraph {@link JobGraph}
* @param streamManager the {@link StreamManager} to interface with the streams.
*/
- /* package private */ static void updateExistingPartitions(ProcessorGraph processorGraph, StreamManager streamManager) {
+ /* package private */ static void updateExistingPartitions(JobGraph jobGraph, StreamManager streamManager) {
Set<StreamEdge> existingStreams = new HashSet<>();
- existingStreams.addAll(processorGraph.getSources());
- existingStreams.addAll(processorGraph.getSinks());
+ existingStreams.addAll(jobGraph.getSources());
+ existingStreams.addAll(jobGraph.getSinks());
Multimap<String, StreamEdge> systemToStreamEdges = HashMultimap.create();
// group the StreamEdge(s) based on the system name
@@ -150,7 +152,7 @@ public class ExecutionPlanner {
/**
* Calculate the partitions for the input streams of join operators
*/
- /* package private */ static void calculateJoinInputPartitions(StreamGraph streamGraph, ProcessorGraph processorGraph) {
+ /* package private */ static void calculateJoinInputPartitions(StreamGraph streamGraph, JobGraph jobGraph) {
// mapping from a source stream to all join specs reachable from it
Multimap<OperatorSpec, StreamEdge> joinSpecToStreamEdges = HashMultimap.create();
// reverse mapping of the above
@@ -165,7 +167,7 @@ public class ExecutionPlanner {
Set<OperatorSpec> visited = new HashSet<>();
streamGraph.getInStreams().entrySet().forEach(entry -> {
- StreamEdge streamEdge = processorGraph.getOrCreateEdge(entry.getKey());
+ StreamEdge streamEdge = jobGraph.getOrCreateEdge(entry.getKey());
// Traverses the StreamGraph to find and update mappings for all Joins reachable from this input StreamEdge
findReachableJoins(entry.getValue(), streamEdge, joinSpecToStreamEdges, streamEdgeToJoinSpecs,
outputStreamToJoinSpec, joinQ, visited);
@@ -248,24 +250,24 @@ public class ExecutionPlanner {
}
}
- private static void calculateIntStreamPartitions(ProcessorGraph processorGraph, Config config) {
+ private static void calculateIntStreamPartitions(JobGraph jobGraph, Config config) {
int partitions = config.getInt(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), StreamEdge.PARTITIONS_UNKNOWN);
if (partitions < 0) {
// use the following simple algo to figure out the partitions
// partition = MAX(MAX(Input topic partitions), MAX(Output topic partitions))
- int maxInPartitions = maxPartition(processorGraph.getSources());
- int maxOutPartitions = maxPartition(processorGraph.getSinks());
+ int maxInPartitions = maxPartition(jobGraph.getSources());
+ int maxOutPartitions = maxPartition(jobGraph.getSinks());
partitions = Math.max(maxInPartitions, maxOutPartitions);
}
- for (StreamEdge edge : processorGraph.getIntermediateStreams()) {
+ for (StreamEdge edge : jobGraph.getIntermediateStreams()) {
if (edge.getPartitionCount() <= 0) {
edge.setPartitionCount(partitions);
}
}
}
- private static void validatePartitions(ProcessorGraph processorGraph) {
- for (StreamEdge edge : processorGraph.getIntermediateStreams()) {
+ private static void validatePartitions(JobGraph jobGraph) {
+ for (StreamEdge edge : jobGraph.getIntermediateStreams()) {
if (edge.getPartitionCount() <= 0) {
throw new SamzaException(String.format("Failure to assign the partitions to Stream %s", edge.getFormattedSystemStream()));
}
http://git-wip-us.apache.org/repos/asf/samza/blob/b70ee983/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
new file mode 100644
index 0000000..5c9f037
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.execution;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.system.StreamSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The JobGraph is the physical execution graph for a multi-stage Samza application.
+ * It contains the topology of jobs connected with source/sink/intermediate streams.
+ * High level APIs are transformed into JobGraph for planning, validation and execution.
+ * Source/sink streams are external streams while intermediate streams are created and managed by Samza.
+ * Note that intermediate streams are both the input and output of a JobNode in JobGraph.
+ * So the graph may have cycles and it's not a DAG.
+ */
+public class JobGraph {
+ private static final Logger log = LoggerFactory.getLogger(JobGraph.class);
+
+ private final Map<String, JobNode> nodes = new HashMap<>();
+ private final Map<String, StreamEdge> edges = new HashMap<>();
+ private final Set<StreamEdge> sources = new HashSet<>();
+ private final Set<StreamEdge> sinks = new HashSet<>();
+ private final Set<StreamEdge> intermediateStreams = new HashSet<>();
+ private final Config config;
+ private final StreamGraph streamGraph;
+
+ /**
+ * The JobGraph is only constructed by the {@link ExecutionPlanner}.
+ * @param config Config
+ */
+ /* package private */ JobGraph(StreamGraph streamGraph, Config config) {
+ this.streamGraph = streamGraph;
+ this.config = config;
+ }
+
+ /**
+ * Add a source stream to a {@link JobNode}
+ * @param input source stream
+ * @param node the job node that consumes from the source
+ */
+ /* package private */ void addSource(StreamSpec input, JobNode node) {
+ StreamEdge edge = getOrCreateEdge(input);
+ edge.addTargetNode(node);
+ node.addInEdge(edge);
+ sources.add(edge);
+ }
+
+ /**
+ * Add a sink stream to a {@link JobNode}
+ * @param output sink stream
+ * @param node the job node that outputs to the sink
+ */
+ /* package private */ void addSink(StreamSpec output, JobNode node) {
+ StreamEdge edge = getOrCreateEdge(output);
+ edge.addSourceNode(node);
+ node.addOutEdge(edge);
+ sinks.add(edge);
+ }
+
+ /**
+ * Add an intermediate stream from source to target {@link JobNode}
+ * @param streamSpec intermediate stream
+ * @param from the source node
+ * @param to the target node
+ */
+ /* package private */ void addIntermediateStream(StreamSpec streamSpec, JobNode from, JobNode to) {
+ StreamEdge edge = getOrCreateEdge(streamSpec);
+ edge.addSourceNode(from);
+ edge.addTargetNode(to);
+ from.addOutEdge(edge);
+ to.addInEdge(edge);
+ intermediateStreams.add(edge);
+ }
+
+ /**
+ * Get the {@link JobNode}. Create one if it does not exist.
+ * @param jobName name of the job
+ * @param jobId id of the job
+ * @return the existing or newly created {@link JobNode}
+ */
+ /* package private */JobNode getOrCreateNode(String jobName, String jobId) {
+ String nodeId = JobNode.createId(jobName, jobId);
+ JobNode node = nodes.get(nodeId);
+ if (node == null) {
+ node = new JobNode(jobName, jobId, config);
+ nodes.put(nodeId, node);
+ }
+ return node;
+ }
+
+ /**
+ * Get the {@link StreamEdge} for a {@link StreamSpec}. Create one if it does not exist.
+ * @param streamSpec spec of the StreamEdge
+ * @return stream edge
+ */
+ /* package private */StreamEdge getOrCreateEdge(StreamSpec streamSpec) {
+ String streamId = streamSpec.getId();
+ StreamEdge edge = edges.get(streamId);
+ if (edge == null) {
+ edge = new StreamEdge(streamSpec);
+ edges.put(streamId, edge);
+ }
+ return edge;
+ }
+
+ /**
+ * Returns the job nodes to be executed in the topological order
+ * @return unmodifiable list of {@link JobNode}
+ */
+ public List<JobNode> getJobNodes() {
+ List<JobNode> sortedNodes = topologicalSort();
+ return Collections.unmodifiableList(sortedNodes);
+ }
+
+ /**
+ * Returns the source streams in the graph
+ * @return unmodifiable set of {@link StreamEdge}
+ */
+ public Set<StreamEdge> getSources() {
+ return Collections.unmodifiableSet(sources);
+ }
+
+ /**
+ * Return the sink streams in the graph
+ * @return unmodifiable set of {@link StreamEdge}
+ */
+ public Set<StreamEdge> getSinks() {
+ return Collections.unmodifiableSet(sinks);
+ }
+
+ /**
+ * Return the intermediate streams in the graph
+ * @return unmodifiable set of {@link StreamEdge}
+ */
+ public Set<StreamEdge> getIntermediateStreams() {
+ return Collections.unmodifiableSet(intermediateStreams);
+ }
+
+ /**
+ * Return the {@link StreamGraph}
+ * @return {@link StreamGraph}
+ */
+ public StreamGraph getStreamGraph() {
+ return this.streamGraph;
+ }
+
+
+ /**
+ * Validate the graph has the correct topology, meaning the sources are coming from external streams,
+ * sinks are going to external streams, and the nodes are connected with intermediate streams.
+ * Also validate all the nodes are reachable from the sources.
+ */
+ public void validate() {
+ validateSources();
+ validateSinks();
+ validateInternalStreams();
+ validateReachability();
+ }
+
+ /**
+ * Validate that the sources have an indegree of 0 and an outdegree greater than 0
+ */
+ private void validateSources() {
+ sources.forEach(edge -> {
+ if (!edge.getSourceNodes().isEmpty()) {
+ throw new IllegalArgumentException(
+ String.format("Source stream %s should not have producers.", edge.getFormattedSystemStream()));
+ }
+ if (edge.getTargetNodes().isEmpty()) {
+ throw new IllegalArgumentException(
+ String.format("Source stream %s should have consumers.", edge.getFormattedSystemStream()));
+ }
+ });
+ }
+
+ /**
+ * Validate that the sinks have an outdegree of 0 and an indegree greater than 0
+ */
+ private void validateSinks() {
+ sinks.forEach(edge -> {
+ if (!edge.getTargetNodes().isEmpty()) {
+ throw new IllegalArgumentException(
+ String.format("Sink stream %s should not have consumers", edge.getFormattedSystemStream()));
+ }
+ if (edge.getSourceNodes().isEmpty()) {
+ throw new IllegalArgumentException(
+ String.format("Sink stream %s should have producers", edge.getFormattedSystemStream()));
+ }
+ });
+ }
+
+ /**
+ * Validate that the internal streams have both an indegree and an outdegree greater than 0
+ */
+ private void validateInternalStreams() {
+ Set<StreamEdge> internalEdges = new HashSet<>(edges.values());
+ internalEdges.removeAll(sources);
+ internalEdges.removeAll(sinks);
+
+ internalEdges.forEach(edge -> {
+ if (edge.getSourceNodes().isEmpty() || edge.getTargetNodes().isEmpty()) {
+ throw new IllegalArgumentException(
+ String.format("Internal stream %s should have both producers and consumers", edge.getFormattedSystemStream()));
+ }
+ });
+ }
+
+ /**
+ * Validate all nodes are reachable by sources.
+ */
+ private void validateReachability() {
+ // validate all nodes are reachable from the sources
+ final Set<JobNode> reachable = findReachable();
+ if (reachable.size() != nodes.size()) {
+ Set<JobNode> unreachable = new HashSet<>(nodes.values());
+ unreachable.removeAll(reachable);
+ throw new IllegalArgumentException(String.format("Jobs %s cannot be reached from Sources.",
+ String.join(", ", unreachable.stream().map(JobNode::getId).collect(Collectors.toList()))));
+ }
+ }
+
+ /**
+ * Find the reachable set of nodes using BFS.
+ * @return reachable set of {@link JobNode}
+ */
+ /* package private */ Set<JobNode> findReachable() {
+ Queue<JobNode> queue = new ArrayDeque<>();
+ Set<JobNode> visited = new HashSet<>();
+
+ sources.forEach(source -> {
+ List<JobNode> next = source.getTargetNodes();
+ queue.addAll(next);
+ visited.addAll(next);
+ });
+
+ while (!queue.isEmpty()) {
+ JobNode node = queue.poll();
+ node.getOutEdges().stream().flatMap(edge -> edge.getTargetNodes().stream()).forEach(target -> {
+ if (!visited.contains(target)) {
+ visited.add(target);
+ queue.offer(target);
+ }
+ });
+ }
+
+ return visited;
+ }
+
+ /**
+ * A variation of Kahn's algorithm for topological sorting.
+ * This algorithm also takes into account the simple loops in the graph.
+ * @return topologically sorted {@link JobNode}s
+ */
+ /* package private */ List<JobNode> topologicalSort() {
+ Collection<JobNode> pnodes = nodes.values();
+ if (pnodes.size() == 1) {
+ return new ArrayList<>(pnodes);
+ }
+
+ Queue<JobNode> q = new ArrayDeque<>();
+ Map<String, Long> indegree = new HashMap<>();
+ Set<JobNode> visited = new HashSet<>();
+ pnodes.forEach(node -> {
+ String nid = node.getId();
+ //only count the degrees of intermediate streams
+ long degree = node.getInEdges().stream().filter(e -> !sources.contains(e)).count();
+ indegree.put(nid, degree);
+
+ if (degree == 0L) {
+ // start from the nodes that have no intermediate input streams, i.e. nodes that only consume from sources
+ q.add(node);
+ visited.add(node);
+ }
+ });
+
+ List<JobNode> sortedNodes = new ArrayList<>();
+ Set<JobNode> reachable = new HashSet<>();
+ while (sortedNodes.size() < pnodes.size()) {
+ // Here we use an indegree-based approach to implement Kahn's algorithm for topological sort.
+ // This approach will not change the graph itself during computation.
+ //
+ // The algorithm works as follows:
+ // 1. start with the nodes with no incoming edges (indegree being 0) and insert them into the list
+ // 2. remove the edges from any node in the list to its connected nodes by decrementing the indegree of the connected nodes
+ // 3. add any new nodes with indegree being 0
+ // 4. loop 1-3 until there are no more nodes with indegree 0
+ //
+ while (!q.isEmpty()) {
+ JobNode node = q.poll();
+ sortedNodes.add(node);
+ node.getOutEdges().stream().flatMap(edge -> edge.getTargetNodes().stream()).forEach(n -> {
+ String nid = n.getId();
+ Long degree = indegree.get(nid) - 1;
+ indegree.put(nid, degree);
+ if (degree == 0L && !visited.contains(n)) {
+ q.add(n);
+ visited.add(n);
+ }
+ reachable.add(n);
+ });
+ }
+
+ if (sortedNodes.size() < pnodes.size()) {
+ // The remaining nodes have cycles
+ // use the following approach to break the cycles
+ // start from the nodes that are reachable from previous traverse
+ reachable.removeAll(sortedNodes);
+ if (!reachable.isEmpty()) {
+ // find the node with the minimal remaining indegree
+ long min = Long.MAX_VALUE;
+ JobNode minNode = null;
+ for (JobNode node : reachable) {
+ Long degree = indegree.get(node.getId());
+ if (degree < min) {
+ min = degree;
+ minNode = node;
+ }
+ }
+ // start from the node with minimal input edge again
+ q.add(minNode);
+ visited.add(minNode);
+ } else {
+ // all the remaining nodes should be reachable from sources
+ // start from sources again to find the next node that hasn't been visited
+ JobNode nextNode = sources.stream().flatMap(source -> source.getTargetNodes().stream())
+ .filter(node -> !visited.contains(node))
+ .findAny().get();
+ q.add(nextNode);
+ visited.add(nextNode);
+ }
+ }
+ }
+
+ return sortedNodes;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/b70ee983/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
new file mode 100644
index 0000000..c47a69c
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.execution;
+
+import com.google.common.base.Joiner;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A JobNode is a physical execution unit. In RemoteExecutionEnvironment, it's a job that will be submitted
+ * to a remote cluster. In LocalExecutionEnvironment, it's a set of StreamProcessors for local execution.
+ * A JobNode contains the input/output, and the configs for physical execution.
+ */
+public class JobNode {
+ private static final Logger log = LoggerFactory.getLogger(JobNode.class);
+ private static final String CONFIG_JOB_PREFIX = "jobs.%s.";
+
+ private final String jobName;
+ private final String jobId;
+ private final String id;
+ private final List<StreamEdge> inEdges = new ArrayList<>();
+ private final List<StreamEdge> outEdges = new ArrayList<>();
+ private final Config config;
+
+ JobNode(String jobName, String jobId, Config config) {
+ this.jobName = jobName;
+ this.jobId = jobId;
+ this.id = createId(jobName, jobId);
+ this.config = config;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public String getJobName() {
+ return jobName;
+ }
+
+ public String getJobId() {
+ return jobId;
+ }
+
+ void addInEdge(StreamEdge in) {
+ inEdges.add(in);
+ }
+
+ void addOutEdge(StreamEdge out) {
+ outEdges.add(out);
+ }
+
+ List<StreamEdge> getInEdges() {
+ return inEdges;
+ }
+
+ List<StreamEdge> getOutEdges() {
+ return outEdges;
+ }
+
+ public Config generateConfig() {
+ Map<String, String> configs = new HashMap<>();
+ configs.put(JobConfig.JOB_NAME(), jobName);
+
+ List<String> inputs = inEdges.stream().map(edge -> edge.getFormattedSystemStream()).collect(Collectors.toList());
+ configs.put(TaskConfig.INPUT_STREAMS(), Joiner.on(',').join(inputs));
+ log.info("Job {} has generated configs {}", jobName, configs);
+
+ String configPrefix = String.format(CONFIG_JOB_PREFIX, jobName);
+ // TODO: Disallow user specifying job inputs/outputs. This info comes strictly from the pipeline.
+ return Util.rewriteConfig(extractScopedConfig(config, new MapConfig(configs), configPrefix));
+ }
+
+ /**
+ * This function extracts the subset of configs from the full config, and uses it to override the generated configs
+ * from the job.
+ * @param fullConfig full config
+ * @param generatedConfig config generated for the job
+ * @param configPrefix prefix to extract the subset of the config overrides
+ * @return config that merges the generated configs and overrides
+ */
+ private static Config extractScopedConfig(Config fullConfig, Config generatedConfig, String configPrefix) {
+ Config scopedConfig = fullConfig.subset(configPrefix);
+
+ Config[] configPrecedence = new Config[] {fullConfig, generatedConfig, scopedConfig};
+ // Strip empty configs so they don't override the configs before them.
+ Map<String, String> mergedConfig = new HashMap<>();
+ for (Map<String, String> config : configPrecedence) {
+ for (Map.Entry<String, String> property : config.entrySet()) {
+ String value = property.getValue();
+ if (!(value == null || value.isEmpty())) {
+ mergedConfig.put(property.getKey(), property.getValue());
+ }
+ }
+ }
+ scopedConfig = new MapConfig(mergedConfig);
+ log.debug("Prefix '{}' has merged config {}", configPrefix, scopedConfig);
+
+ return scopedConfig;
+ }
+
+ static String createId(String jobName, String jobId) {
+ return String.format("%s-%s", jobName, jobId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/b70ee983/samza-core/src/main/java/org/apache/samza/execution/ProcessorGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/ProcessorGraph.java b/samza-core/src/main/java/org/apache/samza/execution/ProcessorGraph.java
deleted file mode 100644
index 13755ae..0000000
--- a/samza-core/src/main/java/org/apache/samza/execution/ProcessorGraph.java
+++ /dev/null
@@ -1,359 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.execution;
-
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.stream.Collectors;
-import org.apache.samza.config.Config;
-import org.apache.samza.system.StreamSpec;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * The ProcessorGraph is the physical execution graph for a multi-stage Samza application.
- * It contains the topology of execution processors connected with source/sink/intermediate streams.
- * High level APIs are transformed into ProcessorGraph for planning, validation and execution.
- * Source/sink streams are external streams while intermediate streams are created and managed by Samza.
- * Note that intermediate streams are both the input and output of a ProcessorNode in ProcessorGraph.
- * So the graph may have cycles and it's not a DAG.
- */
-public class ProcessorGraph {
- private static final Logger log = LoggerFactory.getLogger(ProcessorGraph.class);
-
- private final Map<String, ProcessorNode> nodes = new HashMap<>();
- private final Map<String, StreamEdge> edges = new HashMap<>();
- private final Set<StreamEdge> sources = new HashSet<>();
- private final Set<StreamEdge> sinks = new HashSet<>();
- private final Set<StreamEdge> intermediateStreams = new HashSet<>();
- private final Config config;
-
- /**
- * The ProcessorGraph is only constructed by the {@link ExecutionPlanner}.
- * @param config Config
- */
- /* package private */ ProcessorGraph(Config config) {
- this.config = config;
- }
-
- /**
- * Add a source stream to a {@link ProcessorNode}
- * @param input source stream
- * @param targetProcessorId id of the {@link ProcessorNode}
- */
- /* package private */ void addSource(StreamSpec input, String targetProcessorId) {
- ProcessorNode node = getOrCreateProcessor(targetProcessorId);
- StreamEdge edge = getOrCreateEdge(input);
- edge.addTargetNode(node);
- node.addInEdge(edge);
- sources.add(edge);
- }
-
- /**
- * Add a sink stream to a {@link ProcessorNode}
- * @param output sink stream
- * @param sourceProcessorId id of the {@link ProcessorNode}
- */
- /* package private */ void addSink(StreamSpec output, String sourceProcessorId) {
- ProcessorNode node = getOrCreateProcessor(sourceProcessorId);
- StreamEdge edge = getOrCreateEdge(output);
- edge.addSourceNode(node);
- node.addOutEdge(edge);
- sinks.add(edge);
- }
-
- /**
- * Add an intermediate stream from source to target {@link ProcessorNode}
- * @param streamSpec intermediate stream
- * @param sourceProcessorId id of the source {@link ProcessorNode}
- * @param targetProcessorId id of the target {@link ProcessorNode}
- */
- /* package private */ void addIntermediateStream(StreamSpec streamSpec, String sourceProcessorId, String targetProcessorId) {
- ProcessorNode sourceNode = getOrCreateProcessor(sourceProcessorId);
- ProcessorNode targetNode = getOrCreateProcessor(targetProcessorId);
- StreamEdge edge = getOrCreateEdge(streamSpec);
- edge.addSourceNode(sourceNode);
- edge.addTargetNode(targetNode);
- sourceNode.addOutEdge(edge);
- targetNode.addInEdge(edge);
- intermediateStreams.add(edge);
- }
-
- /**
- * Get the {@link ProcessorNode} for an id. Create one if it does not exist.
- * @param processorId id of the processor
- * @return processor node
- */
- /* package private */ProcessorNode getOrCreateProcessor(String processorId) {
- ProcessorNode node = nodes.get(processorId);
- if (node == null) {
- node = new ProcessorNode(processorId, config);
- nodes.put(processorId, node);
- }
- return node;
- }
-
- /**
- * Get the {@link StreamEdge} for a {@link StreamSpec}. Create one if it does not exist.
- * @param streamSpec spec of the StreamEdge
- * @return stream edge
- */
- /* package private */StreamEdge getOrCreateEdge(StreamSpec streamSpec) {
- String streamId = streamSpec.getId();
- StreamEdge edge = edges.get(streamId);
- if (edge == null) {
- edge = new StreamEdge(streamSpec);
- edges.put(streamId, edge);
- }
- return edge;
- }
-
- /**
- * Returns the processors to be executed in the topological order
- * @return unmodifiable list of {@link ProcessorNode}
- */
- public List<ProcessorNode> getProcessorNodes() {
- List<ProcessorNode> sortedNodes = topologicalSort();
- return Collections.unmodifiableList(sortedNodes);
- }
-
- /**
- * Returns the source streams in the graph
- * @return unmodifiable set of {@link StreamEdge}
- */
- public Set<StreamEdge> getSources() {
- return Collections.unmodifiableSet(sources);
- }
-
- /**
- * Return the sink streams in the graph
- * @return unmodifiable set of {@link StreamEdge}
- */
- public Set<StreamEdge> getSinks() {
- return Collections.unmodifiableSet(sinks);
- }
-
- /**
- * Return the intermediate streams in the graph
- * @return unmodifiable set of {@link StreamEdge}
- */
- public Set<StreamEdge> getIntermediateStreams() {
- return Collections.unmodifiableSet(intermediateStreams);
- }
-
-
- /**
- * Validate the graph has the correct topology, meaning the sources are coming from external streams,
- * sinks are going to external streams, and the nodes are connected with intermediate streams.
- * Also validate all the nodes are reachable from the sources.
- */
- public void validate() {
- validateSources();
- validateSinks();
- validateInternalStreams();
- validateReachability();
- }
-
- /**
- * Validate the sources should have indegree being 0 and outdegree greater than 0
- */
- private void validateSources() {
- sources.forEach(edge -> {
- if (!edge.getSourceNodes().isEmpty()) {
- throw new IllegalArgumentException(
- String.format("Source stream %s should not have producers.", edge.getFormattedSystemStream()));
- }
- if (edge.getTargetNodes().isEmpty()) {
- throw new IllegalArgumentException(
- String.format("Source stream %s should have consumers.", edge.getFormattedSystemStream()));
- }
- });
- }
-
- /**
- * Validate the sinks should have outdegree being 0 and indegree greater than 0
- */
- private void validateSinks() {
- sinks.forEach(edge -> {
- if (!edge.getTargetNodes().isEmpty()) {
- throw new IllegalArgumentException(
- String.format("Sink stream %s should not have consumers", edge.getFormattedSystemStream()));
- }
- if (edge.getSourceNodes().isEmpty()) {
- throw new IllegalArgumentException(
- String.format("Sink stream %s should have producers", edge.getFormattedSystemStream()));
- }
- });
- }
-
- /**
- * Validate the internal streams should have both indegree and outdegree greater than 0
- */
- private void validateInternalStreams() {
- Set<StreamEdge> internalEdges = new HashSet<>(edges.values());
- internalEdges.removeAll(sources);
- internalEdges.removeAll(sinks);
-
- internalEdges.forEach(edge -> {
- if (edge.getSourceNodes().isEmpty() || edge.getTargetNodes().isEmpty()) {
- throw new IllegalArgumentException(
- String.format("Internal stream %s should have both producers and consumers", edge.getFormattedSystemStream()));
- }
- });
- }
-
- /**
- * Validate all nodes are reachable by sources.
- */
- private void validateReachability() {
- // validate all nodes are reachable from the sources
- final Set<ProcessorNode> reachable = findReachable();
- if (reachable.size() != nodes.size()) {
- Set<ProcessorNode> unreachable = new HashSet<>(nodes.values());
- unreachable.removeAll(reachable);
- throw new IllegalArgumentException(String.format("Processors %s cannot be reached from Sources.",
- String.join(", ", unreachable.stream().map(ProcessorNode::getId).collect(Collectors.toList()))));
- }
- }
-
- /**
- * Find the reachable set of nodes using BFS.
- * @return reachable set of {@link ProcessorNode}
- */
- /* package private */ Set<ProcessorNode> findReachable() {
- Queue<ProcessorNode> queue = new ArrayDeque<>();
- Set<ProcessorNode> visited = new HashSet<>();
-
- sources.forEach(source -> {
- List<ProcessorNode> next = source.getTargetNodes();
- queue.addAll(next);
- visited.addAll(next);
- });
-
- while (!queue.isEmpty()) {
- ProcessorNode node = queue.poll();
- node.getOutEdges().stream().flatMap(edge -> edge.getTargetNodes().stream()).forEach(target -> {
- if (!visited.contains(target)) {
- visited.add(target);
- queue.offer(target);
- }
- });
- }
-
- return visited;
- }
-
- /**
- * An variation of Kahn's algorithm of topological sorting.
- * This algorithm also takes account of the simple loops in the graph
- * @return topologically sorted {@link ProcessorNode}s
- */
- /* package private */ List<ProcessorNode> topologicalSort() {
- Collection<ProcessorNode> pnodes = nodes.values();
- if (pnodes.size() == 1) {
- return new ArrayList<>(pnodes);
- }
-
- Queue<ProcessorNode> q = new ArrayDeque<>();
- Map<String, Long> indegree = new HashMap<>();
- Set<ProcessorNode> visited = new HashSet<>();
- pnodes.forEach(node -> {
- String nid = node.getId();
- //only count the degrees of intermediate streams
- long degree = node.getInEdges().stream().filter(e -> !sources.contains(e)).count();
- indegree.put(nid, degree);
-
- if (degree == 0L) {
- // start from the nodes that has no intermediate input streams, so it only consumes from sources
- q.add(node);
- visited.add(node);
- }
- });
-
- List<ProcessorNode> sortedNodes = new ArrayList<>();
- Set<ProcessorNode> reachable = new HashSet<>();
- while (sortedNodes.size() < pnodes.size()) {
- // Here we use indegree-based approach to implment Kahn's algorithm for topological sort
- // This approach will not change the graph itself during computation.
- //
- // The algorithm works as:
- // 1. start with nodes with no incoming edges (in degree being 0) and inserted into the list
- // 2. remove the edge from any node in the list to its connected nodes by changing the indegree of the connected nodes.
- // 3. add any new nodes with ingree being 0
- // 4. loop 1-3 until no more nodes with indegree 0
- //
- while (!q.isEmpty()) {
- ProcessorNode node = q.poll();
- sortedNodes.add(node);
- node.getOutEdges().stream().flatMap(edge -> edge.getTargetNodes().stream()).forEach(n -> {
- String nid = n.getId();
- Long degree = indegree.get(nid) - 1;
- indegree.put(nid, degree);
- if (degree == 0L && !visited.contains(n)) {
- q.add(n);
- visited.add(n);
- }
- reachable.add(n);
- });
- }
-
- if (sortedNodes.size() < pnodes.size()) {
- // The remaining nodes have cycles
- // use the following approach to break the cycles
- // start from the nodes that are reachable from previous traverse
- reachable.removeAll(sortedNodes);
- if (!reachable.isEmpty()) {
- //find out the nodes with minimal input edge
- long min = Long.MAX_VALUE;
- ProcessorNode minNode = null;
- for (ProcessorNode node : reachable) {
- Long degree = indegree.get(node.getId());
- if (degree < min) {
- min = degree;
- minNode = node;
- }
- }
- // start from the node with minimal input edge again
- q.add(minNode);
- visited.add(minNode);
- } else {
- // all the remaining nodes should be reachable from sources
- // start from sources again to find the next node that hasn't been visited
- ProcessorNode nextNode = sources.stream().flatMap(source -> source.getTargetNodes().stream())
- .filter(node -> !visited.contains(node))
- .findAny().get();
- q.add(nextNode);
- visited.add(nextNode);
- }
- }
- }
-
- return sortedNodes;
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/b70ee983/samza-core/src/main/java/org/apache/samza/execution/ProcessorNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/ProcessorNode.java b/samza-core/src/main/java/org/apache/samza/execution/ProcessorNode.java
deleted file mode 100644
index 3cb695f..0000000
--- a/samza-core/src/main/java/org/apache/samza/execution/ProcessorNode.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.execution;
-
-import com.google.common.base.Joiner;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.TaskConfig;
-import org.apache.samza.util.Util;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * A ProcessorNode is a physical execution unit. In RemoteExecutionEnvironment, it's a job that will be submitted
- * to remote cluster. In LocalExecutionEnvironment, it's a set of StreamProcessors for local execution.
- * A ProcessorNode contains the input/output, and the configs for physical execution.
- */
-public class ProcessorNode {
- private static final Logger log = LoggerFactory.getLogger(ProcessorNode.class);
- private static final String CONFIG_PROCESSOR_PREFIX = "processors.%s.";
-
- private final String id;
- private final List<StreamEdge> inEdges = new ArrayList<>();
- private final List<StreamEdge> outEdges = new ArrayList<>();
- private final Config config;
-
- ProcessorNode(String id, Config config) {
- this.id = id;
- this.config = config;
- }
-
- public String getId() {
- return id;
- }
-
- void addInEdge(StreamEdge in) {
- inEdges.add(in);
- }
-
- void addOutEdge(StreamEdge out) {
- outEdges.add(out);
- }
-
- List<StreamEdge> getInEdges() {
- return inEdges;
- }
-
- List<StreamEdge> getOutEdges() {
- return outEdges;
- }
-
- public Config generateConfig() {
- Map<String, String> configs = new HashMap<>();
- configs.put(JobConfig.JOB_NAME(), id);
-
- List<String> inputs = inEdges.stream().map(edge -> edge.getFormattedSystemStream()).collect(Collectors.toList());
- configs.put(TaskConfig.INPUT_STREAMS(), Joiner.on(',').join(inputs));
- log.info("Processor {} has generated configs {}", id, configs);
-
- String configPrefix = String.format(CONFIG_PROCESSOR_PREFIX, id);
- // TODO: Disallow user specifying processor inputs/outputs. This info comes strictly from the pipeline.
- return Util.rewriteConfig(extractScopedConfig(config, new MapConfig(configs), configPrefix));
- }
-
- /**
- * This function extract the subset of configs from the full config, and use it to override the generated configs
- * from the processor.
- * @param fullConfig full config
- * @param generatedConfig config generated from the processor
- * @param configPrefix prefix to extract the subset of the config overrides
- * @return config that merges the generated configs and overrides
- */
- private static Config extractScopedConfig(Config fullConfig, Config generatedConfig, String configPrefix) {
- Config scopedConfig = fullConfig.subset(configPrefix);
-
- Config[] configPrecedence = new Config[] {fullConfig, generatedConfig, scopedConfig};
- // Strip empty configs so they don't override the configs before them.
- Map<String, String> mergedConfig = new HashMap<>();
- for (Map<String, String> config : configPrecedence) {
- for (Map.Entry<String, String> property : config.entrySet()) {
- String value = property.getValue();
- if (!(value == null || value.isEmpty())) {
- mergedConfig.put(property.getKey(), property.getValue());
- }
- }
- }
- scopedConfig = new MapConfig(mergedConfig);
- log.debug("Prefix '{}' has merged config {}", configPrefix, scopedConfig);
-
- return scopedConfig;
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/b70ee983/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
index 3215a22..9596d0f 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
@@ -27,16 +27,16 @@ import org.apache.samza.util.Util;
/**
- * A StreamEdge connects the source {@link ProcessorNode}s to the target {@link ProcessorNode}s with a stream.
- * If it's a sink StreamEdge, the target ProcessorNode is empty.
- * If it's a source StreamEdge, the source ProcessorNode is empty.
+ * A StreamEdge connects the source {@link JobNode}s to the target {@link JobNode}s with a stream.
+ * If it's a sink StreamEdge, the target JobNode is empty.
+ * If it's a source StreamEdge, the source JobNode is empty.
*/
public class StreamEdge {
public static final int PARTITIONS_UNKNOWN = -1;
private final StreamSpec streamSpec;
- private final List<ProcessorNode> sourceNodes = new ArrayList<>();
- private final List<ProcessorNode> targetNodes = new ArrayList<>();
+ private final List<JobNode> sourceNodes = new ArrayList<>();
+ private final List<JobNode> targetNodes = new ArrayList<>();
private String name = "";
private int partitions = PARTITIONS_UNKNOWN;
@@ -46,11 +46,11 @@ public class StreamEdge {
this.name = Util.getNameFromSystemStream(getSystemStream());
}
- void addSourceNode(ProcessorNode sourceNode) {
+ void addSourceNode(JobNode sourceNode) {
sourceNodes.add(sourceNode);
}
- void addTargetNode(ProcessorNode targetNode) {
+ void addTargetNode(JobNode targetNode) {
targetNodes.add(targetNode);
}
@@ -70,11 +70,11 @@ public class StreamEdge {
return Util.getNameFromSystemStream(getSystemStream());
}
- List<ProcessorNode> getSourceNodes() {
+ List<JobNode> getSourceNodes() {
return sourceNodes;
}
- List<ProcessorNode> getTargetNodes() {
+ List<JobNode> getTargetNodes() {
return targetNodes;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/b70ee983/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
index 1444662..5a125a2 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
@@ -53,6 +53,12 @@ public interface OperatorSpec<OM> {
MessageStreamImpl<OM> getNextStream();
/**
+ * Return the ID for this operator
+ * @return ID integer
+ */
+ int getOpId();
+
+ /**
* Init method to initialize the context for this {@link OperatorSpec}. The default implementation is NO-OP.
*
* @param config the {@link Config} object for this task
http://git-wip-us.apache.org/repos/asf/samza/blob/b70ee983/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
index 9fe493e..cd6c492 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
@@ -26,8 +26,8 @@ import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
import org.apache.samza.config.JavaSystemConfig;
import org.apache.samza.execution.ExecutionPlanner;
-import org.apache.samza.execution.ProcessorGraph;
-import org.apache.samza.execution.ProcessorNode;
+import org.apache.samza.execution.JobGraph;
+import org.apache.samza.execution.JobNode;
import org.apache.samza.execution.StreamManager;
import org.apache.samza.job.ApplicationStatus;
import org.apache.samza.job.JobRunner;
@@ -60,19 +60,19 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
public void run(StreamApplication app) {
try {
// 1. initialize and plan
- ProcessorGraph processorGraph = getExecutionPlan(app);
+ JobGraph jobGraph = getExecutionPlan(app);
// 2. create the necessary streams
- List<StreamSpec> streams = processorGraph.getIntermediateStreams().stream()
+ List<StreamSpec> streams = jobGraph.getIntermediateStreams().stream()
.map(streamEdge -> streamEdge.getStreamSpec())
.collect(Collectors.toList());
streamManager.createStreams(streams);
// 3. submit jobs for remote execution
- processorGraph.getProcessorNodes().forEach(processor -> {
- Config processorConfig = processor.generateConfig();
- log.info("Starting processor {} with config {}", processor.getId(), processorConfig);
- JobRunner runner = new JobRunner(processorConfig);
+ jobGraph.getJobNodes().forEach(job -> {
+ Config jobConfig = job.generateConfig();
+ log.info("Starting job {} with config {}", job.getId(), jobConfig);
+ JobRunner runner = new JobRunner(jobConfig);
runner.run(true);
});
} catch (Throwable t) {
@@ -83,12 +83,12 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
@Override
public void kill(StreamApplication app) {
try {
- ProcessorGraph processorGraph = getExecutionPlan(app);
+ JobGraph jobGraph = getExecutionPlan(app);
- processorGraph.getProcessorNodes().forEach(processor -> {
- Config processorConfig = processor.generateConfig();
- log.info("Killing processor {}", processor.getId());
- JobRunner runner = new JobRunner(processorConfig);
+ jobGraph.getJobNodes().forEach(job -> {
+ Config jobConfig = job.generateConfig();
+ log.info("Killing job {}", job.getId());
+ JobRunner runner = new JobRunner(jobConfig);
runner.kill();
});
} catch (Throwable t) {
@@ -102,12 +102,12 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
boolean finished = false;
boolean unsuccessfulFinish = false;
- ProcessorGraph processorGraph = getExecutionPlan(app);
- for (ProcessorNode processor : processorGraph.getProcessorNodes()) {
- Config processorConfig = processor.generateConfig();
- JobRunner runner = new JobRunner(processorConfig);
+ JobGraph jobGraph = getExecutionPlan(app);
+ for (JobNode job : jobGraph.getJobNodes()) {
+ Config jobConfig = job.generateConfig();
+ JobRunner runner = new JobRunner(jobConfig);
ApplicationStatus status = runner.status();
- log.debug("Status is {} for processor {}", new Object[]{status, processor.getId()});
+ log.debug("Status is {} for job {}", new Object[]{status, job.getId()});
switch (status) {
case Running:
@@ -133,7 +133,7 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
}
}
- private ProcessorGraph getExecutionPlan(StreamApplication app) throws Exception {
+ private JobGraph getExecutionPlan(StreamApplication app) throws Exception {
// build stream graph
StreamGraph streamGraph = new StreamGraphImpl(this, config);
app.init(streamGraph, config);
http://git-wip-us.apache.org/repos/asf/samza/blob/b70ee983/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index dc828d9..e524ba1 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -227,26 +227,26 @@ public class TestExecutionPlanner {
ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
StreamGraph streamGraph = createStreamGraphWithJoin();
- ProcessorGraph processorGraph = planner.createProcessorGraph(streamGraph);
- assertTrue(processorGraph.getSources().size() == 3);
- assertTrue(processorGraph.getSinks().size() == 2);
- assertTrue(processorGraph.getIntermediateStreams().size() == 2); // two streams generated by partitionBy
+ JobGraph jobGraph = planner.createJobGraph(streamGraph);
+ assertTrue(jobGraph.getSources().size() == 3);
+ assertTrue(jobGraph.getSinks().size() == 2);
+ assertTrue(jobGraph.getIntermediateStreams().size() == 2); // two streams generated by partitionBy
}
@Test
public void testFetchExistingStreamPartitions() {
ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
StreamGraph streamGraph = createStreamGraphWithJoin();
- ProcessorGraph processorGraph = planner.createProcessorGraph(streamGraph);
+ JobGraph jobGraph = planner.createJobGraph(streamGraph);
- ExecutionPlanner.updateExistingPartitions(processorGraph, streamManager);
- assertTrue(processorGraph.getOrCreateEdge(input1).getPartitionCount() == 64);
- assertTrue(processorGraph.getOrCreateEdge(input2).getPartitionCount() == 16);
- assertTrue(processorGraph.getOrCreateEdge(input3).getPartitionCount() == 32);
- assertTrue(processorGraph.getOrCreateEdge(output1).getPartitionCount() == 8);
- assertTrue(processorGraph.getOrCreateEdge(output2).getPartitionCount() == 16);
+ ExecutionPlanner.updateExistingPartitions(jobGraph, streamManager);
+ assertTrue(jobGraph.getOrCreateEdge(input1).getPartitionCount() == 64);
+ assertTrue(jobGraph.getOrCreateEdge(input2).getPartitionCount() == 16);
+ assertTrue(jobGraph.getOrCreateEdge(input3).getPartitionCount() == 32);
+ assertTrue(jobGraph.getOrCreateEdge(output1).getPartitionCount() == 8);
+ assertTrue(jobGraph.getOrCreateEdge(output2).getPartitionCount() == 16);
- processorGraph.getIntermediateStreams().forEach(edge -> {
+ jobGraph.getIntermediateStreams().forEach(edge -> {
assertTrue(edge.getPartitionCount() == -1);
});
}
@@ -255,13 +255,13 @@ public class TestExecutionPlanner {
public void testCalculateJoinInputPartitions() {
ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
StreamGraph streamGraph = createStreamGraphWithJoin();
- ProcessorGraph processorGraph = planner.createProcessorGraph(streamGraph);
+ JobGraph jobGraph = planner.createJobGraph(streamGraph);
- ExecutionPlanner.updateExistingPartitions(processorGraph, streamManager);
- ExecutionPlanner.calculateJoinInputPartitions(streamGraph, processorGraph);
+ ExecutionPlanner.updateExistingPartitions(jobGraph, streamManager);
+ ExecutionPlanner.calculateJoinInputPartitions(streamGraph, jobGraph);
// the partitions should be the same as input1
- processorGraph.getIntermediateStreams().forEach(edge -> {
+ jobGraph.getIntermediateStreams().forEach(edge -> {
assertTrue(edge.getPartitionCount() == 64);
});
}
@@ -274,11 +274,11 @@ public class TestExecutionPlanner {
ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
StreamGraph streamGraph = createSimpleGraph();
- ProcessorGraph processorGraph = planner.createProcessorGraph(streamGraph);
- planner.calculatePartitions(streamGraph, processorGraph);
+ JobGraph jobGraph = planner.createJobGraph(streamGraph);
+ planner.calculatePartitions(streamGraph, jobGraph);
// the partitions should be the same as input1
- processorGraph.getIntermediateStreams().forEach(edge -> {
+ jobGraph.getIntermediateStreams().forEach(edge -> {
assertTrue(edge.getPartitionCount() == DEFAULT_PARTITIONS);
});
}
@@ -287,11 +287,11 @@ public class TestExecutionPlanner {
public void testCalculateIntStreamPartitions() {
ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
StreamGraph streamGraph = createSimpleGraph();
- ProcessorGraph processorGraph = planner.createProcessorGraph(streamGraph);
- planner.calculatePartitions(streamGraph, processorGraph);
+ JobGraph jobGraph = planner.createJobGraph(streamGraph);
+ planner.calculatePartitions(streamGraph, jobGraph);
// the partitions should be the same as input1
- processorGraph.getIntermediateStreams().forEach(edge -> {
+ jobGraph.getIntermediateStreams().forEach(edge -> {
assertTrue(edge.getPartitionCount() == 64); // max of input1 and output1
});
}
http://git-wip-us.apache.org/repos/asf/samza/blob/b70ee983/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java
new file mode 100644
index 0000000..d829b64
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.execution;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.system.StreamSpec;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+public class TestJobGraph {
+
+ JobGraph graph1;
+ JobGraph graph2;
+ JobGraph graph3;
+ JobGraph graph4;
+ int streamSeq = 0;
+
+  /**
+   * Produces a fresh {@link StreamSpec} for the graph fixtures.
+   *
+   * Every call increments {@code streamSeq} and passes its decimal string as the first
+   * constructor argument, so no two specs created by one test instance share that value.
+   * The remaining two arguments are always "test-stream" and "test-system".
+   *
+   * @return a new spec built from the incremented sequence number
+   */
+  private StreamSpec genStream() {
+    streamSeq += 1;
+    final String seqText = String.valueOf(streamSeq);
+    return new StreamSpec(seqText, "test-stream", "test-system");
+  }
+
+  /**
+   * Builds graph1, the example DAG from the Wikipedia article on topological sorting.
+   *
+   * Source streams feed nodes 5, 7 and 3; sink streams are written by nodes 2, 9 and 10.
+   * Intermediate streams (upstream -> downstream):
+   *
+   *   5 -> 11    7 -> 11    7 -> 8     3 -> 8     3 -> 10
+   *   11 -> 2    11 -> 9    11 -> 10   8 -> 9
+   *
+   * Every stream is a fresh spec from genStream().
+   */
+  private void createGraph1() {
+    graph1 = new JobGraph(null, null);
+
+    JobNode n2 = graph1.getOrCreateNode("2", "1");
+    JobNode n3 = graph1.getOrCreateNode("3", "1");
+    JobNode n5 = graph1.getOrCreateNode("5", "1");
+    JobNode n7 = graph1.getOrCreateNode("7", "1");
+    JobNode n8 = graph1.getOrCreateNode("8", "1");
+    JobNode n9 = graph1.getOrCreateNode("9", "1");
+    JobNode n10 = graph1.getOrCreateNode("10", "1");
+    JobNode n11 = graph1.getOrCreateNode("11", "1");
+
+    // Inputs of the DAG.
+    graph1.addSource(genStream(), n5);
+    graph1.addSource(genStream(), n7);
+    graph1.addSource(genStream(), n3);
+
+    // Edges between jobs.
+    graph1.addIntermediateStream(genStream(), n5, n11);
+    graph1.addIntermediateStream(genStream(), n7, n11);
+    graph1.addIntermediateStream(genStream(), n7, n8);
+    graph1.addIntermediateStream(genStream(), n3, n8);
+    graph1.addIntermediateStream(genStream(), n11, n2);
+    graph1.addIntermediateStream(genStream(), n11, n9);
+    graph1.addIntermediateStream(genStream(), n8, n9);
+    graph1.addIntermediateStream(genStream(), n11, n10);
+    // 3 -> 10 is part of the reference DAG; without it the "10 sorts after 3" check in
+    // testTopologicalSort is not implied by the graph and only holds by accident.
+    graph1.addIntermediateStream(genStream(), n3, n10);
+
+    // Outputs of the DAG.
+    graph1.addSink(genStream(), n2);
+    graph1.addSink(genStream(), n9);
+    graph1.addSink(genStream(), n10);
+  }
+
+  /**
+   * Builds graph2, a graph that contains cycles.
+   *
+   * Main chain:  1 -> 2 -> 3 -> 4 -> 5 -> 7
+   * Back edge:   4 -> 6 -> 2   (loop 2 -> 3 -> 4 -> 6 -> 2)
+   * Self loop:   5 -> 5
+   *
+   * Node 1 reads the only source stream and node 7 writes the only sink stream.
+   */
+  private void createGraph2() {
+    graph2 = new JobGraph(null, null);
+
+    JobNode node1 = graph2.getOrCreateNode("1", "1");
+    JobNode node2 = graph2.getOrCreateNode("2", "1");
+    JobNode node3 = graph2.getOrCreateNode("3", "1");
+    JobNode node4 = graph2.getOrCreateNode("4", "1");
+    JobNode node5 = graph2.getOrCreateNode("5", "1");
+    JobNode node6 = graph2.getOrCreateNode("6", "1");
+    JobNode node7 = graph2.getOrCreateNode("7", "1");
+
+    // entry point
+    graph2.addSource(genStream(), node1);
+
+    // forward chain up to node 4
+    graph2.addIntermediateStream(genStream(), node1, node2);
+    graph2.addIntermediateStream(genStream(), node2, node3);
+    graph2.addIntermediateStream(genStream(), node3, node4);
+
+    // node 4 fans out to 5 and to 6, and 6 loops back to 2
+    graph2.addIntermediateStream(genStream(), node4, node5);
+    graph2.addIntermediateStream(genStream(), node4, node6);
+    graph2.addIntermediateStream(genStream(), node6, node2);
+
+    // node 5 feeds itself and then node 7
+    graph2.addIntermediateStream(genStream(), node5, node5);
+    graph2.addIntermediateStream(genStream(), node5, node7);
+
+    // exit point
+    graph2.addSink(genStream(), node7);
+  }
+
+  /**
+   * Builds graph3: two nodes, each with a self loop, joined by one edge.
+   *
+   *   1 -> 1,  1 -> 2,  2 -> 2
+   *
+   * Node 1 reads the only source stream; no sink is registered.
+   */
+  private void createGraph3() {
+    graph3 = new JobGraph(null, null);
+
+    JobNode first = graph3.getOrCreateNode("1", "1");
+    JobNode second = graph3.getOrCreateNode("2", "1");
+
+    graph3.addSource(genStream(), first);
+    // self loop on 1, hand-off to 2, self loop on 2
+    graph3.addIntermediateStream(genStream(), first, first);
+    graph3.addIntermediateStream(genStream(), first, second);
+    graph3.addIntermediateStream(genStream(), second, second);
+  }
+
+  /**
+   * Builds graph4: a single node that reads one source stream and has one self loop.
+   *
+   *   1 -> 1
+   */
+  private void createGraph4() {
+    graph4 = new JobGraph(null, null);
+
+    JobNode only = graph4.getOrCreateNode("1", "1");
+
+    graph4.addSource(genStream(), only);
+    // the node consumes its own intermediate output
+    graph4.addIntermediateStream(genStream(), only, only);
+  }
+
+ /**
+ * Rebuilds the four shared graph fixtures (graph1..graph4) before each test.
+ * The builders run in a fixed order; each one draws its stream specs from genStream(),
+ * so this order determines which sequence numbers the streams of each graph receive.
+ */
+ @Before
+ public void setup() {
+ createGraph1();
+ createGraph2();
+ createGraph3();
+ createGraph4();
+ }
+
+  /**
+   * Checks the bookkeeping done by addSource: the number of source edges on the graph,
+   * the in-edge count of each consuming node, and that a source edge has target nodes
+   * but no source nodes.
+   */
+  @Test
+  public void testAddSource() {
+    JobGraph graph = new JobGraph(null, null);
+
+    /*
+     * s1 -> 1
+     * s2 -> 1
+     *
+     * s3 -> 2
+     * s3 -> 3
+     */
+    JobNode n1 = graph.getOrCreateNode("1", "1");
+    JobNode n2 = graph.getOrCreateNode("2", "1");
+    JobNode n3 = graph.getOrCreateNode("3", "1");
+    StreamSpec s1 = genStream();
+    StreamSpec s2 = genStream();
+    StreamSpec s3 = genStream();
+    graph.addSource(s1, n1);
+    graph.addSource(s2, n1);
+    graph.addSource(s3, n2);
+    graph.addSource(s3, n3);
+
+    // s3 is added twice but must be counted once.
+    assertEquals(3, graph.getSources().size());
+
+    // Nodes are looked up again by name on purpose, exercising the "get" side of getOrCreateNode.
+    assertEquals(2, graph.getOrCreateNode("1", "1").getInEdges().size());
+    assertEquals(1, graph.getOrCreateNode("2", "1").getInEdges().size());
+    assertEquals(1, graph.getOrCreateNode("3", "1").getInEdges().size());
+
+    // A source edge has consumers only.
+    assertEquals(0, graph.getOrCreateEdge(s1).getSourceNodes().size());
+    assertEquals(1, graph.getOrCreateEdge(s1).getTargetNodes().size());
+    assertEquals(0, graph.getOrCreateEdge(s2).getSourceNodes().size());
+    assertEquals(1, graph.getOrCreateEdge(s2).getTargetNodes().size());
+    assertEquals(0, graph.getOrCreateEdge(s3).getSourceNodes().size());
+    assertEquals(2, graph.getOrCreateEdge(s3).getTargetNodes().size());
+  }
+
+  /**
+   * Checks the bookkeeping done by addSink: the number of sink edges on the graph,
+   * the out-edge count of each producing node, and that a sink edge has source nodes
+   * but no target nodes.
+   */
+  @Test
+  public void testAddSink() {
+    /*
+     * 1 -> s1
+     * 2 -> s2
+     * 2 -> s3
+     */
+    JobGraph graph = new JobGraph(null, null);
+    JobNode n1 = graph.getOrCreateNode("1", "1");
+    JobNode n2 = graph.getOrCreateNode("2", "1");
+    StreamSpec s1 = genStream();
+    StreamSpec s2 = genStream();
+    StreamSpec s3 = genStream();
+    graph.addSink(s1, n1);
+    graph.addSink(s2, n2);
+    graph.addSink(s3, n2);
+
+    assertEquals(3, graph.getSinks().size());
+    // Nodes are looked up again by name on purpose, exercising the "get" side of getOrCreateNode.
+    assertEquals(1, graph.getOrCreateNode("1", "1").getOutEdges().size());
+    assertEquals(2, graph.getOrCreateNode("2", "1").getOutEdges().size());
+
+    // A sink edge has producers only.
+    assertEquals(1, graph.getOrCreateEdge(s1).getSourceNodes().size());
+    assertEquals(0, graph.getOrCreateEdge(s1).getTargetNodes().size());
+    assertEquals(1, graph.getOrCreateEdge(s2).getSourceNodes().size());
+    assertEquals(0, graph.getOrCreateEdge(s2).getTargetNodes().size());
+    assertEquals(1, graph.getOrCreateEdge(s3).getSourceNodes().size());
+    assertEquals(0, graph.getOrCreateEdge(s3).getTargetNodes().size());
+  }
+
+  /**
+   * Checks that findReachable() returns every node of the fixtures: all 8 nodes of the
+   * acyclic graph1 and all 7 nodes of graph2, which contains cycles.
+   */
+  @Test
+  public void testReachable() {
+    Set<JobNode> reachable1 = graph1.findReachable();
+    assertEquals(8, reachable1.size());
+
+    Set<JobNode> reachable2 = graph2.findReachable();
+    assertEquals(7, reachable2.size());
+  }
+
+  /**
+   * Checks topologicalSort() on all four fixtures: every node appears exactly once and
+   * each asserted upstream job is placed before its downstream job.
+   */
+  @Test
+  public void testTopologicalSort() {
+
+    // test graph1: one ordering check per intermediate edge of the DAG
+    Map<String, Integer> idxMap1 = indexByJobName(graph1.topologicalSort());
+
+    assertEquals(8, idxMap1.size());
+    assertTrue(idxMap1.get("11") > idxMap1.get("5"));
+    assertTrue(idxMap1.get("11") > idxMap1.get("7"));
+    assertTrue(idxMap1.get("8") > idxMap1.get("7"));
+    assertTrue(idxMap1.get("8") > idxMap1.get("3"));
+    assertTrue(idxMap1.get("2") > idxMap1.get("11"));
+    assertTrue(idxMap1.get("9") > idxMap1.get("8"));
+    assertTrue(idxMap1.get("9") > idxMap1.get("11"));
+    assertTrue(idxMap1.get("10") > idxMap1.get("11"));
+    assertTrue(idxMap1.get("10") > idxMap1.get("3"));
+
+    // test graph2: it has cycles (6 -> 2 and 5 -> 5), so only the orderings below are checked
+    Map<String, Integer> idxMap2 = indexByJobName(graph2.topologicalSort());
+
+    assertEquals(7, idxMap2.size());
+    assertTrue(idxMap2.get("2") > idxMap2.get("1"));
+    assertTrue(idxMap2.get("3") > idxMap2.get("1"));
+    assertTrue(idxMap2.get("4") > idxMap2.get("1"));
+    assertTrue(idxMap2.get("6") > idxMap2.get("1"));
+    assertTrue(idxMap2.get("5") > idxMap2.get("4"));
+    assertTrue(idxMap2.get("7") > idxMap2.get("5"));
+
+    // test graph3: self loops must not disturb the 1-before-2 order
+    List<JobNode> sortedNodes3 = graph3.topologicalSort();
+    assertEquals(2, sortedNodes3.size());
+    assertEquals("1", sortedNodes3.get(0).getJobName());
+    assertEquals("2", sortedNodes3.get(1).getJobName());
+
+    // test graph4: a lone self-looping node is still returned once
+    List<JobNode> sortedNodes4 = graph4.topologicalSort();
+    assertEquals(1, sortedNodes4.size());
+    assertEquals("1", sortedNodes4.get(0).getJobName());
+  }
+
+  /**
+   * Maps each node's job name to its position in the given list.
+   *
+   * If a job name occurred more than once, the later position would overwrite the earlier
+   * one, so the size of the returned map equals the number of distinct job names.
+   *
+   * @param sortedNodes nodes in the order returned by topologicalSort()
+   * @return job name to zero-based index
+   */
+  private Map<String, Integer> indexByJobName(List<JobNode> sortedNodes) {
+    Map<String, Integer> indexByName = new HashMap<>();
+    for (int i = 0; i < sortedNodes.size(); i++) {
+      indexByName.put(sortedNodes.get(i).getJobName(), i);
+    }
+    return indexByName;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/b70ee983/samza-core/src/test/java/org/apache/samza/execution/TestProcessorGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestProcessorGraph.java b/samza-core/src/test/java/org/apache/samza/execution/TestProcessorGraph.java
deleted file mode 100644
index 2f89d91..0000000
--- a/samza-core/src/test/java/org/apache/samza/execution/TestProcessorGraph.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.execution;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.samza.system.StreamSpec;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-
-public class TestProcessorGraph {
-
- ProcessorGraph graph1;
- ProcessorGraph graph2;
- ProcessorGraph graph3;
- ProcessorGraph graph4;
- int streamSeq = 0;
-
- private StreamSpec genStream() {
- ++streamSeq;
-
- return new StreamSpec(String.valueOf(streamSeq), "test-stream", "test-system");
- }
-
- @Before
- public void setup() {
- /**
- * graph1 is the example graph from wikipedia
- *
- * 5 7 3
- * | / | / |
- * v v |
- * 11 8 |
- * | \X /
- * v v \v
- * 2 9 10
- */
- // init graph1
- graph1 = new ProcessorGraph(null);
- graph1.addSource(genStream(), "5");
- graph1.addSource(genStream(), "7");
- graph1.addSource(genStream(), "3");
- graph1.addIntermediateStream(genStream(), "5", "11");
- graph1.addIntermediateStream(genStream(), "7", "11");
- graph1.addIntermediateStream(genStream(), "7", "8");
- graph1.addIntermediateStream(genStream(), "3", "8");
- graph1.addIntermediateStream(genStream(), "11", "2");
- graph1.addIntermediateStream(genStream(), "11", "9");
- graph1.addIntermediateStream(genStream(), "8", "9");
- graph1.addIntermediateStream(genStream(), "11", "10");
- graph1.addSink(genStream(), "2");
- graph1.addSink(genStream(), "9");
- graph1.addSink(genStream(), "10");
-
- /**
- * graph2 is a graph with a loop
- * 1 -> 2 -> 3 -> 4 -> 5 -> 7
- * |<---6 <--| <>
- */
- graph2 = new ProcessorGraph(null);
- graph2.addSource(genStream(), "1");
- graph2.addIntermediateStream(genStream(), "1", "2");
- graph2.addIntermediateStream(genStream(), "2", "3");
- graph2.addIntermediateStream(genStream(), "3", "4");
- graph2.addIntermediateStream(genStream(), "4", "5");
- graph2.addIntermediateStream(genStream(), "4", "6");
- graph2.addIntermediateStream(genStream(), "6", "2");
- graph2.addIntermediateStream(genStream(), "5", "5");
- graph2.addIntermediateStream(genStream(), "5", "7");
- graph2.addSink(genStream(), "7");
-
- /**
- * graph3 is a graph with self loops
- * 1<->1 -> 2<->2
- */
- graph3 = new ProcessorGraph(null);
- graph3.addSource(genStream(), "1");
- graph3.addIntermediateStream(genStream(), "1", "1");
- graph3.addIntermediateStream(genStream(), "1", "2");
- graph3.addIntermediateStream(genStream(), "2", "2");
-
- /**
- * graph4 is a graph of single-loop node
- * 1<->1
- */
- graph4 = new ProcessorGraph(null);
- graph4.addSource(genStream(), "1");
- graph4.addIntermediateStream(genStream(), "1", "1");
- }
-
- @Test
- public void testAddSource() {
- ProcessorGraph graph = new ProcessorGraph(null);
-
- /**
- * s1 -> 1
- * s2 ->|
- *
- * s3 -> 2
- * |-> 3
- */
- StreamSpec s1 = genStream();
- StreamSpec s2 = genStream();
- StreamSpec s3 = genStream();
- graph.addSource(s1, "1");
- graph.addSource(s2, "1");
- graph.addSource(s3, "2");
- graph.addSource(s3, "3");
-
- assertTrue(graph.getSources().size() == 3);
-
- assertTrue(graph.getOrCreateProcessor("1").getInEdges().size() == 2);
- assertTrue(graph.getOrCreateProcessor("2").getInEdges().size() == 1);
- assertTrue(graph.getOrCreateProcessor("3").getInEdges().size() == 1);
-
- assertTrue(graph.getOrCreateEdge(s1).getSourceNodes().size() == 0);
- assertTrue(graph.getOrCreateEdge(s1).getTargetNodes().size() == 1);
- assertTrue(graph.getOrCreateEdge(s2).getSourceNodes().size() == 0);
- assertTrue(graph.getOrCreateEdge(s2).getTargetNodes().size() == 1);
- assertTrue(graph.getOrCreateEdge(s3).getSourceNodes().size() == 0);
- assertTrue(graph.getOrCreateEdge(s3).getTargetNodes().size() == 2);
- }
-
- @Test
- public void testAddSink() {
- /**
- * 1 -> s1
- * 2 -> s2
- * 2 -> s3
- */
- StreamSpec s1 = genStream();
- StreamSpec s2 = genStream();
- StreamSpec s3 = genStream();
- ProcessorGraph graph = new ProcessorGraph(null);
- graph.addSink(s1, "1");
- graph.addSink(s2, "2");
- graph.addSink(s3, "2");
-
- assertTrue(graph.getSinks().size() == 3);
- assertTrue(graph.getOrCreateProcessor("1").getOutEdges().size() == 1);
- assertTrue(graph.getOrCreateProcessor("2").getOutEdges().size() == 2);
-
- assertTrue(graph.getOrCreateEdge(s1).getSourceNodes().size() == 1);
- assertTrue(graph.getOrCreateEdge(s1).getTargetNodes().size() == 0);
- assertTrue(graph.getOrCreateEdge(s2).getSourceNodes().size() == 1);
- assertTrue(graph.getOrCreateEdge(s2).getTargetNodes().size() == 0);
- assertTrue(graph.getOrCreateEdge(s3).getSourceNodes().size() == 1);
- assertTrue(graph.getOrCreateEdge(s3).getTargetNodes().size() == 0);
- }
-
- @Test
- public void testReachable() {
- Set<ProcessorNode> reachable1 = graph1.findReachable();
- assertTrue(reachable1.size() == 8);
-
- Set<ProcessorNode> reachable2 = graph2.findReachable();
- assertTrue(reachable2.size() == 7);
- }
-
- @Test
- public void testTopologicalSort() {
-
- // test graph1
- List<ProcessorNode> sortedNodes1 = graph1.topologicalSort();
- Map<String, Integer> idxMap1 = new HashMap<>();
- for (int i = 0; i < sortedNodes1.size(); i++) {
- idxMap1.put(sortedNodes1.get(i).getId(), i);
- }
-
- assertTrue(idxMap1.size() == 8);
- assertTrue(idxMap1.get("11") > idxMap1.get("5"));
- assertTrue(idxMap1.get("11") > idxMap1.get("7"));
- assertTrue(idxMap1.get("8") > idxMap1.get("7"));
- assertTrue(idxMap1.get("8") > idxMap1.get("3"));
- assertTrue(idxMap1.get("2") > idxMap1.get("11"));
- assertTrue(idxMap1.get("9") > idxMap1.get("8"));
- assertTrue(idxMap1.get("9") > idxMap1.get("11"));
- assertTrue(idxMap1.get("10") > idxMap1.get("11"));
- assertTrue(idxMap1.get("10") > idxMap1.get("3"));
-
- // test graph2
- List<ProcessorNode> sortedNodes2 = graph2.topologicalSort();
- Map<String, Integer> idxMap2 = new HashMap<>();
- for (int i = 0; i < sortedNodes2.size(); i++) {
- idxMap2.put(sortedNodes2.get(i).getId(), i);
- }
-
- assertTrue(idxMap2.size() == 7);
- assertTrue(idxMap2.get("2") > idxMap2.get("1"));
- assertTrue(idxMap2.get("3") > idxMap2.get("1"));
- assertTrue(idxMap2.get("4") > idxMap2.get("1"));
- assertTrue(idxMap2.get("6") > idxMap2.get("1"));
- assertTrue(idxMap2.get("5") > idxMap2.get("4"));
- assertTrue(idxMap2.get("7") > idxMap2.get("5"));
-
- //test graph3
- List<ProcessorNode> sortedNodes3 = graph3.topologicalSort();
- assertTrue(sortedNodes3.size() == 2);
- assertEquals(sortedNodes3.get(0).getId(), "1");
- assertEquals(sortedNodes3.get(1).getId(), "2");
-
- //test graph4
- List<ProcessorNode> sortedNodes4 = graph4.topologicalSort();
- assertTrue(sortedNodes4.size() == 1);
- assertEquals(sortedNodes4.get(0).getId(), "1");
- }
-}