You are viewing a plain-text version of this content. (The link to its canonical version was not preserved in this copy.)
Posted to commits@samza.apache.org by xi...@apache.org on 2017/03/03 18:03:45 UTC
samza git commit: Support topic partition generation of partitionBy and join
Repository: samza
Updated Branches:
refs/heads/samza-fluent-api-v1 86eb10f2f -> a83c69a25
Support topic partition generation of partitionBy and join
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a83c69a2
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a83c69a2
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a83c69a2
Branch: refs/heads/samza-fluent-api-v1
Commit: a83c69a25d3b1fa67cac2a9113c57e52d86d4f1d
Parents: 86eb10f
Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Authored: Thu Mar 2 18:42:23 2017 -0800
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Fri Mar 3 10:03:30 2017 -0800
----------------------------------------------------------------------
.../samza/operators/MessageStreamImpl.java | 6 +-
.../apache/samza/operators/StreamGraphImpl.java | 30 +-
.../samza/operators/spec/OperatorSpecs.java | 4 +-
.../samza/processorgraph/ExecutionPlanner.java | 213 ++++++++++----
.../samza/processorgraph/ProcessorGraph.java | 61 ++--
.../samza/processorgraph/ProcessorNode.java | 36 +--
.../apache/samza/processorgraph/StreamEdge.java | 15 -
.../system/AbstractExecutionEnvironment.java | 5 +-
.../system/RemoteExecutionEnvironment.java | 2 +-
.../system/StandaloneExecutionEnvironment.java | 2 +-
.../apache/samza/task/StreamOperatorTask.java | 6 +-
.../apache/samza/util/ConfigInheritence.java | 6 +-
.../org/apache/samza/config/JobConfig.scala | 1 +
.../processorgraph/TestExecutionPlanner.java | 281 +++++++++++++++++++
14 files changed, 525 insertions(+), 143 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/a83c69a2/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
index 830e4a5..77cb3fc 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
@@ -163,10 +163,12 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
@Override
public <K> MessageStream<M> partitionBy(Function<M, K> parKeyExtractor) {
- MessageStreamImpl<M> intStream = this.graph.createIntStream(parKeyExtractor);
+ int opId = graph.getNextOpId();
+ String streamId = String.format("%s-%s", OperatorSpec.OpCode.PARTITION_BY.name(), opId);
+ MessageStreamImpl<M> intStream = this.graph.createIntStream(streamId, parKeyExtractor);
OutputStream<M> outputStream = this.graph.getOutputStream(intStream);
this.registeredOperatorSpecs.add(OperatorSpecs.createPartitionOperatorSpec(outputStream.getSinkFunction(),
- this.graph, outputStream));
+ this.graph, outputStream, opId));
return intStream;
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/a83c69a2/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
index b965d6a..f3fd176 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
@@ -18,20 +18,20 @@
*/
package org.apache.samza.operators;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.function.Function;
import org.apache.samza.operators.data.MessageEnvelope;
import org.apache.samza.operators.functions.SinkFunction;
import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.ExecutionEnvironment;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCoordinator;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
/**
* The implementation of {@link StreamGraph} interface. This class provides implementation of methods to allow users to
* create system input/output/intermediate streams.
@@ -129,9 +129,14 @@ public class StreamGraphImpl implements StreamGraph {
*/
private final Map<String, MessageStream> inStreams = new HashMap<>();
private final Map<String, OutputStream> outStreams = new HashMap<>();
+ private final ExecutionEnvironment executionEnvironment;
private ContextManager contextManager = new ContextManager() { };
+ public StreamGraphImpl(ExecutionEnvironment executionEnvironment) {
+ this.executionEnvironment = executionEnvironment;
+ }
+
@Override
public <K, V, M extends MessageEnvelope<K, V>> MessageStream<M> createInStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
if (!this.inStreams.containsKey(streamSpec.getId())) {
@@ -182,7 +187,12 @@ public class StreamGraphImpl implements StreamGraph {
@Override public Map<StreamSpec, OutputStream> getOutStreams() {
Map<StreamSpec, OutputStream> outStreamMap = new HashMap<>();
- this.outStreams.forEach((ss, entry) -> outStreamMap.put(((OutputStreamImpl) entry).getSpec(), entry));
+ this.outStreams.forEach((ss, entry) -> {
+ StreamSpec streamSpec = (entry instanceof IntermediateStreamImpl) ?
+ ((IntermediateStreamImpl) entry).getSpec() :
+ ((OutputStreamImpl) entry).getSpec();
+ outStreamMap.put(streamSpec, entry);
+ });
return Collections.unmodifiableMap(outStreamMap);
}
@@ -231,9 +241,9 @@ public class StreamGraphImpl implements StreamGraph {
* @param <M> the type of input message
* @return the {@link OutputStream} object for the re-partitioned stream
*/
- <PK, M> MessageStreamImpl<M> createIntStream(Function<M, PK> parKeyFn) {
+ <PK, M> MessageStreamImpl<M> createIntStream(String streamId, Function<M, PK> parKeyFn) {
// TODO: placeholder to auto-generate intermediate streams via {@link StreamSpec}
- StreamSpec streamSpec = this.createIntStreamSpec();
+ StreamSpec streamSpec = executionEnvironment.streamFromConfig(streamId);
if (!this.inStreams.containsKey(streamSpec.getId())) {
this.inStreams.putIfAbsent(streamSpec.getId(), new IntermediateStreamImpl(this, streamSpec, null, null, parKeyFn));
@@ -244,10 +254,4 @@ public class StreamGraphImpl implements StreamGraph {
}
return intStream;
}
-
- private StreamSpec createIntStreamSpec() {
- // TODO: placeholder to generate the intermediate stream's {@link StreamSpec} automatically
- return null;
- }
-
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a83c69a2/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
index d626852..7b14b9c 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
@@ -152,8 +152,8 @@ public class OperatorSpecs {
* @param <M> type of input message
* @return the {@link SinkOperatorSpec}
*/
- public static <M> SinkOperatorSpec<M> createPartitionOperatorSpec(SinkFunction<M> sinkFn, StreamGraphImpl graph, OutputStream<M> stream) {
- return new SinkOperatorSpec<>(sinkFn, OperatorSpec.OpCode.PARTITION_BY, graph.getNextOpId(), stream);
+ public static <M> SinkOperatorSpec<M> createPartitionOperatorSpec(SinkFunction<M> sinkFn, StreamGraphImpl graph, OutputStream<M> stream, int opId) {
+ return new SinkOperatorSpec<>(sinkFn, OperatorSpec.OpCode.PARTITION_BY, opId, stream);
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/a83c69a2/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java
index ddf2ca7..757034e 100644
--- a/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java
@@ -24,15 +24,20 @@ import com.google.common.collect.Multimap;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
+import java.util.LinkedList;
import java.util.Map;
+import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.JavaSystemConfig;
import org.apache.samza.config.JobConfig;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemFactory;
@@ -43,6 +48,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * The ExecutionPlanner creates the physical execution graph for the StreamGraph, and
+ * the intermediate topics needed for the execution.
+ */
public class ExecutionPlanner {
private static final Logger log = LoggerFactory.getLogger(ExecutionPlanner.class);
@@ -56,27 +65,28 @@ public class ExecutionPlanner {
Map<String, SystemAdmin> sysAdmins = getSystemAdmins(config);
// create physical processors based on stream graph
- ProcessorGraph processorGraph = splitStages(streamGraph);
+ ProcessorGraph processorGraph = createProcessorGraph(streamGraph);
if (!processorGraph.getInternalStreams().isEmpty()) {
- // figure out the partition for internal streams
- Multimap<String, StreamSpec> streams = calculatePartitions(streamGraph, processorGraph, sysAdmins);
+ // figure out the partitions for internal streams
+ calculatePartitions(streamGraph, processorGraph, sysAdmins);
// create the streams
- createStreams(streams, sysAdmins);
+ createStreams(processorGraph, sysAdmins);
}
return processorGraph;
}
- public ProcessorGraph splitStages(StreamGraph streamGraph) throws Exception {
- String pipelineId = String.format("%s-%s", config.get(JobConfig.JOB_NAME()), config.getOrDefault(JobConfig.JOB_ID(), "1"));
- // For this phase, we are going to create a processor with the whole dag
- String processorId = pipelineId; // only one processor, name it the same as pipeline itself
+ /**
+ * Create the physical graph from StreamGraph
+ * Package private for testing
+ */
+ ProcessorGraph createProcessorGraph(StreamGraph streamGraph) {
+ // For this phase, we are going to create a processor for the whole dag
+ String processorId = config.get(JobConfig.JOB_NAME()); // only one processor, use the job name
ProcessorGraph processorGraph = new ProcessorGraph(config);
-
- // TODO: remote the casting once we have the correct types in StreamGraph
Set<StreamSpec> sourceStreams = new HashSet<>(streamGraph.getInStreams().keySet());
Set<StreamSpec> sinkStreams = new HashSet<>(streamGraph.getOutStreams().keySet());
Set<StreamSpec> intStreams = new HashSet<>(sourceStreams);
@@ -98,49 +108,32 @@ public class ExecutionPlanner {
return processorGraph;
}
- private Multimap<String, StreamSpec> calculatePartitions(StreamGraph streamGraph, ProcessorGraph processorGraph, Map<String, SystemAdmin> sysAdmins) {
+ /**
+ * Figure out the number of partitions of intermediate streams
+ * Package private for testing
+ */
+ void calculatePartitions(StreamGraph streamGraph, ProcessorGraph processorGraph, Map<String, SystemAdmin> sysAdmins) {
// fetch the external streams partition info
- getExistingStreamPartitions(processorGraph, sysAdmins);
-
- // use BFS to figure out the join partition count
-
-
- // TODO this algorithm assumes only one processor, and it does not consider join
- Multimap<String, StreamSpec> streamsGroupedBySystem = HashMultimap.create();
- List<ProcessorNode> processors = processorGraph.topologicalSort();
- processors.forEach(processor -> {
- Set<StreamEdge> outStreams = new HashSet<>(processor.getOutEdges());
- outStreams.retainAll(processorGraph.getInternalStreams());
- if (!outStreams.isEmpty()) {
- int maxInPartition = maxPartition(processor.getInEdges());
- int maxOutPartition = maxPartition(processor.getOutEdges());
- int partition = Math.max(maxInPartition, maxOutPartition);
-
- outStreams.forEach(streamEdge -> {
- if (streamEdge.getPartitions() == -1) {
- streamEdge.setPartitions(partition);
- StreamSpec streamSpec = createStreamSpec(streamEdge);
- streamsGroupedBySystem.put(streamEdge.getSystemStream().getSystem(), streamSpec);
- }
- });
- }
- });
+ fetchExistingStreamPartitions(processorGraph, sysAdmins);
- return streamsGroupedBySystem;
+ // calculate the partitions for the input streams of join operators
+ calculateJoinInputPartitions(streamGraph, processorGraph);
+
+ // calculate the partitions for the rest of intermediate streams
+ calculateIntStreamPartitions(processorGraph, config);
}
- private void getExistingStreamPartitions(ProcessorGraph processorGraph, Map<String, SystemAdmin> sysAdmins) {
- Set<StreamEdge> allStreams = new HashSet<>();
- allStreams.addAll(processorGraph.getSources());
- allStreams.addAll(processorGraph.getSinks());
- allStreams.addAll(processorGraph.getInternalStreams());
+ static void fetchExistingStreamPartitions(ProcessorGraph processorGraph, Map<String, SystemAdmin> sysAdmins) {
+ Set<StreamEdge> existingStreams = new HashSet<>();
+ existingStreams.addAll(processorGraph.getSources());
+ existingStreams.addAll(processorGraph.getSinks());
- Multimap<String, StreamEdge> externalStreamsMap = HashMultimap.create();
- allStreams.forEach(streamEdge -> {
+ Multimap<String, StreamEdge> existingStreamsMap = HashMultimap.create();
+ existingStreams.forEach(streamEdge -> {
SystemStream systemStream = streamEdge.getSystemStream();
- externalStreamsMap.put(systemStream.getSystem(), streamEdge);
+ existingStreamsMap.put(systemStream.getSystem(), streamEdge);
});
- for (Map.Entry<String, Collection<StreamEdge>> entry : externalStreamsMap.asMap().entrySet()) {
+ for (Map.Entry<String, Collection<StreamEdge>> entry : existingStreamsMap.asMap().entrySet()) {
String systemName = entry.getKey();
Collection<StreamEdge> streamEdges = entry.getValue();
Map<String, StreamEdge> streamToEdge = new HashMap<>();
@@ -150,18 +143,136 @@ public class ExecutionPlanner {
metadata.forEach((stream, data) -> {
int partitions = data.getSystemStreamPartitionMetadata().size();
streamToEdge.get(stream).setPartitions(partitions);
- log.info("Partition count is {} for stream {}", partitions, stream);
+ log.debug("Partition count is {} for stream {}", partitions, stream);
});
}
}
- private void createStreams(Multimap<String, StreamSpec> streams, Map<String, SystemAdmin> sysAdmins) {
- for (Map.Entry<String, Collection<StreamSpec>> entry : streams.asMap().entrySet()) {
+ /**
+ * Calculate the partitions for the input streams of join operators
+ * Package private for testing
+ */
+ static void calculateJoinInputPartitions(StreamGraph streamGraph, ProcessorGraph processorGraph) {
+ // get join operators with input streams
+ Multimap<OperatorSpec, StreamEdge> joinToStreamMap = HashMultimap.create();
+ Multimap<StreamEdge, OperatorSpec> streamToJoinMap = HashMultimap.create();
+ Map<MessageStream, OperatorSpec> outputToJoinMap = new HashMap<>();
+ Queue<OperatorSpec> joinQ = new LinkedList<>(); // a queue of joins with known input partitions
+ Set<OperatorSpec> visited = new HashSet<>();
+ streamGraph.getInStreams().entrySet().forEach(entry -> {
+ StreamEdge streamEdge = processorGraph.getEdge(entry.getKey());
+ getJoins(entry.getValue(), streamEdge, joinToStreamMap, streamToJoinMap, outputToJoinMap, joinQ, visited);
+ });
+ // calculate join input partition count
+ while (!joinQ.isEmpty()) {
+ OperatorSpec join = joinQ.poll();
+ int partitions = -1;
+ // loop through the input streams to the join and find the partition count
+ for (StreamEdge edge : joinToStreamMap.get(join)) {
+ int edgePartitions = edge.getPartitions();
+ if (edgePartitions != -1) {
+ if (partitions == -1) {
+ //if the partition is not assigned
+ partitions = edgePartitions;
+ } else if (partitions != edgePartitions) {
+ throw new SamzaException(String.format("Unable to resolve input partitions of stream %s for join",
+ edge.getSystemStream().toString()));
+ }
+ }
+ }
+ // assign the partition count
+ for (StreamEdge edge : joinToStreamMap.get(join)) {
+ if (edge.getPartitions() <= 0) {
+ edge.setPartitions(partitions);
+
+ // find other joins whose input partitions can be inferred now that this edge is set
+ for (OperatorSpec op : streamToJoinMap.get(edge)) {
+ if (!visited.contains(op)) {
+ joinQ.add(op);
+ visited.add(op);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
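+ * Recursively walk the operators registered on a stream and record the partial joins it feeds.
+ * @param messageStream the stream whose registered operator specs are traversed
+ * @param streamEdge the input stream edge that ultimately feeds messageStream
+ * @param joinToStreamMap populated with join operator -> input stream edges
+ * @param streamToJoinMap populated with input stream edge -> join operators
+ * @param outputToJoinMap join output stream -> the partial join chosen to represent the join
+ * @param joinQ queue of joins with at least one input of known partition count
+ * @param visited joins that have already been added to joinQ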
+ */
+ static void getJoins(MessageStream messageStream,
+ StreamEdge streamEdge,
+ Multimap<OperatorSpec, StreamEdge> joinToStreamMap,
+ Multimap<StreamEdge, OperatorSpec> streamToJoinMap,
+ Map<MessageStream, OperatorSpec> outputToJoinMap,
+ Queue<OperatorSpec> joinQ,
+ Set<OperatorSpec> visited) {
+ Collection<OperatorSpec> specs = ((MessageStreamImpl) messageStream).getRegisteredOperatorSpecs();
+ for (OperatorSpec spec : specs) {
+ if (spec instanceof PartialJoinOperatorSpec) {
+ // every join will have two partial join operators
+ // we will choose one of them in order to consolidate the inputs
+ // the first one who registered with the outputToJoinMap will win
+ MessageStream output = spec.getNextStream();
+ OperatorSpec joinSpec = outputToJoinMap.get(output);
+ if (joinSpec == null) {
+ joinSpec = spec;
+ outputToJoinMap.put(output, joinSpec);
+ }
+
+ joinToStreamMap.put(joinSpec, streamEdge);
+ streamToJoinMap.put(streamEdge, joinSpec);
+
+ if (!visited.contains(joinSpec) && streamEdge.getPartitions() > 0) {
+ // put the joins with known input partitions into the queue
+ joinQ.add(joinSpec);
+ visited.add(joinSpec);
+ }
+ }
+
+ if (spec.getNextStream() != null) {
+ getJoins(spec.getNextStream(), streamEdge, joinToStreamMap, streamToJoinMap, outputToJoinMap, joinQ, visited);
+ }
+ }
+ }
+
+ static void calculateIntStreamPartitions(ProcessorGraph processorGraph, Config config) {
+ int partitions = config.getInt(JobConfig.JOB_DEFAULT_PARTITIONS(), -1);
+ if (partitions < 0) {
+ // use the following simple algo to figure out the partitions
+ // partition = MAX(MAX(Input topic partitions), MAX(Output topic partitions))
+ int maxInPartitions = maxPartition(processorGraph.getSources());
+ int maxOutPartitions = maxPartition(processorGraph.getSinks());
+ partitions = Math.max(maxInPartitions, maxOutPartitions);
+ }
+ for (StreamEdge edge : processorGraph.getInternalStreams()) {
+ if (edge.getPartitions() <= 0) {
+ edge.setPartitions(partitions);
+ }
+ }
+ }
+
+ private static void createStreams(ProcessorGraph graph, Map<String, SystemAdmin> sysAdmins) {
+ Multimap<String, StreamSpec> streamsToCreate = HashMultimap.create();
+ graph.getInternalStreams().forEach(edge -> {
+ StreamSpec streamSpec = createStreamSpec(edge);
+ streamsToCreate.put(edge.getSystemStream().getSystem(), streamSpec);
+ });
+
+ for (Map.Entry<String, Collection<StreamSpec>> entry : streamsToCreate.asMap().entrySet()) {
String systemName = entry.getKey();
SystemAdmin systemAdmin = sysAdmins.get(systemName);
for (StreamSpec stream : entry.getValue()) {
- log.info("Creating stream {} on system {}", stream.getPhysicalName(), systemName);
+ log.info("Creating stream {} with partitions {} on system {}",
+ new Object[]{stream.getPhysicalName(), stream.getPartitionCount(), systemName});
systemAdmin.createStream(stream);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a83c69a2/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorGraph.java b/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorGraph.java
index d4ad84b..5ee4d29 100644
--- a/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorGraph.java
@@ -37,12 +37,9 @@ import org.slf4j.LoggerFactory;
/**
- * The ProcessorGraph represents the multi-stage Samza processors of a pipeline on the physical execution layer.
- * High level APIs are transformed into ProcessorGraph for future plan, validation and execution.
- *
- * <p>The ProcessorGraph is a graph of source/sink/intermediate streams and processors are connected together. Each
- * ProcessorNode contains the config which is required to run the processor.
- *
+ * The ProcessorGraph is the physical execution graph for a multi-stage Samza application.
+ * It contains the topology of execution processors connected by source/sink/intermediate streams.
+ * High level APIs are transformed into ProcessorGraph for planning, validation and execution.
*/
public class ProcessorGraph {
private static final Logger log = LoggerFactory.getLogger(ProcessorGraph.class);
@@ -64,8 +61,6 @@ public class ProcessorGraph {
edge.addTargetNode(node);
node.addInEdge(edge);
sources.add(edge);
-
- log.info(edge.toString());
}
void addSink(StreamSpec output, String sourceProcessorId) {
@@ -74,8 +69,6 @@ public class ProcessorGraph {
edge.addSourceNode(node);
node.addOutEdge(edge);
sinks.add(edge);
-
- log.info(edge.toString());
}
void addEdge(StreamSpec streamSpec, String sourceProcessorId, String targetProcessorId) {
@@ -87,8 +80,6 @@ public class ProcessorGraph {
sourceNode.addOutEdge(edge);
targetNode.addInEdge(edge);
internalStreams.add(edge);
-
- log.info(edge.toString());
}
ProcessorNode getNode(String processorId) {
@@ -242,18 +233,22 @@ public class ProcessorGraph {
Collection<ProcessorNode> pnodes = nodes.values();
Queue<ProcessorNode> q = new ArrayDeque<>();
Map<String, Long> indegree = new HashMap<>();
+ Set<ProcessorNode> visited = new HashSet<>();
pnodes.forEach(node -> {
String nid = node.getId();
+ //only count the degrees of intermediate streams since sources have degree 0
long degree = node.getInEdges().stream().filter(e -> !sources.contains(e)).count();
indegree.put(nid, degree);
if (degree == 0L) {
+ // start from the nodes that only consume from sources
q.add(node);
+ visited.add(node);
}
});
List<ProcessorNode> sortedNodes = new ArrayList<>();
- Set<ProcessorNode> visited = new HashSet<>();
+ Set<ProcessorNode> reachable = new HashSet<>();
while (sortedNodes.size() < pnodes.size()) {
while (!q.isEmpty()) {
ProcessorNode node = q.poll();
@@ -264,31 +259,41 @@ public class ProcessorGraph {
indegree.put(nid, degree);
if (degree == 0L && !visited.contains(n)) {
q.add(n);
+ visited.add(n);
}
- visited.add(n);
+ reachable.add(n);
});
}
if (sortedNodes.size() < pnodes.size()) {
// The remaining nodes have circles
- // use the following simple approach to break the circles
- // start from the node that have been seen
- visited.removeAll(sortedNodes);
- //find out the nodes with minimal input edge
- long min = Long.MAX_VALUE;
- ProcessorNode minNode = null;
- for (ProcessorNode node : visited) {
- Long degree = indegree.get(node.getId());
- if (degree < min) {
- min = degree;
- minNode = node;
+ // use the following approach to break the cycles
+ // start from the nodes that are reachable from the previous traversal
+ reachable.removeAll(sortedNodes);
+ if (!reachable.isEmpty()) {
+ //find out the nodes with minimal input edge
+ long min = Long.MAX_VALUE;
+ ProcessorNode minNode = null;
+ for (ProcessorNode node : reachable) {
+ Long degree = indegree.get(node.getId());
+ if (degree < min) {
+ min = degree;
+ minNode = node;
+ }
}
+ // start from the node with minimal input edge again
+ q.add(minNode);
+ } else {
+ // all the remaining nodes should be reachable from sources
+ // start from sources again to find the next node that hasn't been visited
+ ProcessorNode nextNode = sources.stream().flatMap(source -> source.getTargetNodes().stream())
+ .filter(node -> !visited.contains(node))
+ .findAny().get();
+ q.add(nextNode);
}
- // start from the node with minimal input edge again
- q.add(minNode);
}
}
return sortedNodes;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/a83c69a2/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorNode.java b/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorNode.java
index 0b02377..08d94b4 100644
--- a/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorNode.java
+++ b/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorNode.java
@@ -28,6 +28,7 @@ import java.util.stream.Collectors;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.TaskConfig;
import org.apache.samza.util.ConfigInheritence;
import org.apache.samza.util.Util;
import org.slf4j.Logger;
@@ -35,8 +36,9 @@ import org.slf4j.LoggerFactory;
/**
- * The ProcessorNode represents a Samza processor.
- * It contains the input/output, and the config to run the processor.
+ * A ProcessorNode is a physical execution unit. In RemoteExecutionEnvironment, it's a job that will be submitted
+ * to a remote cluster. In LocalExecutionEnvironment, it's a set of StreamProcessors for local execution.
+ * A ProcessorNode contains the input/output, and the configs for physical execution.
*/
public class ProcessorNode {
private static final Logger log = LoggerFactory.getLogger(ProcessorNode.class);
@@ -73,31 +75,15 @@ public class ProcessorNode {
}
public Config generateConfig() {
- String configPrefix = String.format(CONFIG_PROCESSOR_PREFIX, id);
- // TODO: Disallow user specifying processor inputs/outputs. This info comes strictly from the pipeline.
- return Util.rewriteConfig(ConfigInheritence.extractScopedConfig(config, generateProcessorConfig(), configPrefix));
- }
-
- private Config generateProcessorConfig() {
Map<String, String> configs = new HashMap<>();
- List<String> inputs = inEdges.stream().map(edge -> edge.getFormattedSystemStream()).collect(Collectors.toList());
-
- // TODO temp logs for debugging
- log.info("Processor {} has formatted inputs {}", id, inputs);
-
- // TODO hack alert: hard coded string literals!
- configs.put("task.inputs", Joiner.on(',').join(inputs));
-
- // TODO: DISCUSS how does the processor know it's output names?
- outEdges.forEach(edge -> {
- if (!edge.getName().isEmpty()) {
- configs.put(String.format("task.outputs.%s.stream", edge.getName()), edge.getFormattedSystemStream());
- }
- });
-
configs.put(JobConfig.JOB_NAME(), id);
+ List<String> inputs = inEdges.stream().map(edge -> edge.getFormattedSystemStream()).collect(Collectors.toList());
+ configs.put(TaskConfig.INPUT_STREAMS(), Joiner.on(',').join(inputs));
log.info("Processor {} has generated configs {}", id, configs);
- return new MapConfig(configs);
+
+ String configPrefix = String.format(CONFIG_PROCESSOR_PREFIX, id);
+ // TODO: Disallow user specifying processor inputs/outputs. This info comes strictly from the pipeline.
+ return Util.rewriteConfig(ConfigInheritence.extractScopedConfig(config, new MapConfig(configs), configPrefix));
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/a83c69a2/samza-core/src/main/java/org/apache/samza/processorgraph/StreamEdge.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processorgraph/StreamEdge.java b/samza-core/src/main/java/org/apache/samza/processorgraph/StreamEdge.java
index 879d705..664a458 100644
--- a/samza-core/src/main/java/org/apache/samza/processorgraph/StreamEdge.java
+++ b/samza-core/src/main/java/org/apache/samza/processorgraph/StreamEdge.java
@@ -19,10 +19,8 @@
package org.apache.samza.processorgraph;
-import com.google.common.base.Joiner;
import java.util.ArrayList;
import java.util.List;
-import java.util.stream.Collectors;
import org.apache.samza.config.Config;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemStream;
@@ -92,17 +90,4 @@ public class StreamEdge {
void setName(String name) {
this.name = name;
}
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder("StreamEdge ");
- builder.append(getSystemStream().toString()).append(": (");
- List<String> sourceIds = sourceNodes.stream().map(node -> node.getId()).collect(Collectors.toList());
- String sources = Joiner.on(',').join(sourceIds);
- builder.append(sources).append(") -> (");
- List<String> targetIds = targetNodes.stream().map(node -> node.getId()).collect(Collectors.toList());
- String targets = Joiner.on(',').join(targetIds);
- builder.append(targets).append(")");
- return builder.toString();
- }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a83c69a2/samza-core/src/main/java/org/apache/samza/system/AbstractExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/system/AbstractExecutionEnvironment.java b/samza-core/src/main/java/org/apache/samza/system/AbstractExecutionEnvironment.java
index c066bdd..dabe651 100644
--- a/samza-core/src/main/java/org/apache/samza/system/AbstractExecutionEnvironment.java
+++ b/samza-core/src/main/java/org/apache/samza/system/AbstractExecutionEnvironment.java
@@ -20,12 +20,14 @@ package org.apache.samza.system;
import java.util.Map;
import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
import org.apache.samza.config.StreamConfig;
public abstract class AbstractExecutionEnvironment implements ExecutionEnvironment {
private final Config config;
+ private final String streamPrefix;
public AbstractExecutionEnvironment(Config config) {
if (config == null) {
@@ -33,6 +35,7 @@ public abstract class AbstractExecutionEnvironment implements ExecutionEnvironme
}
this.config = config;
+ this.streamPrefix = String.format("%s-%s", config.get(JobConfig.JOB_NAME()), config.get(JobConfig.JOB_ID(), "1"));
}
@Override
@@ -40,7 +43,7 @@ public abstract class AbstractExecutionEnvironment implements ExecutionEnvironme
StreamConfig streamConfig = new StreamConfig(config);
String system = streamConfig.getSystem(streamId);
- String physicalName = streamConfig.getPhysicalName(streamId, streamId);
+ String physicalName = streamConfig.getPhysicalName(streamId, String.format("%s-%s", streamPrefix, streamId));
Map<String, String> properties = streamConfig.getStreamProperties(streamId);
return new StreamSpec(streamId, physicalName, system, properties);
http://git-wip-us.apache.org/repos/asf/samza/blob/a83c69a2/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java b/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
index ce129aa..3288c5c 100644
--- a/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
+++ b/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
@@ -46,7 +46,7 @@ public class RemoteExecutionEnvironment extends AbstractExecutionEnvironment {
// TODO: actually instantiate the tasks and run the job, i.e.
try {
// 1. build stream graph
- StreamGraph streamGraph = new StreamGraphImpl();
+ StreamGraph streamGraph = new StreamGraphImpl(this);
app.init(streamGraph, config);
// 2. create the physical execution plan
http://git-wip-us.apache.org/repos/asf/samza/blob/a83c69a2/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java b/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
index 71d60ef..b88e356 100644
--- a/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
+++ b/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
@@ -36,7 +36,7 @@ public class StandaloneExecutionEnvironment extends AbstractExecutionEnvironment
// TODO: may want to move this to a common base class for all {@link ExecutionEnvironment}
StreamGraph createGraph(StreamGraphBuilder app, Config config) {
- StreamGraphImpl graph = new StreamGraphImpl();
+ StreamGraphImpl graph = new StreamGraphImpl(this);
app.init(graph, config);
return graph;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a83c69a2/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
index b007e3c..6032f4d 100644
--- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
+++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
@@ -25,6 +25,7 @@ import org.apache.samza.operators.StreamGraphBuilder;
import org.apache.samza.operators.StreamGraphImpl;
import org.apache.samza.operators.data.InputMessageEnvelope;
import org.apache.samza.operators.impl.OperatorGraph;
+import org.apache.samza.system.ExecutionEnvironment;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStream;
@@ -77,8 +78,11 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo
@Override
public final void init(Config config, TaskContext context) throws Exception {
+ // For now, the execution environment has to be recreated here from the config.
+ // If we decide to serialize the DAG in the future, this can be cleaned up.
+ ExecutionEnvironment executionEnvironment = ExecutionEnvironment.fromConfig(config);
// create the MessageStreamsImpl object and initialize app-specific logic DAG within the task
- StreamGraphImpl streams = new StreamGraphImpl();
+ StreamGraphImpl streams = new StreamGraphImpl(executionEnvironment);
this.graphBuilder.init(streams, config);
// get the context manager of the {@link StreamGraph} and initialize the task-specific context
this.contextManager = streams.getContextManager();
http://git-wip-us.apache.org/repos/asf/samza/blob/a83c69a2/samza-core/src/main/java/org/apache/samza/util/ConfigInheritence.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/util/ConfigInheritence.java b/samza-core/src/main/java/org/apache/samza/util/ConfigInheritence.java
index 2eba59b..e4fb32f 100644
--- a/samza-core/src/main/java/org/apache/samza/util/ConfigInheritence.java
+++ b/samza-core/src/main/java/org/apache/samza/util/ConfigInheritence.java
@@ -32,8 +32,8 @@ public class ConfigInheritence {
public static Config extractScopedConfig(Config fullConfig, Config generatedConfig, String configPrefix) {
Config scopedConfig = fullConfig.subset(configPrefix);
- log.info("Prefix '{}' has extracted config {}", configPrefix, scopedConfig);
- log.info("Prefix '{}' has generated config {}", configPrefix, generatedConfig);
+ log.debug("Prefix '{}' has extracted config {}", configPrefix, scopedConfig);
+ log.debug("Prefix '{}' has generated config {}", configPrefix, generatedConfig);
Config[] configPrecedence;
if (INHERIT_ROOT_CONFIGS) {
@@ -53,7 +53,7 @@ public class ConfigInheritence {
}
}
scopedConfig = new MapConfig(mergedConfig);
- log.info("Prefix '{}' has merged config {}", configPrefix, scopedConfig);
+ log.debug("Prefix '{}' has merged config {}", configPrefix, scopedConfig);
return scopedConfig;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a83c69a2/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 1c58293..fbfe90f 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -45,6 +45,7 @@ object JobConfig {
val JOB_COORDINATOR_SYSTEM = "job.coordinator.system"
val JOB_METADATA_DEFAULT_SYSTEM = "job.metadata.system"
val JOB_DEFAULT_SYSTEM = "job.default.system"
+ val JOB_DEFAULT_PARTITIONS = "job.default.partitions" // when set, partition count assigned to intermediate streams (e.g. created by partitionBy)
val JOB_CONTAINER_COUNT = "job.container.count"
val jOB_CONTAINER_THREAD_POOL_SIZE = "job.container.thread.pool.size"
val JOB_CONTAINER_SINGLE_THREAD_MODE = "job.container.single.thread.mode"
http://git-wip-us.apache.org/repos/asf/samza/blob/a83c69a2/samza-core/src/test/java/org/apache/samza/processorgraph/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/processorgraph/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/processorgraph/TestExecutionPlanner.java
new file mode 100644
index 0000000..68a2142
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/processorgraph/TestExecutionPlanner.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.processorgraph;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.system.AbstractExecutionEnvironment;
+import org.apache.samza.system.ExecutionEnvironment;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+
+/**
+ * Tests for {@link ExecutionPlanner}: building a {@link ProcessorGraph} from a {@link StreamGraph}
+ * and assigning partition counts to its input, output and intermediate streams.
+ */
+public class TestExecutionPlanner {
+
+  private static final String DEFAULT_SYSTEM = "test-system";
+  private static final int DEFAULT_PARTITIONS = 10;
+
+  private Config config;
+  private ExecutionEnvironment env;
+
+  private StreamSpec input1;
+  private StreamSpec input2;
+  private StreamSpec input3;
+  private StreamSpec output1;
+  private StreamSpec output2;
+
+  /** Stub admins keyed by system name, reporting the partition counts configured in {@link #setup()}. */
+  private Map<String, SystemAdmin> systemAdmins;
+
+  /** Returns a no-op join function; these tests only depend on the graph topology. */
+  private JoinFunction createJoin() {
+    return new JoinFunction() {
+      @Override
+      public Object apply(Object message, Object otherMessage) {
+        return null;
+      }
+
+      @Override
+      public Object getFirstKey(Object message) {
+        return null;
+      }
+
+      @Override
+      public Object getSecondKey(Object message) {
+        return null;
+      }
+    };
+  }
+
+  /**
+   * Creates a stub {@link SystemAdmin} whose stream metadata reports a fixed partition count per stream.
+   *
+   * @param streamToPartitions physical stream name to partition count; every queried stream must be present
+   */
+  private SystemAdmin createSystemAdmin(Map<String, Integer> streamToPartitions) {
+    return new SystemAdmin() {
+      @Override
+      public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
+        return null;
+      }
+
+      @Override
+      public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) {
+        Map<String, SystemStreamMetadata> map = new HashMap<>();
+        for (String stream : streamNames) {
+          int partitionCount = streamToPartitions.get(stream);
+          Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> m = new HashMap<>();
+          for (int i = 0; i < partitionCount; i++) {
+            m.put(new Partition(i), new SystemStreamMetadata.SystemStreamPartitionMetadata("", "", ""));
+          }
+          map.put(stream, new SystemStreamMetadata(stream, m));
+        }
+        return map;
+      }
+
+      @Override
+      public void createChangelogStream(String streamName, int numOfPartitions) {
+      }
+
+      @Override
+      public void validateChangelogStream(String streamName, int numOfPartitions) {
+      }
+
+      @Override
+      public void createCoordinatorStream(String streamName) {
+      }
+
+      @Override
+      public Integer offsetComparator(String offset1, String offset2) {
+        return null;
+      }
+    };
+  }
+
+  /**
+   * Builds a graph with a single repartition:
+   * <pre>{@code
+   *   input1 -> partitionBy -> map -> output1
+   * }</pre>
+   */
+  private StreamGraph createSimpleGraph() {
+    StreamGraph streamGraph = new StreamGraphImpl(env);
+    streamGraph.createInStream(input1, null, null)
+        .partitionBy(m -> "yes!!!")
+        .map(m -> m)
+        .sendTo(streamGraph.createOutStream(output1, null, null));
+    return streamGraph;
+  }
+
+  /**
+   * Builds a graph with two joins that share the repartitioned input2 stream:
+   * <pre>{@code
+   *   input1 -> map ---------------------------> join -> output1
+   *                                               |
+   *   input2 -> partitionBy -> filter ------------|
+   *                                               |
+   *   input3 -> filter -> partitionBy -> map --> join -> output2
+   * }</pre>
+   */
+  private StreamGraph createStreamGraphWithJoin() {
+    StreamGraph streamGraph = new StreamGraphImpl(env);
+    MessageStream m1 = streamGraph.createInStream(input1, null, null).map(m -> m);
+    MessageStream m2 = streamGraph.createInStream(input2, null, null).partitionBy(m -> "haha").filter(m -> true);
+    MessageStream m3 = streamGraph.createInStream(input3, null, null).filter(m -> true).partitionBy(m -> "hehe").map(m -> m);
+
+    m1.join(m2, createJoin()).sendTo(streamGraph.createOutStream(output1, null, null));
+    m3.join(m2, createJoin()).sendTo(streamGraph.createOutStream(output2, null, null));
+
+    return streamGraph;
+  }
+
+  @Before
+  public void setup() {
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put(JobConfig.JOB_NAME(), "test-app");
+    configMap.put(JobConfig.JOB_DEFAULT_SYSTEM(), DEFAULT_SYSTEM);
+    config = new MapConfig(configMap);
+
+    env = new AbstractExecutionEnvironment(config) {
+      @Override
+      public void run(StreamGraphBuilder graphBuilder, Config config) {
+      }
+    };
+
+    input1 = new StreamSpec("input1", "input1", "system1");
+    input2 = new StreamSpec("input2", "input2", "system2");
+    input3 = new StreamSpec("input3", "input3", "system2");
+
+    output1 = new StreamSpec("output1", "output1", "system1");
+    output2 = new StreamSpec("output2", "output2", "system2");
+
+    // partition counts reported by the external systems
+    Map<String, Integer> system1Map = new HashMap<>();
+    system1Map.put("input1", 64);
+    system1Map.put("output1", 8);
+    Map<String, Integer> system2Map = new HashMap<>();
+    system2Map.put("input2", 16);
+    system2Map.put("input3", 32);
+    system2Map.put("output2", 16);
+
+    systemAdmins = new HashMap<>();
+    systemAdmins.put("system1", createSystemAdmin(system1Map));
+    systemAdmins.put("system2", createSystemAdmin(system2Map));
+  }
+
+  @Test
+  public void testCreateProcessorGraph() {
+    ExecutionPlanner planner = new ExecutionPlanner(config);
+    StreamGraph streamGraph = createStreamGraphWithJoin();
+
+    ProcessorGraph processorGraph = planner.createProcessorGraph(streamGraph);
+    assertEquals(3, processorGraph.getSources().size());
+    assertEquals(2, processorGraph.getSinks().size());
+    assertEquals(2, processorGraph.getInternalStreams().size()); // two streams generated by partitionBy
+  }
+
+  @Test
+  public void testFetchExistingStreamPartitions() {
+    ExecutionPlanner planner = new ExecutionPlanner(config);
+    StreamGraph streamGraph = createStreamGraphWithJoin();
+    ProcessorGraph processorGraph = planner.createProcessorGraph(streamGraph);
+
+    ExecutionPlanner.fetchExistingStreamPartitions(processorGraph, systemAdmins);
+    assertEquals(64, processorGraph.getEdge(input1).getPartitions());
+    assertEquals(16, processorGraph.getEdge(input2).getPartitions());
+    assertEquals(32, processorGraph.getEdge(input3).getPartitions());
+    assertEquals(8, processorGraph.getEdge(output1).getPartitions());
+    assertEquals(16, processorGraph.getEdge(output2).getPartitions());
+
+    // intermediate streams are expected to stay unknown (-1) until their partitions are calculated
+    processorGraph.getInternalStreams().forEach(edge -> assertEquals(-1, edge.getPartitions()));
+  }
+
+  @Test
+  public void testCalculateJoinInputPartitions() {
+    ExecutionPlanner planner = new ExecutionPlanner(config);
+    StreamGraph streamGraph = createStreamGraphWithJoin();
+    ProcessorGraph processorGraph = planner.createProcessorGraph(streamGraph);
+
+    ExecutionPlanner.fetchExistingStreamPartitions(processorGraph, systemAdmins);
+    ExecutionPlanner.calculateJoinInputPartitions(streamGraph, processorGraph);
+
+    // input1 (64 partitions) is joined with the repartitioned input2 stream, which is also joined
+    // with the repartitioned input3 stream, so both intermediate streams should match input1
+    processorGraph.getInternalStreams().forEach(edge -> assertEquals(64, edge.getPartitions()));
+  }
+
+  @Test
+  public void testDefaultPartitions() {
+    Map<String, String> map = new HashMap<>(config);
+    map.put(JobConfig.JOB_DEFAULT_PARTITIONS(), String.valueOf(DEFAULT_PARTITIONS));
+    Config cfg = new MapConfig(map);
+
+    ExecutionPlanner planner = new ExecutionPlanner(cfg);
+    StreamGraph streamGraph = createSimpleGraph();
+    ProcessorGraph processorGraph = planner.createProcessorGraph(streamGraph);
+    planner.calculatePartitions(streamGraph, processorGraph, systemAdmins);
+
+    // with job.default.partitions set, the intermediate stream should use that value
+    processorGraph.getInternalStreams().forEach(edge -> assertEquals(DEFAULT_PARTITIONS, edge.getPartitions()));
+  }
+
+  @Test
+  public void testCalculateIntStreamPartitions() {
+    ExecutionPlanner planner = new ExecutionPlanner(config);
+    StreamGraph streamGraph = createSimpleGraph();
+    ProcessorGraph processorGraph = planner.createProcessorGraph(streamGraph);
+    planner.calculatePartitions(streamGraph, processorGraph, systemAdmins);
+
+    // without job.default.partitions, the intermediate stream uses the max of input1 (64) and output1 (8)
+    processorGraph.getInternalStreams().forEach(edge -> assertEquals(64, edge.getPartitions()));
+  }
+}