You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2018/10/10 21:48:23 UTC
samza git commit: SAMZA-1889: Extend ExecutionPlanner to support Stream-Table Joins
Repository: samza
Updated Branches:
refs/heads/master 4915baac5 -> d2c9e8162
SAMZA-1889: Extend ExecutionPlanner to support Stream-Table Joins
Extend ExecutionPlanner to verify agreement in partition count among the
stream(s) behind Tables — including side-input streams — and other streams
participating in Stream-Table Joins.
Author: Ahmed Abdul Hamid <ah...@ahabdulh-mn1.linkedin.biz>
Author: Ahmed Elbahtemy <ah...@yahoo.com>
Author: Ahmed Abdul Hamid <ah...@linkedin.com>
Reviewers: Xinyu Liu <xi...@apache.org>
Closes #665 from ahmedahamid/dev/ahabdulh/extend-exec-planner
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/d2c9e816
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/d2c9e816
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/d2c9e816
Branch: refs/heads/master
Commit: d2c9e81626539016756c3a93876c9f079b77e0f4
Parents: 4915baa
Author: Ahmed Abdul Hamid <ah...@ahabdulh-mn1.linkedin.biz>
Authored: Wed Oct 10 14:48:14 2018 -0700
Committer: xiliu <xi...@linkedin.com>
Committed: Wed Oct 10 14:48:14 2018 -0700
----------------------------------------------------------------------
.../samza/execution/ExecutionPlanner.java | 144 ++++++-
.../execution/IntermediateStreamManager.java | 253 +++---------
.../org/apache/samza/execution/JobGraph.java | 34 +-
.../execution/OperatorSpecGraphAnalyzer.java | 134 ++++++-
.../execution/ExecutionPlannerTestBase.java | 2 +-
.../samza/execution/TestExecutionPlanner.java | 399 +++++++++++++++----
.../TestIntermediateStreamManager.java | 68 ----
7 files changed, 663 insertions(+), 371 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/d2c9e816/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
index eea6387..b80f7df 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
@@ -20,14 +20,20 @@
package org.apache.samza.execution;
import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import org.apache.commons.collections4.ListUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.application.ApplicationDescriptor;
import org.apache.samza.application.ApplicationDescriptorImpl;
@@ -38,12 +44,16 @@ import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.StreamConfig;
import org.apache.samza.operators.BaseTableDescriptor;
+import org.apache.samza.operators.spec.InputOperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.table.TableSpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.samza.util.StreamUtil.*;
+import static org.apache.samza.util.StreamUtil.getStreamSpec;
+import static org.apache.samza.util.StreamUtil.getStreamSpecs;
/**
@@ -56,23 +66,33 @@ public class ExecutionPlanner {
private final Config config;
private final StreamManager streamManager;
+ private final StreamConfig streamConfig;
public ExecutionPlanner(Config config, StreamManager streamManager) {
this.config = config;
this.streamManager = streamManager;
+ this.streamConfig = new StreamConfig(config);
}
public ExecutionPlan plan(ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc) {
validateConfig();
- // create physical job graph based on stream graph
- JobGraph jobGraph = createJobGraph(config, appDesc);
+ // Create physical job graph based on stream graph
+ JobGraph jobGraph = createJobGraph(appDesc);
- // fetch the external streams partition info
- setInputAndOutputStreamPartitionCount(jobGraph, streamManager);
+ // Fetch the external streams partition info
+ setInputAndOutputStreamPartitionCount(jobGraph);
- // figure out the partitions for internal streams
- new IntermediateStreamManager(config, appDesc).calculatePartitions(jobGraph);
+ // Group streams participating in joins together into sets
+ List<StreamSet> joinedStreamSets = groupJoinedStreams(jobGraph);
+
+ // Set partitions of intermediate streams if any
+ if (!jobGraph.getIntermediateStreamEdges().isEmpty()) {
+ new IntermediateStreamManager(config).calculatePartitions(jobGraph, joinedStreamSets);
+ }
+
+ // Verify every group of joined streams has the same partition count
+ joinedStreamSets.forEach(ExecutionPlanner::validatePartitions);
return jobGraph;
}
@@ -88,12 +108,11 @@ public class ExecutionPlanner {
}
/**
- * Create the physical graph from {@link ApplicationDescriptorImpl}
+ * Creates the physical graph from {@link ApplicationDescriptorImpl}
*/
/* package private */
- JobGraph createJobGraph(Config config, ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc) {
+ JobGraph createJobGraph(ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc) {
JobGraph jobGraph = new JobGraph(config, appDesc);
- StreamConfig streamConfig = new StreamConfig(config);
// Source streams contain both input and intermediate streams.
Set<StreamSpec> sourceStreams = getStreamSpecs(appDesc.getInputStreamIds(), streamConfig);
// Sink streams contain both output and intermediate streams.
@@ -106,7 +125,7 @@ public class ExecutionPlanner {
Set<TableSpec> tables = appDesc.getTableDescriptors().stream()
.map(tableDescriptor -> ((BaseTableDescriptor) tableDescriptor).getTableSpec()).collect(Collectors.toSet());
- // For this phase, we have a single job node for the whole dag
+ // For this phase, we have a single job node for the whole DAG
String jobName = config.get(JobConfig.JOB_NAME());
String jobId = config.get(JobConfig.JOB_ID(), "1");
JobNode node = jobGraph.getOrCreateJobNode(jobName, jobId);
@@ -121,7 +140,14 @@ public class ExecutionPlanner {
intermediateStreams.forEach(spec -> jobGraph.addIntermediateStream(spec, node, node));
// Add tables
- tables.forEach(spec -> jobGraph.addTable(spec, node));
+ for (TableSpec table : tables) {
+ jobGraph.addTable(table, node);
+ // Add side-input streams (if any)
+ Iterable<String> sideInputs = ListUtils.emptyIfNull(table.getSideInputs());
+ for (String sideInput : sideInputs) {
+ jobGraph.addSideInputStream(getStreamSpec(sideInput, streamConfig));
+ }
+ }
if (!LegacyTaskApplication.class.isAssignableFrom(appDesc.getAppClass())) {
// skip the validation when input streamIds are empty. This is only possible for LegacyTaskApplication
@@ -132,13 +158,12 @@ public class ExecutionPlanner {
}
/**
- * Fetch the partitions of source/sink streams and update the StreamEdges.
- * @param jobGraph {@link JobGraph}
- * @param streamManager the {@link StreamManager} to interface with the streams.
+ * Fetches the partitions of input, side-input, and output streams and updates their corresponding StreamEdges.
*/
- /* package private */ static void setInputAndOutputStreamPartitionCount(JobGraph jobGraph, StreamManager streamManager) {
+ /* package private */ void setInputAndOutputStreamPartitionCount(JobGraph jobGraph) {
Set<StreamEdge> existingStreams = new HashSet<>();
existingStreams.addAll(jobGraph.getInputStreams());
+ existingStreams.addAll(jobGraph.getSideInputStreams());
existingStreams.addAll(jobGraph.getOutputStreams());
// System to StreamEdges
@@ -152,7 +177,7 @@ public class ExecutionPlanner {
// Fetch partition count for every set of StreamEdges belonging to a particular system.
for (String system : systemToStreamEdges.keySet()) {
- Collection<StreamEdge> streamEdges = systemToStreamEdges.get(system);
+ Iterable<StreamEdge> streamEdges = systemToStreamEdges.get(system);
// Map every stream to its corresponding StreamEdge so we can retrieve a StreamEdge given its stream.
Map<String, StreamEdge> streamToStreamEdge = new HashMap<>();
@@ -174,4 +199,89 @@ public class ExecutionPlanner {
}
}
+ /**
+ * Groups streams participating in joins together.
+ */
+ private static List<StreamSet> groupJoinedStreams(JobGraph jobGraph) {
+ // Group input operator specs (input/intermediate streams) by the joins they participate in.
+ Multimap<OperatorSpec, InputOperatorSpec> joinOpSpecToInputOpSpecs =
+ OperatorSpecGraphAnalyzer.getJoinToInputOperatorSpecs(
+ jobGraph.getApplicationDescriptorImpl().getInputOperators().values());
+
+ // Convert every group of input operator specs into a group of corresponding stream edges.
+ List<StreamSet> streamSets = new ArrayList<>();
+ for (OperatorSpec joinOpSpec : joinOpSpecToInputOpSpecs.keySet()) {
+ Collection<InputOperatorSpec> joinedInputOpSpecs = joinOpSpecToInputOpSpecs.get(joinOpSpec);
+ StreamSet streamSet = getStreamSet(joinOpSpec.getOpId(), joinedInputOpSpecs, jobGraph);
+
+ // If current join is a stream-table join, add the stream edges corresponding to side-input
+ // streams associated with the joined table (if any).
+ if (joinOpSpec instanceof StreamTableJoinOperatorSpec) {
+ StreamTableJoinOperatorSpec streamTableJoinOperatorSpec = (StreamTableJoinOperatorSpec) joinOpSpec;
+
+ Collection<String> sideInputs = ListUtils.emptyIfNull(streamTableJoinOperatorSpec.getTableSpec().getSideInputs());
+ Iterable<StreamEdge> sideInputStreams = sideInputs.stream().map(jobGraph::getStreamEdge)::iterator;
+ Iterable<StreamEdge> streams = streamSet.getStreamEdges();
+ streamSet = new StreamSet(streamSet.getSetId(), Iterables.concat(streams, sideInputStreams));
+ }
+
+ streamSets.add(streamSet);
+ }
+
+ return Collections.unmodifiableList(streamSets);
+ }
+
+ /**
+ * Creates a {@link StreamSet} whose Id is {@code setId}, and {@link StreamEdge}s
+ * correspond to the provided {@code inputOpSpecs}.
+ */
+ private static StreamSet getStreamSet(String setId, Iterable<InputOperatorSpec> inputOpSpecs, JobGraph jobGraph) {
+ Set<StreamEdge> streamEdges = new HashSet<>();
+ for (InputOperatorSpec inputOpSpec : inputOpSpecs) {
+ StreamEdge streamEdge = jobGraph.getStreamEdge(inputOpSpec.getStreamId());
+ streamEdges.add(streamEdge);
+ }
+ return new StreamSet(setId, streamEdges);
+ }
+
+ /**
+ * Verifies all {@link StreamEdge}s in the supplied {@code streamSet} agree in
+ * partition count, or throws.
+ */
+ private static void validatePartitions(StreamSet streamSet) {
+ Collection<StreamEdge> streamEdges = streamSet.getStreamEdges();
+ StreamEdge referenceStreamEdge = streamEdges.stream().findFirst().get();
+ int referencePartitions = referenceStreamEdge.getPartitionCount();
+
+ for (StreamEdge streamEdge : streamEdges) {
+ int partitions = streamEdge.getPartitionCount();
+ if (partitions != referencePartitions) {
+ throw new SamzaException(String.format(
+ "Unable to resolve input partitions of stream %s for the join %s. Expected: %d, Actual: %d",
+ referenceStreamEdge.getName(), streamSet.getSetId(), referencePartitions, partitions));
+ }
+ }
+ }
+
+ /**
+ * Represents a set of {@link StreamEdge}s.
+ */
+ /* package private */ static class StreamSet {
+
+ private final String setId;
+ private final Set<StreamEdge> streamEdges;
+
+ StreamSet(String setId, Iterable<StreamEdge> streamEdges) {
+ this.setId = setId;
+ this.streamEdges = ImmutableSet.copyOf(streamEdges);
+ }
+
+ Set<StreamEdge> getStreamEdges() {
+ return Collections.unmodifiableSet(streamEdges);
+ }
+
+ String getSetId() {
+ return setId;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/d2c9e816/samza-core/src/main/java/org/apache/samza/execution/IntermediateStreamManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/IntermediateStreamManager.java b/samza-core/src/main/java/org/apache/samza/execution/IntermediateStreamManager.java
index 66cbe6a..64fc7b3 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/IntermediateStreamManager.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/IntermediateStreamManager.java
@@ -20,25 +20,21 @@
package org.apache.samza.execution;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import org.apache.samza.SamzaException;
-import org.apache.samza.application.ApplicationDescriptor;
-import org.apache.samza.application.ApplicationDescriptorImpl;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
-import org.apache.samza.operators.spec.InputOperatorSpec;
-import org.apache.samza.operators.spec.JoinOperatorSpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.samza.execution.ExecutionPlanner.StreamSet;
+
+
/**
* {@link IntermediateStreamManager} calculates intermediate stream partitions based on the high-level application graph.
*/
@@ -47,107 +43,31 @@ class IntermediateStreamManager {
private static final Logger log = LoggerFactory.getLogger(IntermediateStreamManager.class);
private final Config config;
- private final Map<String, InputOperatorSpec> inputOperators;
@VisibleForTesting
static final int MAX_INFERRED_PARTITIONS = 256;
- IntermediateStreamManager(Config config, ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc) {
+ IntermediateStreamManager(Config config) {
this.config = config;
- this.inputOperators = appDesc.getInputOperators();
- }
-
- /**
- * Figure out the number of partitions of all streams
- */
- /* package private */ void calculatePartitions(JobGraph jobGraph) {
-
- // Verify agreement in partition count between all joined input/intermediate streams
- validateJoinInputStreamPartitions(jobGraph);
-
- if (!jobGraph.getIntermediateStreamEdges().isEmpty()) {
- // Set partition count of intermediate streams not participating in joins
- setIntermediateStreamPartitions(jobGraph);
-
- // Validate partition counts were assigned for all intermediate streams
- validateIntermediateStreamPartitions(jobGraph);
- }
}
/**
- * Validates agreement in partition count between input/intermediate streams participating in join operations.
+ * Calculates the number of partitions of all intermediate streams
*/
- private void validateJoinInputStreamPartitions(JobGraph jobGraph) {
- // Group input operator specs (input/intermediate streams) by the joins they participate in.
- Multimap<JoinOperatorSpec, InputOperatorSpec> joinOpSpecToInputOpSpecs =
- OperatorSpecGraphAnalyzer.getJoinToInputOperatorSpecs(inputOperators.values());
+ /* package private */ void calculatePartitions(JobGraph jobGraph, Collection<StreamSet> joinedStreamSets) {
- // Convert every group of input operator specs into a group of corresponding stream edges.
- List<StreamEdgeSet> streamEdgeSets = new ArrayList<>();
- for (JoinOperatorSpec joinOpSpec : joinOpSpecToInputOpSpecs.keySet()) {
- Collection<InputOperatorSpec> joinedInputOpSpecs = joinOpSpecToInputOpSpecs.get(joinOpSpec);
- StreamEdgeSet streamEdgeSet = getStreamEdgeSet(joinOpSpec.getOpId(), joinedInputOpSpecs, jobGraph);
- streamEdgeSets.add(streamEdgeSet);
- }
+ // Set partition count of intermediate streams participating in joins
+ setJoinedIntermediateStreamPartitions(joinedStreamSets);
- /*
- * Sort the stream edge groups by their category so they appear in this order:
- * 1. groups composed exclusively of stream edges with set partition counts
- * 2. groups composed of a mix of stream edges with set/unset partition counts
- * 3. groups composed exclusively of stream edges with unset partition counts
- *
- * This guarantees that we process the most constrained stream edge groups first,
- * which is crucial for intermediate stream edges that are members of multiple
- * stream edge groups. For instance, if we have the following groups of stream
- * edges (partition counts in parentheses, question marks for intermediate streams):
- *
- * a. e1 (16), e2 (16)
- * b. e2 (16), e3 (?)
- * c. e3 (?), e4 (?)
- *
- * processing them in the above order (most constrained first) is guaranteed to
- * yield correct assignment of partition counts of e3 and e4 in a single scan.
- */
- Collections.sort(streamEdgeSets, Comparator.comparingInt(e -> e.getCategory().getSortOrder()));
+ // Set partition count of intermediate streams not participating in joins
+ setIntermediateStreamPartitions(jobGraph);
- // Verify agreement between joined input/intermediate streams.
- // This may involve setting partition counts of intermediate stream edges.
- streamEdgeSets.forEach(IntermediateStreamManager::validateAndAssignStreamEdgeSetPartitions);
+ // Validate partition counts were assigned for all intermediate streams
+ validateIntermediateStreamPartitions(jobGraph);
}
/**
- * Creates a {@link StreamEdgeSet} whose Id is {@code setId}, and {@link StreamEdge}s
- * correspond to the provided {@code inputOpSpecs}.
- */
- private StreamEdgeSet getStreamEdgeSet(String setId, Iterable<InputOperatorSpec> inputOpSpecs,
- JobGraph jobGraph) {
-
- int countStreamEdgeWithSetPartitions = 0;
- Set<StreamEdge> streamEdges = new HashSet<>();
-
- for (InputOperatorSpec inputOpSpec : inputOpSpecs) {
- StreamEdge streamEdge = jobGraph.getStreamEdge(inputOpSpec.getStreamId());
- if (streamEdge.getPartitionCount() != StreamEdge.PARTITIONS_UNKNOWN) {
- ++countStreamEdgeWithSetPartitions;
- }
- streamEdges.add(streamEdge);
- }
-
- // Determine category of stream group based on stream partition counts.
- StreamEdgeSet.StreamEdgeSetCategory category;
- if (countStreamEdgeWithSetPartitions == 0) {
- category = StreamEdgeSet.StreamEdgeSetCategory.NO_PARTITION_COUNT_SET;
- } else if (countStreamEdgeWithSetPartitions == streamEdges.size()) {
- category = StreamEdgeSet.StreamEdgeSetCategory.ALL_PARTITION_COUNT_SET;
- } else {
- category = StreamEdgeSet.StreamEdgeSetCategory.SOME_PARTITION_COUNT_SET;
- }
-
- return new StreamEdgeSet(setId, streamEdges, category);
- }
-
- /**
- * Sets partition count of intermediate streams which have not been assigned partition counts.
+ * Sets partition counts of intermediate streams which have not been assigned partition counts.
*/
private void setIntermediateStreamPartitions(JobGraph jobGraph) {
final String defaultPartitionsConfigProperty = JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS();
@@ -186,112 +106,67 @@ class IntermediateStreamManager {
}
/**
- * Ensures all intermediate streams have been assigned partition counts.
+ * Sets partition counts of intermediate streams participating in join operations.
*/
- private static void validateIntermediateStreamPartitions(JobGraph jobGraph) {
- for (StreamEdge edge : jobGraph.getIntermediateStreamEdges()) {
- if (edge.getPartitionCount() <= 0) {
- throw new SamzaException(String.format("Failed to assign valid partition count to Stream %s", edge.getName()));
+ private static void setJoinedIntermediateStreamPartitions(Collection<StreamSet> joinedStreamSets) {
+ // Map every intermediate stream to all the stream-sets it appears in
+ Multimap<StreamEdge, StreamSet> intermediateStreamToStreamSets = HashMultimap.create();
+ for (StreamSet streamSet : joinedStreamSets) {
+ for (StreamEdge streamEdge : streamSet.getStreamEdges()) {
+ if (streamEdge.getPartitionCount() == StreamEdge.PARTITIONS_UNKNOWN) {
+ intermediateStreamToStreamSets.put(streamEdge, streamSet);
+ }
}
}
- }
-
- /**
- * Ensures that all streams in the supplied {@link StreamEdgeSet} agree in partition count.
- * This may include setting partition counts of intermediate streams in this set that do not
- * have their partition counts set.
- */
- private static void validateAndAssignStreamEdgeSetPartitions(StreamEdgeSet streamEdgeSet) {
- Set<StreamEdge> streamEdges = streamEdgeSet.getStreamEdges();
- StreamEdge firstStreamEdgeWithSetPartitions =
- streamEdges.stream()
- .filter(streamEdge -> streamEdge.getPartitionCount() != StreamEdge.PARTITIONS_UNKNOWN)
- .findFirst()
- .orElse(null);
-
- // This group consists exclusively of intermediate streams with unknown partition counts.
- // We cannot do any validation/computation of partition counts of such streams right here,
- // but they are tackled later in the ExecutionPlanner.
- if (firstStreamEdgeWithSetPartitions == null) {
- return;
- }
- // Make sure all other stream edges in this group have the same partition count.
- int partitions = firstStreamEdgeWithSetPartitions.getPartitionCount();
- for (StreamEdge streamEdge : streamEdges) {
- int streamPartitions = streamEdge.getPartitionCount();
- if (streamPartitions == StreamEdge.PARTITIONS_UNKNOWN) {
- streamEdge.setPartitionCount(partitions);
- log.info("Inferred the partition count {} for the join operator {} from {}.",
- new Object[] {partitions, streamEdgeSet.getSetId(), firstStreamEdgeWithSetPartitions.getName()});
- } else if (streamPartitions != partitions) {
- throw new SamzaException(String.format(
- "Unable to resolve input partitions of stream %s for the join %s. Expected: %d, Actual: %d",
- streamEdge.getName(), streamEdgeSet.getSetId(), partitions, streamPartitions));
+ Set<StreamSet> streamSets = new HashSet<>(joinedStreamSets);
+ Set<StreamSet> processedStreamSets = new HashSet<>();
+
+ while (!streamSets.isEmpty()) {
+ // Retrieve and remove one stream set
+ StreamSet streamSet = streamSets.iterator().next();
+ streamSets.remove(streamSet);
+
+ // Find any stream with set partitions in this set
+ Optional<StreamEdge> streamWithSetPartitions =
+ streamSet.getStreamEdges().stream()
+ .filter(streamEdge -> streamEdge.getPartitionCount() != StreamEdge.PARTITIONS_UNKNOWN)
+ .findAny();
+
+ if (streamWithSetPartitions.isPresent()) {
+ // Mark this stream-set as processed since we won't need to re-examine it ever again.
+ // It is important that we do this first before processing any intermediate streams
+ // that may be in this stream-set.
+ processedStreamSets.add(streamSet);
+
+ // Set partitions of all intermediate streams in this set (if any)
+ int partitions = streamWithSetPartitions.get().getPartitionCount();
+ for (StreamEdge streamEdge : streamSet.getStreamEdges()) {
+ if (streamEdge.getPartitionCount() == StreamEdge.PARTITIONS_UNKNOWN) {
+ streamEdge.setPartitionCount(partitions);
+ // Add all unprocessed stream-sets in which this intermediate stream appears
+ Collection<StreamSet> streamSetsIncludingIntStream = intermediateStreamToStreamSets.get(streamEdge);
+ streamSetsIncludingIntStream.stream()
+ .filter(s -> !processedStreamSets.contains(s))
+ .forEach(streamSets::add);
+ }
+ }
}
}
}
- /* package private */ static int maxPartitions(Collection<StreamEdge> edges) {
- return edges.stream().mapToInt(StreamEdge::getPartitionCount).max().orElse(StreamEdge.PARTITIONS_UNKNOWN);
- }
-
/**
- * Represents a set of {@link StreamEdge}s.
+ * Ensures all intermediate streams have been assigned partition counts.
*/
- /* package private */ static class StreamEdgeSet {
-
- /**
- * Indicates whether all stream edges in this group have their partition counts assigned.
- */
- public enum StreamEdgeSetCategory {
- /**
- * All stream edges in this group have their partition counts assigned.
- */
- ALL_PARTITION_COUNT_SET(0),
-
- /**
- * Only some stream edges in this group have their partition counts assigned.
- */
- SOME_PARTITION_COUNT_SET(1),
-
- /**
- * No stream edge in this group is assigned a partition count.
- */
- NO_PARTITION_COUNT_SET(2);
-
-
- private final int sortOrder;
-
- StreamEdgeSetCategory(int sortOrder) {
- this.sortOrder = sortOrder;
- }
-
- public int getSortOrder() {
- return sortOrder;
+ private static void validateIntermediateStreamPartitions(JobGraph jobGraph) {
+ for (StreamEdge edge : jobGraph.getIntermediateStreamEdges()) {
+ if (edge.getPartitionCount() <= 0) {
+ throw new SamzaException(String.format("Failed to assign valid partition count to Stream %s", edge.getName()));
}
}
+ }
- private final String setId;
- private final Set<StreamEdge> streamEdges;
- private final StreamEdgeSetCategory category;
-
- StreamEdgeSet(String setId, Set<StreamEdge> streamEdges, StreamEdgeSetCategory category) {
- this.setId = setId;
- this.streamEdges = streamEdges;
- this.category = category;
- }
-
- Set<StreamEdge> getStreamEdges() {
- return streamEdges;
- }
-
- String getSetId() {
- return setId;
- }
-
- StreamEdgeSetCategory getCategory() {
- return category;
- }
+ /* package private */ static int maxPartitions(Collection<StreamEdge> edges) {
+ return edges.stream().mapToInt(StreamEdge::getPartitionCount).max().orElse(StreamEdge.PARTITIONS_UNKNOWN);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/d2c9e816/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
index d975188..f43b24e 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
@@ -30,7 +30,6 @@ import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
-
import org.apache.samza.application.ApplicationDescriptor;
import org.apache.samza.application.ApplicationDescriptorImpl;
import org.apache.samza.config.ApplicationConfig;
@@ -58,6 +57,7 @@ import org.slf4j.LoggerFactory;
private final Set<StreamEdge> inputStreams = new HashSet<>();
private final Set<StreamEdge> outputStreams = new HashSet<>();
private final Set<StreamEdge> intermediateStreams = new HashSet<>();
+ private final Set<StreamEdge> sideInputStreams = new HashSet<>();
private final Set<TableSpec> tables = new HashSet<>();
private final Config config;
private final JobGraphJsonGenerator jsonGenerator;
@@ -156,6 +156,15 @@ import org.slf4j.LoggerFactory;
}
/**
+ * Add a side-input stream to graph
+ * @param streamSpec side-input stream
+ */
+ void addSideInputStream(StreamSpec streamSpec) {
+ StreamEdge edge = getOrCreateStreamEdge(streamSpec, false);
+ sideInputStreams.add(edge);
+ }
+
+ /**
* Get the {@link JobNode}. Create one if it does not exist.
* @param jobName name of the job
* @param jobId id of the job
@@ -176,6 +185,14 @@ import org.slf4j.LoggerFactory;
}
/**
+ * Returns the {@link ApplicationDescriptorImpl} of this graph.
+ * @return Application descriptor implementation
+ */
+ ApplicationDescriptorImpl<? extends ApplicationDescriptor> getApplicationDescriptorImpl() {
+ return appDesc;
+ }
+
+ /**
* Get the {@link StreamEdge} for {@code streamId}.
*
* @param streamId the streamId for the {@link StreamEdge}
@@ -203,7 +220,15 @@ import org.slf4j.LoggerFactory;
}
/**
- * Return the output streams in the graph
+ * Returns the side-input streams in the graph
+ * @return unmodifiable set of {@link StreamEdge}
+ */
+ Set<StreamEdge> getSideInputStreams() {
+ return Collections.unmodifiableSet(sideInputStreams);
+ }
+
+ /**
+ * Returns the output streams in the graph
* @return unmodifiable set of {@link StreamEdge}
*/
Set<StreamEdge> getOutputStreams() {
@@ -211,7 +236,7 @@ import org.slf4j.LoggerFactory;
}
/**
- * Return the tables in the graph
+ * Returns the tables in the graph
* @return unmodifiable set of {@link TableSpec}
*/
Set<TableSpec> getTables() {
@@ -219,7 +244,7 @@ import org.slf4j.LoggerFactory;
}
/**
- * Return the intermediate streams in the graph
+ * Returns the intermediate streams in the graph
* @return unmodifiable set of {@link StreamEdge}
*/
Set<StreamEdge> getIntermediateStreamEdges() {
@@ -293,6 +318,7 @@ import org.slf4j.LoggerFactory;
private void validateInternalStreams() {
Set<StreamEdge> internalEdges = new HashSet<>(edges.values());
internalEdges.removeAll(inputStreams);
+ internalEdges.removeAll(sideInputStreams);
internalEdges.removeAll(outputStreams);
internalEdges.forEach(edge -> {
http://git-wip-us.apache.org/repos/asf/samza/blob/d2c9e816/samza-core/src/main/java/org/apache/samza/execution/OperatorSpecGraphAnalyzer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/OperatorSpecGraphAnalyzer.java b/samza-core/src/main/java/org/apache/samza/execution/OperatorSpecGraphAnalyzer.java
index ca91214..123244b 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/OperatorSpecGraphAnalyzer.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/OperatorSpecGraphAnalyzer.java
@@ -21,6 +21,7 @@ package org.apache.samza.execution;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
@@ -30,6 +31,9 @@ import java.util.function.Function;
import org.apache.samza.operators.spec.InputOperatorSpec;
import org.apache.samza.operators.spec.JoinOperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.SendToTableOperatorSpec;
+import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
+import org.apache.samza.table.TableSpec;
/**
@@ -39,27 +43,40 @@ import org.apache.samza.operators.spec.OperatorSpec;
/* package private */ class OperatorSpecGraphAnalyzer {
/**
- * Returns a grouping of {@link InputOperatorSpec}s by the joins, i.e. {@link JoinOperatorSpec}s, they participate in.
+ * Returns a grouping of {@link InputOperatorSpec}s by the joins, i.e. {@link JoinOperatorSpec}s and
+ * {@link StreamTableJoinOperatorSpec}s, they participate in.
+ *
+ * The key of the returned Multimap is of type {@link OperatorSpec} due to the lack of a stricter
+ * base type for {@link JoinOperatorSpec} and {@link StreamTableJoinOperatorSpec}. However, every key
+ * object is guaranteed to be an instance of one of these two types.
*/
- public static Multimap<JoinOperatorSpec, InputOperatorSpec> getJoinToInputOperatorSpecs(
- Collection<InputOperatorSpec> inputOperatorSpecs) {
+ public static Multimap<OperatorSpec, InputOperatorSpec> getJoinToInputOperatorSpecs(
+ Collection<InputOperatorSpec> inputOpSpecs) {
- Multimap<JoinOperatorSpec, InputOperatorSpec> joinOpSpecToInputOpSpecs = HashMultimap.create();
+ Multimap<OperatorSpec, InputOperatorSpec> joinToInputOpSpecs = HashMultimap.create();
+
+ // Create a getNextOpSpecs() function that emulates connections between every SendToTableOperatorSpec
+ // — which are terminal OperatorSpecs — and all StreamTableJoinOperatorSpecs referencing the same TableSpec.
+ //
+ // This is necessary to support Stream-Table Join scenarios because it allows us to associate streams behind
+ // SendToTableOperatorSpecs with streams participating in Stream-Table Joins, a connection that would not be
+ // easy to make otherwise since SendToTableOperatorSpecs are terminal operator specs.
+ Function<OperatorSpec, Iterable<OperatorSpec>> getNextOpSpecs = getCustomGetNextOpSpecs(inputOpSpecs);
// Traverse graph starting from every input operator spec, observing connectivity between input operator specs
- // and Join operator specs.
- for (InputOperatorSpec inputOpSpec : inputOperatorSpecs) {
- // Observe all join operator specs reachable from this input operator spec.
- JoinOperatorSpecVisitor joinOperatorSpecVisitor = new JoinOperatorSpecVisitor();
- traverse(inputOpSpec, joinOperatorSpecVisitor, opSpec -> opSpec.getRegisteredOperatorSpecs());
-
- // Associate every encountered join operator spec with this input operator spec.
- for (JoinOperatorSpec joinOpSpec : joinOperatorSpecVisitor.getJoinOperatorSpecs()) {
- joinOpSpecToInputOpSpecs.put(joinOpSpec, inputOpSpec);
+ // and join-related operator specs.
+ for (InputOperatorSpec inputOpSpec : inputOpSpecs) {
+ // Observe all join-related operator specs reachable from this input operator spec.
+ JoinVisitor joinVisitor = new JoinVisitor();
+ traverse(inputOpSpec, joinVisitor, getNextOpSpecs);
+
+ // Associate every encountered join-related operator spec with this input operator spec.
+ for (OperatorSpec joinOpSpec : joinVisitor.getJoins()) {
+ joinToInputOpSpecs.put(joinOpSpec, inputOpSpec);
}
}
- return joinOpSpecToInputOpSpecs;
+ return joinToInputOpSpecs;
}
/**
@@ -67,7 +84,7 @@ import org.apache.samza.operators.spec.OperatorSpec;
* {@link OperatorSpec}, and using {@code getNextOpSpecs} to determine the set of {@link OperatorSpec}s to visit next.
*/
private static void traverse(OperatorSpec startOpSpec, Consumer<OperatorSpec> visitor,
- Function<OperatorSpec, Collection<OperatorSpec>> getNextOpSpecs) {
+ Function<OperatorSpec, Iterable<OperatorSpec>> getNextOpSpecs) {
visitor.accept(startOpSpec);
for (OperatorSpec nextOpSpec : getNextOpSpecs.apply(startOpSpec)) {
traverse(nextOpSpec, visitor, getNextOpSpecs);
@@ -75,20 +92,93 @@ import org.apache.samza.operators.spec.OperatorSpec;
}
/**
- * An visitor that records all {@link JoinOperatorSpec}s encountered in the graph of {@link OperatorSpec}s
+ * Creates a function that retrieves the next {@link OperatorSpec}s of any given {@link OperatorSpec} in the graph
+ * reachable from the specified {@code inputOpSpecs}.
+ *
+ * Calling the returned function with any {@link SendToTableOperatorSpec} will return a collection of all
+ * {@link StreamTableJoinOperatorSpec}s that reference the same {@link TableSpec} as the specified
+ * {@link SendToTableOperatorSpec}, as if they were actually connected.
*/
- private static class JoinOperatorSpecVisitor implements Consumer<OperatorSpec> {
- private Set<JoinOperatorSpec> joinOpSpecs = new HashSet<>();
+ private static Function<OperatorSpec, Iterable<OperatorSpec>> getCustomGetNextOpSpecs(
+ Iterable<InputOperatorSpec> inputOpSpecs) {
+
+ // Traverse the graph reachable from inputOpSpecs to create a mapping between every SendToTableOperatorSpec
+ // and all StreamTableJoinOperatorSpecs referencing the same TableSpec.
+ TableJoinVisitor tableJoinVisitor = new TableJoinVisitor();
+ for (InputOperatorSpec inputOpSpec : inputOpSpecs) {
+ traverse(inputOpSpec, tableJoinVisitor, opSpec -> opSpec.getRegisteredOperatorSpecs());
+ }
+
+ Multimap<SendToTableOperatorSpec, StreamTableJoinOperatorSpec> sendToTableOpSpecToStreamTableJoinOpSpecs =
+ tableJoinVisitor.getSendToTableOpSpecToStreamTableJoinOpSpecs();
+
+ return operatorSpec -> {
+ // If this is a SendToTableOperatorSpec, return all StreamTableJoinSpecs referencing the same TableSpec.
+ // For all other types of operator specs, return the next registered operator specs.
+ if (operatorSpec instanceof SendToTableOperatorSpec) {
+ SendToTableOperatorSpec sendToTableOperatorSpec = (SendToTableOperatorSpec) operatorSpec;
+ return Collections.unmodifiableCollection(sendToTableOpSpecToStreamTableJoinOpSpecs.get(sendToTableOperatorSpec));
+ }
+
+ return operatorSpec.getRegisteredOperatorSpecs();
+ };
+ }
+
+ /**
+ * An {@link OperatorSpec} visitor that records all {@link JoinOperatorSpec}s and {@link StreamTableJoinOperatorSpec}s
+ * encountered in the graph.
+ */
+ private static class JoinVisitor implements Consumer<OperatorSpec> {
+ private Set<OperatorSpec> joinOpSpecs = new HashSet<>();
@Override
- public void accept(OperatorSpec operatorSpec) {
- if (operatorSpec instanceof JoinOperatorSpec) {
- joinOpSpecs.add((JoinOperatorSpec) operatorSpec);
+ public void accept(OperatorSpec opSpec) {
+ if (opSpec instanceof JoinOperatorSpec || opSpec instanceof StreamTableJoinOperatorSpec) {
+ joinOpSpecs.add(opSpec);
}
}
- public Set<JoinOperatorSpec> getJoinOperatorSpecs() {
+ public Set<OperatorSpec> getJoins() {
return Collections.unmodifiableSet(joinOpSpecs);
}
}
+
+ /**
+ * An {@link OperatorSpec} visitor that records associations between every {@link SendToTableOperatorSpec}
+ * and all {@link StreamTableJoinOperatorSpec}s that reference the same {@link TableSpec}.
+ */
+ private static class TableJoinVisitor implements Consumer<OperatorSpec> {
+ private final Multimap<TableSpec, SendToTableOperatorSpec> tableSpecToSendToTableOpSpecs = HashMultimap.create();
+ private final Multimap<TableSpec, StreamTableJoinOperatorSpec> tableSpecToStreamTableJoinOpSpecs = HashMultimap.create();
+
+ @Override
+ public void accept(OperatorSpec opSpec) {
+ // Record all SendToTableOperatorSpecs, StreamTableJoinOperatorSpecs, and their corresponding TableSpecs.
+ if (opSpec instanceof SendToTableOperatorSpec) {
+ SendToTableOperatorSpec sendToTableOperatorSpec = (SendToTableOperatorSpec) opSpec;
+ tableSpecToSendToTableOpSpecs.put(sendToTableOperatorSpec.getTableSpec(), sendToTableOperatorSpec);
+ } else if (opSpec instanceof StreamTableJoinOperatorSpec) {
+ StreamTableJoinOperatorSpec streamTableJoinOpSpec = (StreamTableJoinOperatorSpec) opSpec;
+ tableSpecToStreamTableJoinOpSpecs.put(streamTableJoinOpSpec.getTableSpec(), streamTableJoinOpSpec);
+ }
+ }
+
+ public Multimap<SendToTableOperatorSpec, StreamTableJoinOperatorSpec> getSendToTableOpSpecToStreamTableJoinOpSpecs() {
+ Multimap<SendToTableOperatorSpec, StreamTableJoinOperatorSpec> sendToTableOpSpecToStreamTableJoinOpSpecs =
+ HashMultimap.create();
+
+ // Map every SendToTableOperatorSpec to all StreamTableJoinOperatorSpecs referencing the same TableSpec.
+ for (TableSpec tableSpec : tableSpecToSendToTableOpSpecs.keySet()) {
+ Collection<SendToTableOperatorSpec> sendToTableOpSpecs = tableSpecToSendToTableOpSpecs.get(tableSpec);
+ Collection<StreamTableJoinOperatorSpec> streamTableJoinOpSpecs =
+ tableSpecToStreamTableJoinOpSpecs.get(tableSpec);
+
+ for (SendToTableOperatorSpec sendToTableOpSpec : sendToTableOpSpecs) {
+ sendToTableOpSpecToStreamTableJoinOpSpecs.putAll(sendToTableOpSpec, streamTableJoinOpSpecs);
+ }
+ }
+
+ return Multimaps.unmodifiableMultimap(sendToTableOpSpecToStreamTableJoinOpSpecs);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/d2c9e816/samza-core/src/test/java/org/apache/samza/execution/ExecutionPlannerTestBase.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/ExecutionPlannerTestBase.java b/samza-core/src/test/java/org/apache/samza/execution/ExecutionPlannerTestBase.java
index d172005..6308589 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/ExecutionPlannerTestBase.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/ExecutionPlannerTestBase.java
@@ -102,7 +102,7 @@ class ExecutionPlannerTestBase {
void configureJobNode(ApplicationDescriptorImpl mockStreamAppDesc) {
JobGraph jobGraph = new ExecutionPlanner(mockConfig, mock(StreamManager.class))
- .createJobGraph(mockConfig, mockStreamAppDesc);
+ .createJobGraph(mockStreamAppDesc);
mockJobNode = spy(jobGraph.getJobNodes().get(0));
}
http://git-wip-us.apache.org/repos/asf/samza/blob/d2c9e816/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index 213efde..4cfcfd2 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -21,6 +21,7 @@ package org.apache.samza.execution;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -40,6 +41,7 @@ import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.TaskConfig;
+import org.apache.samza.operators.BaseTableDescriptor;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
@@ -51,15 +53,19 @@ import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.operators.functions.StreamTableJoinFunction;
import org.apache.samza.operators.windows.Windows;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.serializers.Serde;
+import org.apache.samza.storage.SideInputsProcessor;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemAdmins;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.table.Table;
+import org.apache.samza.table.TableSpec;
import org.apache.samza.testUtils.StreamTestUtils;
import org.junit.Before;
import org.junit.Test;
@@ -145,7 +151,7 @@ public class TestExecutionPlanner {
}, config);
}
- private StreamApplicationDescriptorImpl createStreamGraphWithJoin() {
+ private StreamApplicationDescriptorImpl createStreamGraphWithStreamStreamJoin() {
/**
* the graph looks like the following. number of partitions in parentheses. quotes indicate expected value.
@@ -175,17 +181,38 @@ public class TestExecutionPlanner {
messageStream1
.join(messageStream2,
- (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
- mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2), "j1")
+ mock(JoinFunction.class), mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2), "j1")
.sendTo(output1);
messageStream3
.join(messageStream2,
- (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
- mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1), "j2")
+ mock(JoinFunction.class), mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1), "j2")
.sendTo(output2);
}, config);
}
+ private StreamApplicationDescriptorImpl createStreamGraphWithInvalidStreamStreamJoin() {
+ /**
+ * Creates the following stream-stream join which is invalid due to partition count disagreement
+ * between the 2 input streams.
+ *
+ * input1 (64) --
+ * |
+ * join -> output1 (8)
+ * |
+ * input3 (32) --
+ */
+ return new StreamApplicationDescriptorImpl(appDesc -> {
+ MessageStream<KV<Object, Object>> messageStream1 = appDesc.getInputStream(input1Descriptor);
+ MessageStream<KV<Object, Object>> messageStream3 = appDesc.getInputStream(input3Descriptor);
+ OutputStream<KV<Object, Object>> output1 = appDesc.getOutputStream(output1Descriptor);
+
+ messageStream1
+ .join(messageStream3,
+ mock(JoinFunction.class), mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2), "j1")
+ .sendTo(output1);
+ }, config);
+ }
+
private StreamApplicationDescriptorImpl createStreamGraphWithJoinAndWindow() {
return new StreamApplicationDescriptorImpl(appDesc -> {
@@ -210,33 +237,180 @@ public class TestExecutionPlanner {
.filter(m -> true)
.window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(16), (Serde<KV<Object, Object>>) mock(Serde.class), (Serde<KV<Object, Object>>) mock(Serde.class)), "w2");
- messageStream1.join(messageStream2, (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
- mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(1600), "j1").sendTo(output1);
- messageStream3.join(messageStream2, (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
- mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(100), "j2").sendTo(output2);
- messageStream3.join(messageStream2, (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
- mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(252), "j3").sendTo(output2);
+ messageStream1.join(messageStream2, mock(JoinFunction.class), mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(1600), "j1").sendTo(output1);
+ messageStream3.join(messageStream2, mock(JoinFunction.class), mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(100), "j2").sendTo(output2);
+ messageStream3.join(messageStream2, mock(JoinFunction.class), mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(252), "j3").sendTo(output2);
}, config);
}
- private StreamApplicationDescriptorImpl createStreamGraphWithInvalidJoin() {
+ private StreamApplicationDescriptorImpl createStreamGraphWithStreamTableJoin() {
/**
- * input1 (64) --
- * |
- * join -> output1 (8)
- * |
- * input3 (32) --
+ * Example stream-table join app. Expected partition counts of intermediate streams introduced
+ * by partitionBy operations are enclosed in quotes.
+ *
+ * input2 (16) -> partitionBy ("32") -> send-to-table t
+ *
+ * join-table t —————
+ * | |
+ * input1 (64) -> partitionBy ("32") _| |
+ * join -> output1 (8)
+ * |
+ * input3 (32) ——————
+ *
*/
return new StreamApplicationDescriptorImpl(appDesc -> {
MessageStream<KV<Object, Object>> messageStream1 = appDesc.getInputStream(input1Descriptor);
+ MessageStream<KV<Object, Object>> messageStream2 = appDesc.getInputStream(input2Descriptor);
MessageStream<KV<Object, Object>> messageStream3 = appDesc.getInputStream(input3Descriptor);
OutputStream<KV<Object, Object>> output1 = appDesc.getOutputStream(output1Descriptor);
+ TableDescriptor tableDescriptor = new TestTableDescriptor("table-id");
+ Table table = appDesc.getTable(tableDescriptor);
+
+ messageStream2
+ .partitionBy(m -> m.key, m -> m.value, mock(KVSerde.class), "p1")
+ .sendTo(table);
+
messageStream1
- .join(messageStream3,
- (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
- mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2), "j1")
- .sendTo(output1);
+ .partitionBy(m -> m.key, m -> m.value, mock(KVSerde.class), "p2")
+ .join(table, mock(StreamTableJoinFunction.class))
+ .join(messageStream3,
+ mock(JoinFunction.class), mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1), "j2")
+ .sendTo(output1);
+ }, config);
+ }
+
+ private StreamApplicationDescriptorImpl createStreamGraphWithComplexStreamStreamJoin() {
+ /**
+ * Example stream-stream join app. Expected partition counts of intermediate streams introduced
+ * by partitionBy operations are enclosed in quotes.
+ *
+ * input1 (64) ________________________
+ * |
+ * join ————— output1 (8)
+ * |
+ * input2 (16) -> partitionBy ("64") --|
+ * |
+ * join ————— output1 (8)
+ * |
+ * input3 (32) -> partitionBy ("64") --|
+ * |
+ * join ————— output1 (8)
+ * |
+ * input4 (512) -> partitionBy ("64") __|
+ *
+ *
+ */
+ return new StreamApplicationDescriptorImpl(appDesc -> {
+ MessageStream<KV<Object, Object>> messageStream1 = appDesc.getInputStream(input1Descriptor);
+
+ MessageStream<KV<Object, Object>> messageStream2 =
+ appDesc.getInputStream(input2Descriptor)
+ .partitionBy(m -> m.key, m -> m.value, mock(KVSerde.class), "p2");
+
+ MessageStream<KV<Object, Object>> messageStream3 =
+ appDesc.getInputStream(input3Descriptor)
+ .partitionBy(m -> m.key, m -> m.value, mock(KVSerde.class), "p3");
+
+ MessageStream<KV<Object, Object>> messageStream4 =
+ appDesc.getInputStream(input4Descriptor)
+ .partitionBy(m -> m.key, m -> m.value, mock(KVSerde.class), "p4");
+
+ OutputStream<KV<Object, Object>> output1 = appDesc.getOutputStream(output1Descriptor);
+
+ messageStream1
+ .join(messageStream2,
+ mock(JoinFunction.class), mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1), "j1")
+ .sendTo(output1);
+
+ messageStream3
+ .join(messageStream4,
+ mock(JoinFunction.class), mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1), "j2")
+ .sendTo(output1);
+
+ messageStream2
+ .join(messageStream3,
+ mock(JoinFunction.class), mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1), "j3")
+ .sendTo(output1);
+ }, config);
+ }
+
+ private StreamApplicationDescriptorImpl createStreamGraphWithInvalidStreamTableJoin() {
+ /**
+ * Example stream-table join that is invalid due to disagreement in partition count
+ * between the 2 input streams.
+ *
+ * input1 (64) -> send-to-table t
+ *
+ * join-table t -> output1 (8)
+ * |
+ * input2 (16) —————————
+ *
+ */
+ return new StreamApplicationDescriptorImpl(appDesc -> {
+ MessageStream<KV<Object, Object>> messageStream1 = appDesc.getInputStream(input1Descriptor);
+ MessageStream<KV<Object, Object>> messageStream2 = appDesc.getInputStream(input2Descriptor);
+ OutputStream<KV<Object, Object>> output1 = appDesc.getOutputStream(output1Descriptor);
+
+ TableDescriptor tableDescriptor = new TestTableDescriptor("table-id");
+ Table table = appDesc.getTable(tableDescriptor);
+
+ messageStream1.sendTo(table);
+
+ messageStream1
+ .join(table, mock(StreamTableJoinFunction.class))
+ .join(messageStream2,
+ mock(JoinFunction.class), mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1), "j2")
+ .sendTo(output1);
+ }, config);
+ }
+
+ private StreamApplicationDescriptorImpl createStreamGraphWithStreamTableJoinWithSideInputs() {
+ /**
+ * Example stream-table join where table t is configured with input1 (64) as a side-input stream.
+ *
+ * join-table t -> output1 (8)
+ * |
+ * input2 (16) -> partitionBy ("64") __|
+ *
+ */
+ return new StreamApplicationDescriptorImpl(appDesc -> {
+ MessageStream<KV<Object, Object>> messageStream2 = appDesc.getInputStream(input2Descriptor);
+ OutputStream<KV<Object, Object>> output1 = appDesc.getOutputStream(output1Descriptor);
+
+ TableDescriptor tableDescriptor = new TestTableDescriptor("table-id", Arrays.asList("input1"),
+ (message, store) -> Collections.emptyList());
+ Table table = appDesc.getTable(tableDescriptor);
+
+ messageStream2
+ .partitionBy(m -> m.key, m -> m.value, mock(KVSerde.class), "p1")
+ .join(table, mock(StreamTableJoinFunction.class))
+ .sendTo(output1);
+ }, config);
+ }
+
+ private StreamApplicationDescriptorImpl createStreamGraphWithInvalidStreamTableJoinWithSideInputs() {
+ /**
+ * Example stream-table join that is invalid due to disagreement in partition count between the
+ * stream behind table t and another joined stream. Table t is configured with input2 (16) as
+ * side-input stream.
+ *
+ * join-table t -> output1 (8)
+ * |
+ * input1 (64) —————————
+ *
+ */
+ return new StreamApplicationDescriptorImpl(appDesc -> {
+ MessageStream<KV<Object, Object>> messageStream1 = appDesc.getInputStream(input1Descriptor);
+ OutputStream<KV<Object, Object>> output1 = appDesc.getOutputStream(output1Descriptor);
+
+ TableDescriptor tableDescriptor = new TestTableDescriptor("table-id", Arrays.asList("input2"),
+ (message, store) -> Collections.emptyList());
+ Table table = appDesc.getTable(tableDescriptor);
+
+ messageStream1
+ .join(table, mock(StreamTableJoinFunction.class))
+ .sendTo(output1);
}, config);
}
@@ -307,9 +481,9 @@ public class TestExecutionPlanner {
@Test
public void testCreateProcessorGraph() {
ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
- StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithJoin();
+ StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithStreamStreamJoin();
- JobGraph jobGraph = planner.createJobGraph(graphSpec.getConfig(), graphSpec);
+ JobGraph jobGraph = planner.createJobGraph(graphSpec);
assertTrue(jobGraph.getInputStreams().size() == 3);
assertTrue(jobGraph.getOutputStreams().size() == 2);
assertTrue(jobGraph.getIntermediateStreams().size() == 2); // two streams generated by partitionBy
@@ -318,10 +492,10 @@ public class TestExecutionPlanner {
@Test
public void testFetchExistingStreamPartitions() {
ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
- StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithJoin();
- JobGraph jobGraph = planner.createJobGraph(graphSpec.getConfig(), graphSpec);
+ StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithStreamStreamJoin();
+ JobGraph jobGraph = planner.createJobGraph(graphSpec);
- ExecutionPlanner.setInputAndOutputStreamPartitionCount(jobGraph, streamManager);
+ planner.setInputAndOutputStreamPartitionCount(jobGraph);
assertTrue(jobGraph.getOrCreateStreamEdge(input1Spec).getPartitionCount() == 64);
assertTrue(jobGraph.getOrCreateStreamEdge(input2Spec).getPartitionCount() == 16);
assertTrue(jobGraph.getOrCreateStreamEdge(input3Spec).getPartitionCount() == 32);
@@ -336,24 +510,74 @@ public class TestExecutionPlanner {
@Test
public void testCalculateJoinInputPartitions() {
ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
- StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithJoin();
- JobGraph jobGraph = planner.createJobGraph(graphSpec.getConfig(), graphSpec);
+ StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithStreamStreamJoin();
+ JobGraph jobGraph = (JobGraph) planner.plan(graphSpec);
- ExecutionPlanner.setInputAndOutputStreamPartitionCount(jobGraph, streamManager);
- new IntermediateStreamManager(config, graphSpec).calculatePartitions(jobGraph);
+ // Partitions should be the same as input1
+ jobGraph.getIntermediateStreams().forEach(edge -> {
+ assertEquals(64, edge.getPartitionCount());
+ });
+ }
- // the partitions should be the same as input1
+ @Test
+ public void testCalculateOrderSensitiveJoinInputPartitions() {
+ // This test ensures that the ExecutionPlanner can handle groups of joined stream edges
+ // in the correct order. It creates an example stream-stream join application that has
+ // the following sets of joined streams (notice the order):
+ //
+ // a. e1 (64), e2` (?)
+ // b. e3` (?), e4` (?)
+ // c. e2` (?), e3` (?)
+ //
+ // A planner that naively processed these sets in the above order would fail to assign the
+ // partitions correctly, because set (b) contains no stream with a known partition count.
+ ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
+ StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithComplexStreamStreamJoin();
+ JobGraph jobGraph = (JobGraph) planner.plan(graphSpec);
+
+ // Partitions should be the same as input1
jobGraph.getIntermediateStreams().forEach(edge -> {
assertEquals(64, edge.getPartitionCount());
});
}
- @Test(expected = SamzaException.class)
- public void testRejectsInvalidJoin() {
+ @Test
+ public void testCalculateIntStreamPartitions() {
ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
- StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithInvalidJoin();
+ StreamApplicationDescriptorImpl graphSpec = createSimpleGraph();
- planner.plan(graphSpec);
+ JobGraph jobGraph = (JobGraph) planner.plan(graphSpec);
+
+ // Partitions should be the same as input1
+ jobGraph.getIntermediateStreams().forEach(edge -> {
+ assertEquals(64, edge.getPartitionCount()); // max of input1 and output1
+ });
+ }
+
+ @Test
+ public void testCalculateInStreamPartitionsBehindTables() {
+ ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
+ StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithStreamTableJoin();
+
+ JobGraph jobGraph = (JobGraph) planner.plan(graphSpec);
+
+ // Partitions should be the same as input3
+ jobGraph.getIntermediateStreams().forEach(edge -> {
+ assertEquals(32, edge.getPartitionCount());
+ });
+ }
+
+ @Test
+ public void testCalculateInStreamPartitionsBehindTablesWithSideInputs() {
+ ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
+ StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithStreamTableJoinWithSideInputs();
+
+ JobGraph jobGraph = (JobGraph) planner.plan(graphSpec);
+
+ // Partitions should be the same as input1
+ jobGraph.getIntermediateStreams().forEach(edge -> {
+ assertEquals(64, edge.getPartitionCount());
+ });
}
@Test
@@ -366,21 +590,65 @@ public class TestExecutionPlanner {
StreamApplicationDescriptorImpl graphSpec = createSimpleGraph();
JobGraph jobGraph = (JobGraph) planner.plan(graphSpec);
- // the partitions should be the same as input1
+ // Partitions should equal DEFAULT_PARTITIONS
jobGraph.getIntermediateStreams().forEach(edge -> {
assertTrue(edge.getPartitionCount() == DEFAULT_PARTITIONS);
});
}
@Test
+ public void testMaxPartitionLimit() {
+ int partitionLimit = IntermediateStreamManager.MAX_INFERRED_PARTITIONS;
+
+ ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
+ StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> {
+ MessageStream<KV<Object, Object>> input1 = appDesc.getInputStream(input4Descriptor);
+ OutputStream<KV<Object, Object>> output1 = appDesc.getOutputStream(output1Descriptor);
+ input1.partitionBy(m -> m.key, m -> m.value, mock(KVSerde.class), "p1").map(kv -> kv).sendTo(output1);
+ }, config);
+
+ JobGraph jobGraph = (JobGraph) planner.plan(graphSpec);
+
+ // Partitions should equal IntermediateStreamManager.MAX_INFERRED_PARTITIONS
+ jobGraph.getIntermediateStreams().forEach(edge -> {
+ assertEquals(partitionLimit, edge.getPartitionCount()); // max of input1 and output1
+ });
+ }
+
+ @Test(expected = SamzaException.class)
+ public void testRejectsInvalidStreamStreamJoin() {
+ ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
+ StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithInvalidStreamStreamJoin();
+
+ planner.plan(graphSpec);
+ }
+
+ @Test(expected = SamzaException.class)
+ public void testRejectsInvalidStreamTableJoin() {
+ ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
+ StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithInvalidStreamTableJoin();
+
+ planner.plan(graphSpec);
+ }
+
+ @Test(expected = SamzaException.class)
+ public void testRejectsInvalidStreamTableJoinWithSideInputs() {
+ ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
+ StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithInvalidStreamTableJoinWithSideInputs();
+
+ planner.plan(graphSpec);
+ }
+
+ @Test
public void testTriggerIntervalForJoins() {
Map<String, String> map = new HashMap<>(config);
map.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), String.valueOf(DEFAULT_PARTITIONS));
Config cfg = new MapConfig(map);
ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
- StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithJoin();
+ StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithStreamStreamJoin();
ExecutionPlan plan = planner.plan(graphSpec);
+
List<JobConfig> jobConfigs = plan.getJobConfigs();
for (JobConfig config : jobConfigs) {
System.out.println(config);
@@ -450,18 +718,6 @@ public class TestExecutionPlanner {
}
@Test
- public void testCalculateIntStreamPartitions() {
- ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
- StreamApplicationDescriptorImpl graphSpec = createSimpleGraph();
- JobGraph jobGraph = (JobGraph) planner.plan(graphSpec);
-
- // the partitions should be the same as input1
- jobGraph.getIntermediateStreams().forEach(edge -> {
- assertEquals(64, edge.getPartitionCount()); // max of input1 and output1
- });
- }
-
- @Test
public void testMaxPartition() {
Collection<StreamEdge> edges = new ArrayList<>();
StreamEdge edge = new StreamEdge(input1Spec, false, false, config);
@@ -481,25 +737,6 @@ public class TestExecutionPlanner {
}
@Test
- public void testMaxPartitionLimit() throws Exception {
- int partitionLimit = IntermediateStreamManager.MAX_INFERRED_PARTITIONS;
-
- ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
- StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> {
- MessageStream<KV<Object, Object>> input1 = appDesc.getInputStream(input4Descriptor);
- OutputStream<KV<Object, Object>> output1 = appDesc.getOutputStream(output1Descriptor);
- input1.partitionBy(m -> m.key, m -> m.value, mock(KVSerde.class), "p1").map(kv -> kv).sendTo(output1);
- }, config);
-
- JobGraph jobGraph = (JobGraph) planner.plan(graphSpec);
-
- // the partitions should be the same as input1
- jobGraph.getIntermediateStreams().forEach(edge -> {
- assertEquals(partitionLimit, edge.getPartitionCount()); // max of input1 and output1
- });
- }
-
- @Test
public void testCreateJobGraphForTaskApplication() {
TaskApplicationDescriptorImpl taskAppDesc = mock(TaskApplicationDescriptorImpl.class);
// add intermediate streams
@@ -537,7 +774,7 @@ public class TestExecutionPlanner {
systemDescriptors.forEach(sd -> systemStreamConfigs.putAll(sd.toConfig()));
ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
- JobGraph jobGraph = planner.createJobGraph(config, taskAppDesc);
+ JobGraph jobGraph = planner.createJobGraph(taskAppDesc);
assertEquals(1, jobGraph.getJobNodes().size());
assertTrue(jobGraph.getInputStreams().stream().map(edge -> edge.getName())
.filter(streamId -> inputDescriptors.containsKey(streamId)).collect(Collectors.toList()).isEmpty());
@@ -568,7 +805,7 @@ public class TestExecutionPlanner {
systemDescriptors.forEach(sd -> systemStreamConfigs.putAll(sd.toConfig()));
ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
- JobGraph jobGraph = planner.createJobGraph(config, taskAppDesc);
+ JobGraph jobGraph = planner.createJobGraph(taskAppDesc);
assertEquals(1, jobGraph.getJobNodes().size());
JobNode jobNode = jobGraph.getJobNodes().get(0);
assertEquals("test-app", jobNode.getJobName());
@@ -586,4 +823,26 @@ public class TestExecutionPlanner {
}
}
+
+ private static class TestTableDescriptor extends BaseTableDescriptor implements TableDescriptor {
+ private final List<String> sideInputs;
+ private final SideInputsProcessor sideInputsProcessor;
+
+ public TestTableDescriptor(String tableId) {
+ this(tableId, Collections.emptyList(), null);
+ }
+
+ public TestTableDescriptor(String tableId, List<String> sideInputs, SideInputsProcessor sideInputsProcessor) {
+ super(tableId);
+ this.sideInputs = sideInputs;
+ this.sideInputsProcessor = sideInputsProcessor;
+ }
+
+ @Override
+ public TableSpec getTableSpec() {
+ validate();
+ return new TableSpec(tableId, serde, "dummyTableProviderFactoryClassName",
+ Collections.emptyMap(), sideInputs, sideInputsProcessor);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/d2c9e816/samza-core/src/test/java/org/apache/samza/execution/TestIntermediateStreamManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestIntermediateStreamManager.java b/samza-core/src/test/java/org/apache/samza/execution/TestIntermediateStreamManager.java
deleted file mode 100644
index bc15709..0000000
--- a/samza-core/src/test/java/org/apache/samza/execution/TestIntermediateStreamManager.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.execution;
-
-import org.apache.samza.application.StreamApplicationDescriptorImpl;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-
-/**
- * Unit tests for {@link IntermediateStreamManager}
- */
-public class TestIntermediateStreamManager extends ExecutionPlannerTestBase {
-
- @Test
- public void testCalculateRepartitionJoinTopicPartitions() {
- mockStreamAppDesc = new StreamApplicationDescriptorImpl(getRepartitionJoinStreamApplication(), mockConfig);
- IntermediateStreamManager partitionPlanner = new IntermediateStreamManager(mockConfig, mockStreamAppDesc);
- JobGraph mockGraph = new ExecutionPlanner(mockConfig, mock(StreamManager.class))
- .createJobGraph(mockConfig, mockStreamAppDesc);
- // set the input stream partitions
- mockGraph.getInputStreams().forEach(inEdge -> {
- if (inEdge.getStreamSpec().getId().equals(input1Descriptor.getStreamId())) {
- inEdge.setPartitionCount(6);
- } else if (inEdge.getStreamSpec().getId().equals(input2Descriptor.getStreamId())) {
- inEdge.setPartitionCount(5);
- }
- });
- partitionPlanner.calculatePartitions(mockGraph);
- assertEquals(1, mockGraph.getIntermediateStreamEdges().size());
- assertEquals(5, mockGraph.getIntermediateStreamEdges().stream()
- .filter(inEdge -> inEdge.getStreamSpec().getId().equals(intermediateInputDescriptor.getStreamId()))
- .findFirst().get().getPartitionCount());
- }
-
- @Test
- public void testCalculateRepartitionIntermediateTopicPartitions() {
- mockStreamAppDesc = new StreamApplicationDescriptorImpl(getRepartitionOnlyStreamApplication(), mockConfig);
- IntermediateStreamManager partitionPlanner = new IntermediateStreamManager(mockConfig, mockStreamAppDesc);
- JobGraph mockGraph = new ExecutionPlanner(mockConfig, mock(StreamManager.class))
- .createJobGraph(mockConfig, mockStreamAppDesc);
- // set the input stream partitions
- mockGraph.getInputStreams().forEach(inEdge -> inEdge.setPartitionCount(7));
- partitionPlanner.calculatePartitions(mockGraph);
- assertEquals(1, mockGraph.getIntermediateStreamEdges().size());
- assertEquals(7, mockGraph.getIntermediateStreamEdges().stream()
- .filter(inEdge -> inEdge.getStreamSpec().getId().equals(intermediateInputDescriptor.getStreamId()))
- .findFirst().get().getPartitionCount());
- }
-
-}