You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by yi...@apache.org on 2022/08/09 01:55:51 UTC
[flink] branch master updated: [FLINK-28380][runtime] Produce one intermediate dataset for multiple consumer job vertices consuming the same data
This is an automated email from the ASF dual-hosted git repository.
yingjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 60bc87c0b83 [FLINK-28380][runtime] Produce one intermediate dataset for multiple consumer job vertices consuming the same data
60bc87c0b83 is described below
commit 60bc87c0b83149c4f19d7e54af2d967087a277fb
Author: kevin.cyj <ke...@alibaba-inc.com>
AuthorDate: Mon Jul 25 16:22:50 2022 +0800
[FLINK-28380][runtime] Produce one intermediate dataset for multiple consumer job vertices consuming the same data
This closes #20351.
---
.../io/network/partition/ResultPartitionType.java | 25 +++-
.../streaming/api/graph/NonChainedOutput.java | 146 +++++++++++++++++++++
.../flink/streaming/api/graph/StreamConfig.java | 43 +++---
.../api/graph/StreamingJobGraphGenerator.java | 141 +++++++++++++++-----
.../partitioner/CustomPartitionerWrapper.java | 20 +++
.../streaming/runtime/tasks/OperatorChain.java | 71 +++++-----
.../flink/streaming/runtime/tasks/StreamTask.java | 20 +--
.../api/graph/StreamingJobGraphGeneratorTest.java | 93 ++++++++++++-
.../ForwardForConsecutiveHashPartitionerTest.java | 26 ++--
.../ForwardForUnspecifiedPartitionerTest.java | 25 ++--
.../runtime/tasks/StreamConfigChainer.java | 69 +++++-----
.../tasks/StreamTaskMailboxTestHarnessBuilder.java | 25 +++-
.../runtime/tasks/StreamTaskTestHarness.java | 30 +++--
.../flink/streaming/util/MockStreamConfig.java | 29 ++--
.../runtime/partitioner/BinaryHashPartitioner.java | 20 +++
15 files changed, 588 insertions(+), 195 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
index c82e55a8321..830a2b30687 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
@@ -32,7 +32,7 @@ public enum ResultPartitionType {
* {@link #PIPELINED} partitions), but only released through the scheduler, when it determines
* that the partition is no longer needed.
*/
- BLOCKING(false, false, ConsumingConstraint.BLOCKING, ReleaseBy.SCHEDULER),
+ BLOCKING(true, false, false, ConsumingConstraint.BLOCKING, ReleaseBy.SCHEDULER),
/**
* BLOCKING_PERSISTENT partitions are similar to {@link #BLOCKING} partitions, but have a
@@ -45,7 +45,7 @@ public enum ResultPartitionType {
* scenarios, like when the TaskManager exits or when the TaskManager loses connection to
* JobManager / ResourceManager for too long.
*/
- BLOCKING_PERSISTENT(false, true, ConsumingConstraint.BLOCKING, ReleaseBy.SCHEDULER),
+ BLOCKING_PERSISTENT(true, false, true, ConsumingConstraint.BLOCKING, ReleaseBy.SCHEDULER),
/**
* A pipelined streaming data exchange. This is applicable to both bounded and unbounded
@@ -57,7 +57,7 @@ public enum ResultPartitionType {
* <p>This result partition type may keep an arbitrary amount of data in-flight, in contrast to
* the {@link #PIPELINED_BOUNDED} variant.
*/
- PIPELINED(false, false, ConsumingConstraint.MUST_BE_PIPELINED, ReleaseBy.UPSTREAM),
+ PIPELINED(false, false, false, ConsumingConstraint.MUST_BE_PIPELINED, ReleaseBy.UPSTREAM),
/**
* Pipelined partitions with a bounded (local) buffer pool.
@@ -70,7 +70,8 @@ public enum ResultPartitionType {
* <p>For batch jobs, it will be best to keep this unlimited ({@link #PIPELINED}) since there
* are no checkpoint barriers.
*/
- PIPELINED_BOUNDED(true, false, ConsumingConstraint.MUST_BE_PIPELINED, ReleaseBy.UPSTREAM),
+ PIPELINED_BOUNDED(
+ false, true, false, ConsumingConstraint.MUST_BE_PIPELINED, ReleaseBy.UPSTREAM),
/**
* Pipelined partitions with a bounded (local) buffer pool to support downstream task to
@@ -81,7 +82,8 @@ public enum ResultPartitionType {
* in that {@link #PIPELINED_APPROXIMATE} partition can be reconnected after down stream task
* fails.
*/
- PIPELINED_APPROXIMATE(true, false, ConsumingConstraint.CAN_BE_PIPELINED, ReleaseBy.UPSTREAM),
+ PIPELINED_APPROXIMATE(
+ false, true, false, ConsumingConstraint.CAN_BE_PIPELINED, ReleaseBy.UPSTREAM),
/**
* Hybrid partitions with a bounded (local) buffer pool to support downstream task to
@@ -89,7 +91,12 @@ public enum ResultPartitionType {
*
* <p>Hybrid partitions can be consumed any time, whether fully produced or not.
*/
- HYBRID(false, false, ConsumingConstraint.CAN_BE_PIPELINED, ReleaseBy.SCHEDULER);
+ HYBRID(false, false, false, ConsumingConstraint.CAN_BE_PIPELINED, ReleaseBy.SCHEDULER);
+
+ /**
+ * Whether this result partition can be consumed multiple times by multiple downstream consumers.
+ */
+ private final boolean isReconsumable;
/** Does this partition use a limited number of (network) buffers? */
private final boolean isBounded;
@@ -125,10 +132,12 @@ public enum ResultPartitionType {
/** Specifies the behaviour of an intermediate result partition at runtime. */
ResultPartitionType(
+ boolean isReconsumable,
boolean isBounded,
boolean isPersistent,
ConsumingConstraint consumingConstraint,
ReleaseBy releaseBy) {
+ this.isReconsumable = isReconsumable;
this.isBounded = isBounded;
this.isPersistent = isPersistent;
this.consumingConstraint = consumingConstraint;
@@ -200,4 +209,8 @@ public enum ResultPartitionType {
public boolean supportCompression() {
return isBlockingOrBlockingPersistentResultPartition() || this == HYBRID;
}
+
+ public boolean isReconsumable() {
+ return isReconsumable;
+ }
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/NonChainedOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/NonChainedOutput.java
new file mode 100644
index 00000000000..20d357c14e4
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/NonChainedOutput.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.util.OutputTag;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Used by the operator chain; represents a non-chained output of the corresponding stream operator.
+ */
+@Internal
+public class NonChainedOutput implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ /** Whether unaligned checkpoints are supported. */
+ private final boolean supportsUnalignedCheckpoints;
+
+ /** ID of the producer {@link StreamNode}. */
+ private final int sourceNodeId;
+
+ /** Parallelism of the consumer vertex. */
+ private final int consumerParallelism;
+
+ /** Max parallelism of the consumer vertex. */
+ private final int consumerMaxParallelism;
+
+ /** Buffer flush timeout of this output. */
+ private final long bufferTimeout;
+
+ /** ID of the produced intermediate dataset. */
+ private final IntermediateDataSetID dataSetId;
+
+ /** Whether this intermediate dataset is a persistent dataset or not. */
+ private final boolean isPersistentDataSet;
+
+ /** The side-output tag (if any). */
+ private final OutputTag<?> outputTag;
+
+ /** The corresponding data partitioner. */
+ private final StreamPartitioner<?> partitioner;
+
+ /** Target {@link ResultPartitionType}. */
+ private final ResultPartitionType partitionType;
+
+ public NonChainedOutput(
+ boolean supportsUnalignedCheckpoints,
+ int sourceNodeId,
+ int consumerParallelism,
+ int consumerMaxParallelism,
+ long bufferTimeout,
+ boolean isPersistentDataSet,
+ IntermediateDataSetID dataSetId,
+ OutputTag<?> outputTag,
+ StreamPartitioner<?> partitioner,
+ ResultPartitionType partitionType) {
+ this.supportsUnalignedCheckpoints = supportsUnalignedCheckpoints;
+ this.sourceNodeId = sourceNodeId;
+ this.consumerParallelism = consumerParallelism;
+ this.consumerMaxParallelism = consumerMaxParallelism;
+ this.bufferTimeout = bufferTimeout;
+ this.isPersistentDataSet = isPersistentDataSet;
+ this.dataSetId = dataSetId;
+ this.outputTag = outputTag;
+ this.partitioner = partitioner;
+ this.partitionType = partitionType;
+ }
+
+ public boolean supportsUnalignedCheckpoints() {
+ return supportsUnalignedCheckpoints;
+ }
+
+ public int getSourceNodeId() {
+ return sourceNodeId;
+ }
+
+ public int getConsumerParallelism() {
+ return consumerParallelism;
+ }
+
+ public int getConsumerMaxParallelism() {
+ return consumerMaxParallelism;
+ }
+
+ public long getBufferTimeout() {
+ return bufferTimeout;
+ }
+
+ public IntermediateDataSetID getDataSetId() {
+ return dataSetId;
+ }
+
+ public IntermediateDataSetID getPersistentDataSetId() {
+ return isPersistentDataSet ? dataSetId : null;
+ }
+
+ public OutputTag<?> getOutputTag() {
+ return outputTag;
+ }
+
+ public StreamPartitioner<?> getPartitioner() {
+ return partitioner;
+ }
+
+ public ResultPartitionType getPartitionType() {
+ return partitionType;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ NonChainedOutput output = (NonChainedOutput) o;
+ return Objects.equals(dataSetId, output.dataSetId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(dataSetId);
+ }
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 8c85b4343d2..57cb1bca1ba 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.util.CorruptConfigurationException;
import org.apache.flink.runtime.state.CheckpointStorage;
@@ -91,8 +92,8 @@ public class StreamConfig implements Serializable {
private static final String TYPE_SERIALIZER_OUT_1 = "typeSerializer_out";
private static final String TYPE_SERIALIZER_SIDEOUT_PREFIX = "typeSerializer_sideout_";
private static final String ITERATON_WAIT = "iterationWait";
- private static final String NONCHAINED_OUTPUTS = "nonChainedOutputs";
- private static final String EDGES_IN_ORDER = "edgesInOrder";
+ private static final String OP_NONCHAINED_OUTPUTS = "opNonChainedOutputs";
+ private static final String VERTEX_NONCHAINED_OUTPUTS = "vertexNonChainedOutputs";
private static final String IN_STREAM_EDGES = "inStreamEdges";
private static final String OPERATOR_NAME = "operatorName";
private static final String OPERATOR_ID = "operatorID";
@@ -434,15 +435,16 @@ public class StreamConfig implements Serializable {
return config.getInteger(NUMBER_OF_OUTPUTS, 0);
}
- public void setNonChainedOutputs(List<StreamEdge> outputvertexIDs) {
- toBeSerializedConfigObjects.put(NONCHAINED_OUTPUTS, outputvertexIDs);
+ /** Sets the operator level non-chained outputs. */
+ public void setOperatorNonChainedOutputs(List<NonChainedOutput> nonChainedOutputs) {
+ toBeSerializedConfigObjects.put(OP_NONCHAINED_OUTPUTS, nonChainedOutputs);
}
- public List<StreamEdge> getNonChainedOutputs(ClassLoader cl) {
+ public List<NonChainedOutput> getOperatorNonChainedOutputs(ClassLoader cl) {
try {
- List<StreamEdge> nonChainedOutputs =
- InstantiationUtil.readObjectFromConfig(this.config, NONCHAINED_OUTPUTS, cl);
- return nonChainedOutputs == null ? new ArrayList<StreamEdge>() : nonChainedOutputs;
+ List<NonChainedOutput> nonChainedOutputs =
+ InstantiationUtil.readObjectFromConfig(this.config, OP_NONCHAINED_OUTPUTS, cl);
+ return nonChainedOutputs == null ? new ArrayList<>() : nonChainedOutputs;
} catch (Exception e) {
throw new StreamTaskException("Could not instantiate non chained outputs.", e);
}
@@ -520,15 +522,20 @@ public class StreamConfig implements Serializable {
ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT, alignedCheckpointTimeout);
}
- public void setOutEdgesInOrder(List<StreamEdge> outEdgeList) {
- toBeSerializedConfigObjects.put(EDGES_IN_ORDER, outEdgeList);
+ /**
+ * Sets the job vertex level non-chained outputs. The given output list must be in the same order
+ * as {@link JobVertex#getProducedDataSets()}.
+ */
+ public void setVertexNonChainedOutputs(List<NonChainedOutput> nonChainedOutputs) {
+ toBeSerializedConfigObjects.put(VERTEX_NONCHAINED_OUTPUTS, nonChainedOutputs);
}
- public List<StreamEdge> getOutEdgesInOrder(ClassLoader cl) {
+ public List<NonChainedOutput> getVertexNonChainedOutputs(ClassLoader cl) {
try {
- List<StreamEdge> outEdgesInOrder =
- InstantiationUtil.readObjectFromConfig(this.config, EDGES_IN_ORDER, cl);
- return outEdgesInOrder == null ? new ArrayList<StreamEdge>() : outEdgesInOrder;
+ List<NonChainedOutput> nonChainedOutputs =
+ InstantiationUtil.readObjectFromConfig(
+ this.config, VERTEX_NONCHAINED_OUTPUTS, cl);
+ return nonChainedOutputs == null ? new ArrayList<>() : nonChainedOutputs;
} catch (Exception e) {
throw new StreamTaskException("Could not instantiate outputs in order.", e);
}
@@ -721,11 +728,11 @@ public class StreamConfig implements Serializable {
builder.append("=======================");
builder.append("\nNumber of non-chained inputs: ").append(getNumberOfNetworkInputs());
builder.append("\nNumber of non-chained outputs: ").append(getNumberOfOutputs());
- builder.append("\nOutput names: ").append(getNonChainedOutputs(cl));
+ builder.append("\nOutput names: ").append(getOperatorNonChainedOutputs(cl));
builder.append("\nPartitioning:");
- for (StreamEdge output : getNonChainedOutputs(cl)) {
- int outputname = output.getTargetId();
- builder.append("\n\t").append(outputname).append(": ").append(output.getPartitioner());
+ for (NonChainedOutput output : getOperatorNonChainedOutputs(cl)) {
+ String outputName = output.getDataSetId().toString();
+ builder.append("\n\t").append(outputName).append(": ").append(output.getPartitioner());
}
builder.append("\nChained subtasks: ").append(getChainedOutputs(cl));
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 4cf9ed7c990..e44ed175626 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -97,9 +97,11 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
+import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -176,6 +178,8 @@ public class StreamingJobGraphGenerator {
// Futures for the serialization of operator coordinators
private final List<CompletableFuture<Void>> coordinatorSerializationFutures = new ArrayList<>();
+ private final Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs;
+
private StreamingJobGraphGenerator(
StreamGraph streamGraph, @Nullable JobID jobID, Executor serializationExecutor) {
this.streamGraph = streamGraph;
@@ -192,6 +196,7 @@ public class StreamingJobGraphGenerator {
this.chainedInputOutputFormats = new HashMap<>();
this.physicalEdgesInOrder = new ArrayList<>();
this.serializationExecutor = Preconditions.checkNotNull(serializationExecutor);
+ this.opIntermediateOutputs = new HashMap<>();
jobGraph = new JobGraph(jobID, streamGraph.getJobName());
}
@@ -677,11 +682,15 @@ public class StreamingJobGraphGenerator {
config.setChainIndex(chainIndex);
config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
+ LinkedHashSet<NonChainedOutput> transitiveOutputs = new LinkedHashSet<>();
for (StreamEdge edge : transitiveOutEdges) {
- connect(startNodeId, edge);
+ NonChainedOutput output =
+ opIntermediateOutputs.get(edge.getSourceId()).get(edge);
+ transitiveOutputs.add(output);
+ connect(startNodeId, edge, output);
}
- config.setOutEdgesInOrder(transitiveOutEdges);
+ config.setVertexNonChainedOutputs(new ArrayList<>(transitiveOutputs));
config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
} else {
@@ -937,8 +946,10 @@ public class StreamingJobGraphGenerator {
config.setStreamOperatorFactory(vertex.getOperatorFactory());
- config.setNumberOfOutputs(nonChainableOutputs.size());
- config.setNonChainedOutputs(nonChainableOutputs);
+ List<NonChainedOutput> deduplicatedOutputs =
+ mayReuseNonChainedOutputs(vertexID, nonChainableOutputs);
+ config.setNumberOfOutputs(deduplicatedOutputs.size());
+ config.setOperatorNonChainedOutputs(deduplicatedOutputs);
config.setChainedOutputs(chainableOutputs);
config.setTimeCharacteristic(streamGraph.getTimeCharacteristic());
@@ -975,6 +986,75 @@ public class StreamingJobGraphGenerator {
vertexConfigs.put(vertexID, config);
}
+ private List<NonChainedOutput> mayReuseNonChainedOutputs(
+ int vertexId, List<StreamEdge> consumerEdges) {
+ if (consumerEdges.isEmpty()) {
+ return new ArrayList<>();
+ }
+ List<NonChainedOutput> outputs = new ArrayList<>(consumerEdges.size());
+ Map<StreamEdge, NonChainedOutput> outputsConsumedByEdge =
+ opIntermediateOutputs.computeIfAbsent(vertexId, ignored -> new HashMap<>());
+ for (StreamEdge consumerEdge : consumerEdges) {
+ checkState(vertexId == consumerEdge.getSourceId(), "Vertex id must be the same.");
+ int consumerParallelism =
+ streamGraph.getStreamNode(consumerEdge.getTargetId()).getParallelism();
+ int consumerMaxParallelism =
+ streamGraph.getStreamNode(consumerEdge.getTargetId()).getMaxParallelism();
+ StreamPartitioner<?> partitioner = consumerEdge.getPartitioner();
+ ResultPartitionType partitionType = getResultPartitionType(consumerEdge);
+ IntermediateDataSetID dataSetId = new IntermediateDataSetID();
+
+ boolean isPersistentDataSet =
+ isPersistentIntermediateDataset(partitionType, consumerEdge);
+ if (isPersistentDataSet) {
+ partitionType = ResultPartitionType.BLOCKING_PERSISTENT;
+ dataSetId = consumerEdge.getIntermediateDatasetIdToProduce();
+ }
+
+ NonChainedOutput output =
+ new NonChainedOutput(
+ consumerEdge.supportsUnalignedCheckpoints(),
+ consumerEdge.getSourceId(),
+ consumerParallelism,
+ consumerMaxParallelism,
+ consumerEdge.getBufferTimeout(),
+ isPersistentDataSet,
+ dataSetId,
+ consumerEdge.getOutputTag(),
+ partitioner,
+ partitionType);
+ if (!partitionType.isReconsumable()) {
+ outputs.add(output);
+ outputsConsumedByEdge.put(consumerEdge, output);
+ } else {
+ NonChainedOutput reusableOutput = null;
+ for (NonChainedOutput outputCandidate : outputsConsumedByEdge.values()) {
+ // an existing output can be reused if its partition type, partitioner, output tag, persistent
+ // dataset ID and consumer (max) parallelism all match; reusing outputs can improve performance
+ if (outputCandidate.getPartitionType().isReconsumable()
+ && consumerParallelism == outputCandidate.getConsumerParallelism()
+ && consumerMaxParallelism == outputCandidate.getConsumerMaxParallelism()
+ && outputCandidate.getPartitionType() == partitionType
+ && Objects.equals(
+ outputCandidate.getPersistentDataSetId(),
+ consumerEdge.getIntermediateDatasetIdToProduce())
+ && Objects.equals(
+ outputCandidate.getOutputTag(), consumerEdge.getOutputTag())
+ && Objects.equals(partitioner, outputCandidate.getPartitioner())) {
+ reusableOutput = outputCandidate;
+ outputsConsumedByEdge.put(consumerEdge, reusableOutput);
+ break;
+ }
+ }
+ if (reusableOutput == null) {
+ outputs.add(output);
+ outputsConsumedByEdge.put(consumerEdge, output);
+ }
+ }
+ }
+ return outputs;
+ }
+
private void tryConvertPartitionerForDynamicGraph(
List<StreamEdge> chainableOutputs, List<StreamEdge> nonChainableOutputs) {
@@ -1026,7 +1106,7 @@ public class StreamingJobGraphGenerator {
}
}
- private void connect(Integer headOfChain, StreamEdge edge) {
+ private void connect(Integer headOfChain, StreamEdge edge, NonChainedOutput output) {
physicalEdgesInOrder.add(edge);
@@ -1039,37 +1119,13 @@ public class StreamingJobGraphGenerator {
downStreamConfig.setNumberOfNetworkInputs(downStreamConfig.getNumberOfNetworkInputs() + 1);
- StreamPartitioner<?> partitioner = edge.getPartitioner();
-
- ResultPartitionType resultPartitionType;
- switch (edge.getExchangeMode()) {
- case PIPELINED:
- resultPartitionType = ResultPartitionType.PIPELINED_BOUNDED;
- break;
- case BATCH:
- resultPartitionType = ResultPartitionType.BLOCKING;
- break;
- case HYBRID:
- resultPartitionType = ResultPartitionType.HYBRID;
- break;
- case UNDEFINED:
- resultPartitionType = determineResultPartitionType(partitioner);
- break;
- default:
- throw new UnsupportedOperationException(
- "Data exchange mode " + edge.getExchangeMode() + " is not supported yet.");
- }
+ StreamPartitioner<?> partitioner = output.getPartitioner();
+ ResultPartitionType resultPartitionType = output.getPartitionType();
if (resultPartitionType == ResultPartitionType.HYBRID) {
hasHybridResultPartition = true;
}
- IntermediateDataSetID intermediateDataSetID = new IntermediateDataSetID();
- if (isPersistentIntermediateDataset(resultPartitionType, edge)) {
- resultPartitionType = ResultPartitionType.BLOCKING_PERSISTENT;
- intermediateDataSetID = edge.getIntermediateDatasetIdToProduce();
- }
-
checkBufferTimeout(resultPartitionType, edge);
JobEdge jobEdge;
@@ -1079,7 +1135,7 @@ public class StreamingJobGraphGenerator {
headVertex,
DistributionPattern.POINTWISE,
resultPartitionType,
- intermediateDataSetID,
+ opIntermediateOutputs.get(edge.getSourceId()).get(edge).getDataSetId(),
partitioner.isBroadcast());
} else {
jobEdge =
@@ -1087,7 +1143,7 @@ public class StreamingJobGraphGenerator {
headVertex,
DistributionPattern.ALL_TO_ALL,
resultPartitionType,
- intermediateDataSetID,
+ opIntermediateOutputs.get(edge.getSourceId()).get(edge).getDataSetId(),
partitioner.isBroadcast());
}
@@ -1125,7 +1181,24 @@ public class StreamingJobGraphGenerator {
}
}
- private ResultPartitionType determineResultPartitionType(StreamPartitioner<?> partitioner) {
+ private ResultPartitionType getResultPartitionType(StreamEdge edge) {
+ switch (edge.getExchangeMode()) {
+ case PIPELINED:
+ return ResultPartitionType.PIPELINED_BOUNDED;
+ case BATCH:
+ return ResultPartitionType.BLOCKING;
+ case HYBRID:
+ return ResultPartitionType.HYBRID;
+ case UNDEFINED:
+ return determineUndefinedResultPartitionType(edge.getPartitioner());
+ default:
+ throw new UnsupportedOperationException(
+ "Data exchange mode " + edge.getExchangeMode() + " is not supported yet.");
+ }
+ }
+
+ private ResultPartitionType determineUndefinedResultPartitionType(
+ StreamPartitioner<?> partitioner) {
switch (streamGraph.getGlobalStreamExchangeMode()) {
case ALL_EDGES_BLOCKING:
return ResultPartitionType.BLOCKING;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
index 2bf229e1c12..e3c5cb442a2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
@@ -26,6 +26,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.InstantiationUtil;
import java.io.IOException;
+import java.util.Objects;
/**
* Partitioner that selects the channel with a user defined partitioner function on a key.
@@ -76,6 +77,25 @@ public class CustomPartitionerWrapper<K, T> extends StreamPartitioner<T> {
}
}
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final CustomPartitionerWrapper<?, ?> that = (CustomPartitionerWrapper<?, ?>) o;
+ return numberOfChannels == that.numberOfChannels
+ && Objects.equals(partitioner, that.partitioner)
+ && Objects.equals(keySelector, that.keySelector);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), partitioner, keySelector);
+ }
+
@Override
public boolean isPointwise() {
return false;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index d5f94d42412..dc41173dd8e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterDelegate;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
@@ -40,6 +41,7 @@ import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.streaming.api.graph.NonChainedOutput;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
@@ -164,10 +166,11 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>>
// create the final output stream writers
// we iterate through all the out edges from this job vertex and create a stream output
- List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(userCodeClassloader);
- Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap =
- new HashMap<>(outEdgesInOrder.size());
- this.streamOutputs = new RecordWriterOutput<?>[outEdgesInOrder.size()];
+ List<NonChainedOutput> outputsInOrder =
+ configuration.getVertexNonChainedOutputs(userCodeClassloader);
+ Map<IntermediateDataSetID, RecordWriterOutput<?>> recordWriterOutputs =
+ new HashMap<>(outputsInOrder.size());
+ this.streamOutputs = new RecordWriterOutput<?>[outputsInOrder.size()];
this.finishedOnRestoreInput =
this.isTaskDeployedAsFinished()
? new FinishedOnRestoreInput(
@@ -178,11 +181,11 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>>
boolean success = false;
try {
createChainOutputs(
- outEdgesInOrder,
+ outputsInOrder,
recordWriterDelegate,
chainedConfigs,
containingTask,
- streamOutputMap);
+ recordWriterOutputs);
// we create the chain of operators and grab the collector that leads into the chain
List<StreamOperatorWrapper<?, ?>> allOpWrappers =
@@ -193,7 +196,7 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>>
configuration,
chainedConfigs,
userCodeClassloader,
- streamOutputMap,
+ recordWriterOutputs,
allOpWrappers,
containingTask.getMailboxExecutorFactory());
@@ -499,40 +502,41 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>>
// ------------------------------------------------------------------------
private void createChainOutputs(
- List<StreamEdge> outEdgesInOrder,
+ List<NonChainedOutput> outputsInOrder,
RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate,
Map<Integer, StreamConfig> chainedConfigs,
StreamTask<OUT, OP> containingTask,
- Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap) {
- for (int i = 0; i < outEdgesInOrder.size(); i++) {
- StreamEdge outEdge = outEdgesInOrder.get(i);
+ Map<IntermediateDataSetID, RecordWriterOutput<?>> recordWriterOutputs) {
+ for (int i = 0; i < outputsInOrder.size(); ++i) {
+ NonChainedOutput output = outputsInOrder.get(i);
- RecordWriterOutput<?> streamOutput =
+ RecordWriterOutput<?> recordWriterOutput =
createStreamOutput(
recordWriterDelegate.getRecordWriter(i),
- outEdge,
- chainedConfigs.get(outEdge.getSourceId()),
+ output,
+ chainedConfigs.get(output.getSourceNodeId()),
containingTask.getEnvironment());
- this.streamOutputs[i] = streamOutput;
- streamOutputMap.put(outEdge, streamOutput);
+ this.streamOutputs[i] = recordWriterOutput;
+ recordWriterOutputs.put(output.getDataSetId(), recordWriterOutput);
}
}
private RecordWriterOutput<OUT> createStreamOutput(
RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
- StreamEdge edge,
+ NonChainedOutput streamOutput,
StreamConfig upStreamConfig,
Environment taskEnvironment) {
- OutputTag sideOutputTag = edge.getOutputTag(); // OutputTag, return null if not sideOutput
+ OutputTag sideOutputTag =
+ streamOutput.getOutputTag(); // OutputTag, return null if not sideOutput
TypeSerializer outSerializer;
- if (edge.getOutputTag() != null) {
+ if (streamOutput.getOutputTag() != null) {
// side output
outSerializer =
upStreamConfig.getTypeSerializerSideOut(
- edge.getOutputTag(),
+ streamOutput.getOutputTag(),
taskEnvironment.getUserCodeClassLoader().asClassLoader());
} else {
// main output
@@ -546,7 +550,7 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>>
recordWriter,
outSerializer,
sideOutputTag,
- edge.supportsUnalignedCheckpoints()));
+ streamOutput.supportsUnalignedCheckpoints()));
}
@SuppressWarnings("rawtypes")
@@ -647,18 +651,19 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>>
StreamConfig operatorConfig,
Map<Integer, StreamConfig> chainedConfigs,
ClassLoader userCodeClassloader,
- Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
+ Map<IntermediateDataSetID, RecordWriterOutput<?>> recordWriterOutputs,
List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
MailboxExecutorFactory mailboxExecutorFactory) {
- List<Tuple2<WatermarkGaugeExposingOutput<StreamRecord<T>>, StreamEdge>> allOutputs =
- new ArrayList<>(4);
+ List<WatermarkGaugeExposingOutput<StreamRecord<T>>> allOutputs = new ArrayList<>(4);
// create collectors for the network outputs
- for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
+ for (NonChainedOutput streamOutput :
+ operatorConfig.getOperatorNonChainedOutputs(userCodeClassloader)) {
@SuppressWarnings("unchecked")
- RecordWriterOutput<T> output = (RecordWriterOutput<T>) streamOutputs.get(outputEdge);
+ RecordWriterOutput<T> recordWriterOutput =
+ (RecordWriterOutput<T>) recordWriterOutputs.get(streamOutput.getDataSetId());
- allOutputs.add(new Tuple2<>(output, outputEdge));
+ allOutputs.add(recordWriterOutput);
}
// Create collectors for the chained outputs
@@ -672,22 +677,22 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>>
chainedOpConfig,
chainedConfigs,
userCodeClassloader,
- streamOutputs,
+ recordWriterOutputs,
allOperatorWrappers,
outputEdge.getOutputTag(),
mailboxExecutorFactory);
- allOutputs.add(new Tuple2<>(output, outputEdge));
+ allOutputs.add(output);
}
if (allOutputs.size() == 1) {
- return allOutputs.get(0).f0;
+ return allOutputs.get(0);
} else {
// send to N outputs. Note that this includes the special case
// of sending to zero outputs
@SuppressWarnings({"unchecked"})
Output<StreamRecord<T>>[] asArray = new Output[allOutputs.size()];
for (int i = 0; i < allOutputs.size(); i++) {
- asArray[i] = allOutputs.get(i).f0;
+ asArray[i] = allOutputs.get(i);
}
// This is the inverse of creating the normal ChainingOutput.
@@ -710,7 +715,7 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>>
StreamConfig operatorConfig,
Map<Integer, StreamConfig> chainedConfigs,
ClassLoader userCodeClassloader,
- Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
+ Map<IntermediateDataSetID, RecordWriterOutput<?>> recordWriterOutputs,
List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
OutputTag<IN> outputTag,
MailboxExecutorFactory mailboxExecutorFactory) {
@@ -722,7 +727,7 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>>
operatorConfig,
chainedConfigs,
userCodeClassloader,
- streamOutputs,
+ recordWriterOutputs,
allOperatorWrappers,
mailboxExecutorFactory);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 2e0610113b9..6e1080fba55 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -78,8 +78,8 @@ import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.graph.NonChainedOutput;
import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl;
import org.apache.flink.streaming.api.operators.StreamOperator;
@@ -1593,26 +1593,26 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
StreamConfig configuration, Environment environment) {
List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWriters =
new ArrayList<>();
- List<StreamEdge> outEdgesInOrder =
- configuration.getOutEdgesInOrder(
+ List<NonChainedOutput> outputsInOrder =
+ configuration.getVertexNonChainedOutputs(
environment.getUserCodeClassLoader().asClassLoader());
- for (int i = 0; i < outEdgesInOrder.size(); i++) {
- StreamEdge edge = outEdgesInOrder.get(i);
+ int index = 0;
+ for (NonChainedOutput streamOutput : outputsInOrder) {
recordWriters.add(
createRecordWriter(
- edge,
- i,
+ streamOutput,
+ index++,
environment,
environment.getTaskInfo().getTaskNameWithSubtasks(),
- edge.getBufferTimeout()));
+ streamOutput.getBufferTimeout()));
}
return recordWriters;
}
@SuppressWarnings("unchecked")
private static <OUT> RecordWriter<SerializationDelegate<StreamRecord<OUT>>> createRecordWriter(
- StreamEdge edge,
+ NonChainedOutput streamOutput,
int outputIndex,
Environment environment,
String taskNameWithSubtask,
@@ -1625,7 +1625,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
try {
outputPartitioner =
InstantiationUtil.clone(
- (StreamPartitioner<OUT>) edge.getPartitioner(),
+ (StreamPartitioner<OUT>) streamOutput.getPartitioner(),
environment.getUserCodeClassLoader().asClassLoader());
} catch (Exception e) {
ExceptionUtils.rethrow(e);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index b38046bd7d2..4b6dd73ca7c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -49,6 +49,8 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.InputOutputFormatContainer;
import org.apache.flink.runtime.jobgraph.InputOutputFormatVertex;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobType;
@@ -123,6 +125,7 @@ import java.util.stream.Collectors;
import static org.apache.flink.runtime.jobgraph.DistributionPattern.POINTWISE;
import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.areOperatorsChainable;
+import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
@@ -869,9 +872,9 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
final JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
for (JobVertex vertex : jobGraph.getVertices()) {
final StreamConfig streamConfig = new StreamConfig(vertex.getConfiguration());
- for (StreamEdge streamEdge :
- streamConfig.getOutEdgesInOrder(this.getClass().getClassLoader())) {
- assertThat(streamEdge.getBufferTimeout(), equalTo(-1L));
+ for (NonChainedOutput output :
+ streamConfig.getVertexNonChainedOutputs(this.getClass().getClassLoader())) {
+ assertThat(output.getBufferTimeout(), equalTo(-1L));
}
}
}
@@ -1606,6 +1609,90 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
.isEqualTo(cacheTransformation.getDatasetId());
}
+ /**
+ * Tests that multiple downstream consumer vertices can reuse the same intermediate blocking
+ * dataset if they have the same parallelism and partitioner; pipelined edges are never shared.
+ */
+ @Test
+ public void testIntermediateDataSetReuse() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setBufferTimeout(-1);
+ DataStream<Integer> source = env.fromElements(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+
+ // sink1 and sink2 share one dataset: same partitioner (rebalance) and parallelism (2)
+ source.rebalance().addSink(new DiscardingSink<>()).setParallelism(2).name("sink1");
+ source.rebalance().addSink(new DiscardingSink<>()).setParallelism(2).name("sink2");
+
+ // this cannot reuse the dataset of sink1/sink2 because its parallelism differs (3 vs 2)
+ source.rebalance().addSink(new DiscardingSink<>()).setParallelism(3);
+
+ // this cannot reuse the dataset of sink1/sink2 because its partitioner differs (broadcast)
+ source.broadcast().addSink(new DiscardingSink<>()).setParallelism(2);
+
+ // these two vertices can NOT reuse the same intermediate dataset because of the pipelined edge
+ source.forward().addSink(new DiscardingSink<>()).setParallelism(1).disableChaining();
+ source.forward().addSink(new DiscardingSink<>()).setParallelism(1).disableChaining();
+
+ DataStream<Integer> mapStream = source.forward().map(value -> value).setParallelism(1);
+
+ // sink3 and sink4 share one dataset produced by the map (broadcast, parallelism 2)
+ mapStream.broadcast().addSink(new DiscardingSink<>()).setParallelism(2).name("sink3");
+ mapStream.broadcast().addSink(new DiscardingSink<>()).setParallelism(2).name("sink4");
+
+ StreamGraph streamGraph = env.getStreamGraph();
+ streamGraph.setGlobalStreamExchangeMode(GlobalStreamExchangeMode.FORWARD_EDGES_PIPELINED);
+ JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
+
+ List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
+ Assertions.assertThat(vertices.size()).isEqualTo(9); // 8 sinks + 1 vertex for source and map
+
+ JobVertex sourceVertex = vertices.get(0);
+ List<IntermediateDataSetID> producedDataSet =
+ sourceVertex.getProducedDataSets().stream()
+ .map(IntermediateDataSet::getId)
+ .collect(Collectors.toList());
+ Assertions.assertThat(producedDataSet.size()).isEqualTo(6); // 2 shared + 2 unshared + 2 pipelined
+
+ JobVertex sinkVertex1 = checkNotNull(findJobVertexWithName(vertices, "sink1"));
+ JobVertex sinkVertex2 = checkNotNull(findJobVertexWithName(vertices, "sink2"));
+ JobVertex sinkVertex3 = checkNotNull(findJobVertexWithName(vertices, "sink3"));
+ JobVertex sinkVertex4 = checkNotNull(findJobVertexWithName(vertices, "sink4"));
+
+ Assertions.assertThat(sinkVertex2.getInputs().get(0).getSource().getId())
+ .isEqualTo(sinkVertex1.getInputs().get(0).getSource().getId());
+ Assertions.assertThat(sinkVertex4.getInputs().get(0).getSource().getId())
+ .isEqualTo(sinkVertex3.getInputs().get(0).getSource().getId());
+ Assertions.assertThat(sinkVertex3.getInputs().get(0).getSource().getId())
+ .isNotEqualTo(sinkVertex1.getInputs().get(0).getSource().getId());
+
+ StreamConfig streamConfig = new StreamConfig(sourceVertex.getConfiguration());
+ List<IntermediateDataSetID> nonChainedOutputs =
+ streamConfig.getOperatorNonChainedOutputs(getClass().getClassLoader()).stream()
+ .map(NonChainedOutput::getDataSetId)
+ .collect(Collectors.toList());
+ Assertions.assertThat(nonChainedOutputs.size()).isEqualTo(5); // excludes the chained map's output
+ Assertions.assertThat(
+ nonChainedOutputs.contains(
+ sinkVertex3.getInputs().get(0).getSource().getId()))
+ .isFalse();
+
+ List<IntermediateDataSetID> streamOutputsInOrder =
+ streamConfig.getVertexNonChainedOutputs(getClass().getClassLoader()).stream()
+ .map(NonChainedOutput::getDataSetId)
+ .collect(Collectors.toList());
+ Assertions.assertThat(streamOutputsInOrder.size()).isEqualTo(6); // vertex-level: includes map output
+ Assertions.assertThat(streamOutputsInOrder.toArray()).isEqualTo(producedDataSet.toArray());
+ }
+
+ private static JobVertex findJobVertexWithName(List<JobVertex> vertices, String name) {
+ for (JobVertex jobVertex : vertices) {
+ if (jobVertex.getName().contains(name)) { // substring match; first hit in list order wins
+ return jobVertex;
+ }
+ }
+ return null; // no match; callers guard the result with checkNotNull
+ }
+
private JobGraph createJobGraphWithDescription(
StreamExecutionEnvironment env, String... inputNames) {
env.setParallelism(1);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardForConsecutiveHashPartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardForConsecutiveHashPartitionerTest.java
index 9e0fd6cf19b..c33adbd92a8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardForConsecutiveHashPartitionerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardForConsecutiveHashPartitionerTest.java
@@ -19,24 +19,22 @@ package org.apache.flink.streaming.runtime.partitioner;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.streaming.api.graph.NonChainedOutput;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
import org.apache.flink.util.TestLogger;
-import org.junit.Test;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
import java.util.List;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-
/** Test for {@link ForwardForConsecutiveHashPartitioner}. */
-public class ForwardForConsecutiveHashPartitionerTest extends TestLogger {
+class ForwardForConsecutiveHashPartitionerTest extends TestLogger {
@Test
- public void testConvertToForwardPartitioner() {
+ void testConvertToForwardPartitioner() {
testConvertToForwardPartitioner(StreamExchangeMode.BATCH);
testConvertToForwardPartitioner(StreamExchangeMode.PIPELINED);
testConvertToForwardPartitioner(StreamExchangeMode.UNDEFINED);
@@ -50,16 +48,16 @@ public class ForwardForConsecutiveHashPartitionerTest extends TestLogger {
new ForwardForConsecutiveHashPartitioner<>(
new KeyGroupStreamPartitioner<>(record -> 0L, 100)));
List<JobVertex> jobVertices = jobGraph.getVerticesSortedTopologicallyFromSources();
- assertThat(jobVertices.size(), is(1));
+ Assertions.assertThat(jobVertices.size()).isEqualTo(1);
JobVertex vertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
StreamConfig sourceConfig = new StreamConfig(vertex.getConfiguration());
StreamEdge edge = sourceConfig.getChainedOutputs(getClass().getClassLoader()).get(0);
- assertThat(edge.getPartitioner(), instanceOf(ForwardPartitioner.class));
+ Assertions.assertThat(edge.getPartitioner()).isInstanceOf(ForwardPartitioner.class);
}
@Test
- public void testConvertToHashPartitioner() {
+ void testConvertToHashPartitioner() {
testConvertToHashPartitioner(StreamExchangeMode.BATCH);
testConvertToHashPartitioner(StreamExchangeMode.PIPELINED);
testConvertToHashPartitioner(StreamExchangeMode.UNDEFINED);
@@ -73,11 +71,13 @@ public class ForwardForConsecutiveHashPartitionerTest extends TestLogger {
new ForwardForConsecutiveHashPartitioner<>(
new KeyGroupStreamPartitioner<>(record -> 0L, 100)));
List<JobVertex> jobVertices = jobGraph.getVerticesSortedTopologicallyFromSources();
- assertThat(jobVertices.size(), is(2));
+ Assertions.assertThat(jobVertices.size()).isEqualTo(2);
JobVertex sourceVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
StreamConfig sourceConfig = new StreamConfig(sourceVertex.getConfiguration());
- StreamEdge edge = sourceConfig.getNonChainedOutputs(getClass().getClassLoader()).get(0);
- assertThat(edge.getPartitioner(), instanceOf(KeyGroupStreamPartitioner.class));
+ NonChainedOutput output =
+ sourceConfig.getOperatorNonChainedOutputs(getClass().getClassLoader()).get(0);
+ Assertions.assertThat(output.getPartitioner())
+ .isInstanceOf(KeyGroupStreamPartitioner.class);
}
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardForUnspecifiedPartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardForUnspecifiedPartitionerTest.java
index c5b846cea1a..1f736ad19de 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardForUnspecifiedPartitionerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardForUnspecifiedPartitionerTest.java
@@ -19,46 +19,45 @@ package org.apache.flink.streaming.runtime.partitioner;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.streaming.api.graph.NonChainedOutput;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.util.TestLogger;
-import org.junit.Test;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
import java.util.List;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-
/** Test for {@link ForwardForUnspecifiedPartitioner}. */
-public class ForwardForUnspecifiedPartitionerTest extends TestLogger {
+class ForwardForUnspecifiedPartitionerTest extends TestLogger {
@Test
- public void testConvertToForwardPartitioner() {
+ void testConvertToForwardPartitioner() {
JobGraph jobGraph =
StreamPartitionerTestUtils.createJobGraph(
"group1", "group1", new ForwardForUnspecifiedPartitioner<>());
List<JobVertex> jobVertices = jobGraph.getVerticesSortedTopologicallyFromSources();
- assertThat(jobVertices.size(), is(1));
+ Assertions.assertThat(jobVertices.size()).isEqualTo(1);
JobVertex vertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
StreamConfig sourceConfig = new StreamConfig(vertex.getConfiguration());
StreamEdge edge = sourceConfig.getChainedOutputs(getClass().getClassLoader()).get(0);
- assertThat(edge.getPartitioner(), instanceOf(ForwardPartitioner.class));
+ Assertions.assertThat(edge.getPartitioner()).isInstanceOf(ForwardPartitioner.class);
}
@Test
- public void testConvertToRescalePartitioner() {
+ void testConvertToRescalePartitioner() {
JobGraph jobGraph =
StreamPartitionerTestUtils.createJobGraph(
"group1", "group2", new ForwardForUnspecifiedPartitioner<>());
List<JobVertex> jobVertices = jobGraph.getVerticesSortedTopologicallyFromSources();
- assertThat(jobVertices.size(), is(2));
+ Assertions.assertThat(jobVertices.size()).isEqualTo(2);
JobVertex sourceVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
StreamConfig sourceConfig = new StreamConfig(sourceVertex.getConfiguration());
- StreamEdge edge = sourceConfig.getNonChainedOutputs(getClass().getClassLoader()).get(0);
- assertThat(edge.getPartitioner(), instanceOf(RescalePartitioner.class));
+ NonChainedOutput output =
+ sourceConfig.getOperatorNonChainedOutputs(getClass().getClassLoader()).get(0);
+ Assertions.assertThat(output.getPartitioner()).isInstanceOf(RescalePartitioner.class);
}
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
index 5efb7914744..37520e0515c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
@@ -21,7 +21,10 @@ package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.graph.NonChainedOutput;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamNode;
@@ -166,36 +169,34 @@ public class StreamConfigChainer<OWNER> {
public OWNER finish() {
checkState(chainIndex > 0, "Use finishForSingletonOperatorChain");
- List<StreamEdge> outEdgesInOrder = new LinkedList<StreamEdge>();
+ List<NonChainedOutput> outEdgesInOrder = new LinkedList<>();
StreamNode sourceVertex =
new StreamNode(chainIndex, null, null, (StreamOperator<?>) null, null, null);
for (int i = 0; i < numberOfNonChainedOutputs; ++i) {
- StreamEdge streamEdge =
- new StreamEdge(
- sourceVertex,
- new StreamNode(
- chainIndex + i,
- null,
- null,
- (StreamOperator<?>) null,
- null,
- null),
- 0,
+ NonChainedOutput streamOutput =
+ new NonChainedOutput(
+ true,
+ sourceVertex.getId(),
+ 1,
+ 1,
+ 100,
+ false,
+ new IntermediateDataSetID(),
+ null,
new BroadcastPartitioner<>(),
- null);
- streamEdge.setBufferTimeout(1);
- outEdgesInOrder.add(streamEdge);
+ ResultPartitionType.PIPELINED_BOUNDED);
+ outEdgesInOrder.add(streamOutput);
}
tailConfig.setChainEnd();
tailConfig.setNumberOfOutputs(numberOfNonChainedOutputs);
- tailConfig.setOutEdgesInOrder(outEdgesInOrder);
- tailConfig.setNonChainedOutputs(outEdgesInOrder);
+ tailConfig.setVertexNonChainedOutputs(outEdgesInOrder);
+ tailConfig.setOperatorNonChainedOutputs(outEdgesInOrder);
chainedConfigs.values().forEach(StreamConfig::serializeAllConfigs);
headConfig.setAndSerializeTransitiveChainedTaskConfigs(chainedConfigs);
- headConfig.setOutEdgesInOrder(outEdgesInOrder);
+ headConfig.setVertexNonChainedOutputs(outEdgesInOrder);
headConfig.serializeAllConfigs();
return owner;
@@ -209,7 +210,7 @@ public class StreamConfigChainer<OWNER> {
new AbstractStreamOperator<OUT>() {
private static final long serialVersionUID = 1L;
};
- List<StreamEdge> outEdgesInOrder = new LinkedList<>();
+ List<NonChainedOutput> streamOutputs = new LinkedList<>();
StreamNode sourceVertexDummy =
new StreamNode(
@@ -220,31 +221,27 @@ public class StreamConfigChainer<OWNER> {
"source dummy",
SourceStreamTask.class);
for (int i = 0; i < numberOfNonChainedOutputs; ++i) {
- StreamNode targetVertexDummy =
- new StreamNode(
- MAIN_NODE_ID + 1 + i,
- "group",
+ streamOutputs.add(
+ new NonChainedOutput(
+ true,
+ sourceVertexDummy.getId(),
+ 1,
+ 1,
+ 100,
+ false,
+ new IntermediateDataSetID(),
null,
- dummyOperator,
- "target dummy",
- SourceStreamTask.class);
-
- outEdgesInOrder.add(
- new StreamEdge(
- sourceVertexDummy,
- targetVertexDummy,
- 0,
new BroadcastPartitioner<>(),
- null));
+ ResultPartitionType.PIPELINED_BOUNDED));
}
headConfig.setVertexID(0);
headConfig.setNumberOfOutputs(1);
- headConfig.setOutEdgesInOrder(outEdgesInOrder);
- headConfig.setNonChainedOutputs(outEdgesInOrder);
+ headConfig.setVertexNonChainedOutputs(streamOutputs);
+ headConfig.setOperatorNonChainedOutputs(streamOutputs);
chainedConfigs.values().forEach(StreamConfig::serializeAllConfigs);
headConfig.setAndSerializeTransitiveChainedTaskConfigs(chainedConfigs);
- headConfig.setOutEdgesInOrder(outEdgesInOrder);
+ headConfig.setVertexNonChainedOutputs(streamOutputs);
headConfig.setTypeSerializerOut(outputSerializer);
headConfig.serializeAllConfigs();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
index 299db5d10a3..7fb875ada35 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
@@ -27,8 +27,10 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
@@ -43,6 +45,7 @@ import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
import org.apache.flink.runtime.throughput.BufferDebloatConfiguration;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.graph.NonChainedOutput;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamConfig.InputConfig;
import org.apache.flink.streaming.api.graph.StreamConfig.NetworkInputConfig;
@@ -340,7 +343,7 @@ public class StreamTaskMailboxTestHarnessBuilder<OUT> {
? StreamConfigChainer.MAIN_NODE_ID
: Collections.max(transitiveChainedTaskConfigs.keySet());
- List<StreamEdge> outEdgesInOrder = new LinkedList<>();
+ List<StreamEdge> chainedOutputs = new LinkedList<>();
StreamEdge sourceToMainEdge =
new StreamEdge(
@@ -355,12 +358,26 @@ public class StreamTaskMailboxTestHarnessBuilder<OUT> {
0,
new ForwardPartitioner<>(),
null);
- outEdgesInOrder.add(sourceToMainEdge);
+ chainedOutputs.add(sourceToMainEdge);
+
+ List<NonChainedOutput> streamOutputsInOrder = new LinkedList<>();
+ streamOutputsInOrder.add(
+ new NonChainedOutput(
+ true,
+ sourceToMainEdge.getSourceId(),
+ 1,
+ 1,
+ 100,
+ false,
+ new IntermediateDataSetID(),
+ null,
+ new ForwardPartitioner<>(),
+ ResultPartitionType.PIPELINED_BOUNDED));
StreamConfig sourceConfig = new StreamConfig(new Configuration());
sourceConfig.setTimeCharacteristic(streamConfig.getTimeCharacteristic());
- sourceConfig.setOutEdgesInOrder(outEdgesInOrder);
- sourceConfig.setChainedOutputs(outEdgesInOrder);
+ sourceConfig.setVertexNonChainedOutputs(streamOutputsInOrder);
+ sourceConfig.setChainedOutputs(chainedOutputs);
sourceConfig.setTypeSerializerOut(sourceInput.getSourceSerializer());
sourceConfig.setOperatorID(sourceInput.getOperatorId());
sourceConfig.setStreamOperatorFactory(sourceInput.getSourceOperatorFactory());
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 5a9930ef709..805b267726b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -34,7 +34,9 @@ import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.EndOfData;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.StopMode;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.memory.MemoryManager;
@@ -50,8 +52,8 @@ import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.graph.NonChainedOutput;
import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
@@ -237,24 +239,26 @@ public class StreamTaskTestHarness<OUT> {
private static final long serialVersionUID = 1L;
};
- List<StreamEdge> outEdgesInOrder = new LinkedList<>();
+ List<NonChainedOutput> streamOutputs = new LinkedList<>();
StreamNode sourceVertexDummy =
new StreamNode(
0, "group", null, dummyOperator, "source dummy", SourceStreamTask.class);
- StreamNode targetVertexDummy =
- new StreamNode(
- 1, "group", null, dummyOperator, "target dummy", SourceStreamTask.class);
- outEdgesInOrder.add(
- new StreamEdge(
- sourceVertexDummy,
- targetVertexDummy,
- 0,
+ streamOutputs.add(
+ new NonChainedOutput(
+ true,
+ sourceVertexDummy.getId(),
+ 1,
+ 1,
+ 100,
+ false,
+ new IntermediateDataSetID(),
+ null,
new BroadcastPartitioner<>(),
- null /* output tag */));
+ ResultPartitionType.PIPELINED_BOUNDED));
- streamConfig.setOutEdgesInOrder(outEdgesInOrder);
- streamConfig.setNonChainedOutputs(outEdgesInOrder);
+ streamConfig.setVertexNonChainedOutputs(streamOutputs);
+ streamConfig.setOperatorNonChainedOutputs(streamOutputs);
streamConfig.serializeAllConfigs();
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamConfig.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamConfig.java
index 11453a61106..3080e55f512 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamConfig.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamConfig.java
@@ -19,9 +19,11 @@ package org.apache.flink.streaming.util;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.graph.NonChainedOutput;
import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
@@ -51,21 +53,24 @@ public class MockStreamConfig extends StreamConfig {
StreamNode sourceVertex =
new StreamNode(0, null, null, dummyOperator, "source", SourceStreamTask.class);
- StreamNode targetVertex =
- new StreamNode(1, null, null, dummyOperator, "target", SourceStreamTask.class);
- List<StreamEdge> outEdgesInOrder = new ArrayList<>(numberOfOutputs);
+ List<NonChainedOutput> streamOutputs = new ArrayList<>(numberOfOutputs);
for (int i = 0; i < numberOfOutputs; i++) {
- outEdgesInOrder.add(
- new StreamEdge(
- sourceVertex,
- targetVertex,
- numberOfOutputs,
+ streamOutputs.add(
+ new NonChainedOutput(
+ true,
+ sourceVertex.getId(),
+ 1,
+ 1,
+ 100,
+ false,
+ new IntermediateDataSetID(),
+ null,
new BroadcastPartitioner<>(),
- null));
+ ResultPartitionType.PIPELINED_BOUNDED));
}
- setOutEdgesInOrder(outEdgesInOrder);
- setNonChainedOutputs(outEdgesInOrder);
+ setVertexNonChainedOutputs(streamOutputs);
+ setOperatorNonChainedOutputs(streamOutputs);
serializeAllConfigs();
}
}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/partitioner/BinaryHashPartitioner.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/partitioner/BinaryHashPartitioner.java
index e04ce75ee9c..faa18cafebf 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/partitioner/BinaryHashPartitioner.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/partitioner/BinaryHashPartitioner.java
@@ -76,6 +76,26 @@ public class BinaryHashPartitioner extends StreamPartitioner<RowData> {
return false;
}
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) { // exact class match; subclasses never equal
+ return false;
+ }
+ final BinaryHashPartitioner that = (BinaryHashPartitioner) o;
+ return numberOfChannels == that.numberOfChannels // equality: channel count + hash field names only
+ && Arrays.equals(hashFieldNames, that.hashFieldNames);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = super.hashCode(); // NOTE(review): assumes base hash uses only numberOfChannels; confirm
+ result = 31 * result + Arrays.hashCode(hashFieldNames); // mirrors Arrays.equals in equals()
+ return result;
+ }
+
@Override
public String toString() {
return "HASH" + Arrays.toString(hashFieldNames);