You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2020/09/04 06:57:24 UTC
[flink] branch release-1.11 updated: [FLINK-14087][datastream]
Clone the StreamPartitioner to avoid being shared at runtime.
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 7ecffb4 [FLINK-14087][datastream] Clone the StreamPartitioner to avoid being shared at runtime.
7ecffb4 is described below
commit 7ecffb4e31505b1f33289c278dc7f397c1f229d6
Author: Yun Gao <ga...@gmail.com>
AuthorDate: Thu Sep 3 22:30:39 2020 +0800
[FLINK-14087][datastream] Clone the StreamPartitioner to avoid being shared at runtime.
---
.../flink/streaming/runtime/tasks/StreamTask.java | 16 ++-
.../DataStreamWithSharedPartitionNodeITCase.java | 109 +++++++++++++++++++++
2 files changed, 123 insertions(+), 2 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index fa6001c..5759cc3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -74,6 +74,7 @@ import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.function.RunnableWithException;
@@ -1160,14 +1161,25 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
return recordWriters;
}
+ @SuppressWarnings("unchecked")
private static <OUT> RecordWriter<SerializationDelegate<StreamRecord<OUT>>> createRecordWriter(
StreamEdge edge,
int outputIndex,
Environment environment,
String taskName,
long bufferTimeout) {
- @SuppressWarnings("unchecked")
- StreamPartitioner<OUT> outputPartitioner = (StreamPartitioner<OUT>) edge.getPartitioner();
+
+ StreamPartitioner<OUT> outputPartitioner = null;
+
+ // Clones the partitioner to avoid multiple stream edges sharing the same stream partitioner,
+ // like the case of https://issues.apache.org/jira/browse/FLINK-14087.
+ try {
+ outputPartitioner = InstantiationUtil.clone(
+ (StreamPartitioner<OUT>) edge.getPartitioner(),
+ environment.getUserClassLoader());
+ } catch (Exception e) {
+ ExceptionUtils.rethrow(e);
+ }
LOG.debug("Using partitioner {} for output {} of task {}", outputPartitioner, outputIndex, taskName);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/DataStreamWithSharedPartitionNodeITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/DataStreamWithSharedPartitionNodeITCase.java
new file mode 100644
index 0000000..f1bffac
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/DataStreamWithSharedPartitionNodeITCase.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.api.datastream;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Verifies that data is partitioned correctly when multiple consumers are connected to the same
+ * partition node, i.e. when their stream edges share one stream partitioner (FLINK-14087).
+ */
+public class DataStreamWithSharedPartitionNodeITCase {
+
+	@ClassRule
+	public static MiniClusterWithClientResource flinkCluster =
+		new MiniClusterWithClientResource(
+			new MiniClusterResourceConfiguration.Builder()
+				.setNumberSlotsPerTaskManager(3)
+				.setNumberTaskManagers(1)
+				.build());
+
+	@Test
+	public void testJobWithSharePartitionNode() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+		DataStream<Integer> source = env
+			.fromElements(1, 2, 3, 4)
+			.partitionCustom(new TestPartitioner(), f -> f);
+		source.addSink(new CollectSink("first"));
+		source.addSink(new CollectSink("second")).setParallelism(2);
+
+		env.execute();
+
+		checkSinkResult("first-0", Arrays.asList(1, 2, 3, 4)); // single subtask receives all
+		checkSinkResult("second-0", Arrays.asList(1, 3)); // round-robin over two subtasks
+		checkSinkResult("second-1", Arrays.asList(2, 4));
+	}
+
+	private void checkSinkResult(String nameAndIndex, List<Integer> expected) {
+		synchronized (CollectSink.resultLock) { // result is @GuardedBy("resultLock")
+			assertEquals(expected, CollectSink.result.get(nameAndIndex));
+		}
+	}
+
+	/** Stateful round-robin partitioner; one instance shared by several edges would misroute. */
+	private static class TestPartitioner implements Partitioner<Integer> {
+		private int nextChannelToSendTo = -1;
+
+		@Override
+		public int partition(Integer key, int numPartitions) {
+			nextChannelToSendTo = (nextChannelToSendTo + 1) % numPartitions;
+			return nextChannelToSendTo;
+		}
+	}
+
+	/** Appends every received value to the static list keyed by "{name}-{subtaskIndex}". */
+	private static class CollectSink extends RichSinkFunction<Integer> {
+		private static final Object resultLock = new Object();
+		@GuardedBy("resultLock")
+		private static final Map<String, List<Integer>> result = new HashMap<>();
+
+		private final String name;
+
+		public CollectSink(String name) {
+			this.name = name;
+		}
+
+		@Override
+		public void invoke(Integer value, Context context) throws Exception {
+			synchronized (resultLock) {
+				String key = name + "-" + getRuntimeContext().getIndexOfThisSubtask();
+				result.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
+			}
+		}
+	}
+}