You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2020/09/04 06:57:24 UTC

[flink] branch release-1.11 updated: [FLINK-14087][datastream] Clone the StreamPartitioner to avoid being shared at runtime.

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 7ecffb4  [FLINK-14087][datastream] Clone the StreamPartitioner to avoid being shared at runtime.
7ecffb4 is described below

commit 7ecffb4e31505b1f33289c278dc7f397c1f229d6
Author: Yun Gao <ga...@gmail.com>
AuthorDate: Thu Sep 3 22:30:39 2020 +0800

    [FLINK-14087][datastream] Clone the StreamPartitioner to avoid being shared at runtime.
---
 .../flink/streaming/runtime/tasks/StreamTask.java  |  16 ++-
 .../DataStreamWithSharedPartitionNodeITCase.java   | 109 +++++++++++++++++++++
 2 files changed, 123 insertions(+), 2 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index fa6001c..5759cc3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -74,6 +74,7 @@ import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
 import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.function.RunnableWithException;
@@ -1160,14 +1161,25 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		return recordWriters;
 	}
 
+	@SuppressWarnings("unchecked")
 	private static <OUT> RecordWriter<SerializationDelegate<StreamRecord<OUT>>> createRecordWriter(
 			StreamEdge edge,
 			int outputIndex,
 			Environment environment,
 			String taskName,
 			long bufferTimeout) {
-		@SuppressWarnings("unchecked")
-		StreamPartitioner<OUT> outputPartitioner = (StreamPartitioner<OUT>) edge.getPartitioner();
+
+		StreamPartitioner<OUT> outputPartitioner = null;
+
+		// Clones the partitioner so that multiple stream edges do not share the same stream partitioner
+		// instance at runtime, as in the case of https://issues.apache.org/jira/browse/FLINK-14087.
+		try {
+			outputPartitioner = InstantiationUtil.clone(
+				(StreamPartitioner<OUT>) edge.getPartitioner(),
+				environment.getUserClassLoader());
+		} catch (Exception e) {
+			ExceptionUtils.rethrow(e);
+		}
 
 		LOG.debug("Using partitioner {} for output {} of task {}", outputPartitioner, outputIndex, taskName);
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/DataStreamWithSharedPartitionNodeITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/DataStreamWithSharedPartitionNodeITCase.java
new file mode 100644
index 0000000..f1bffac
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/DataStreamWithSharedPartitionNodeITCase.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.api.datastream;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Verifies that records are partitioned correctly when several consumers are
+ * connected to the same partition node, i.e. share one partitioner (FLINK-14087).
+ */
+public class DataStreamWithSharedPartitionNodeITCase {
+
+	@ClassRule
+	public static MiniClusterWithClientResource flinkCluster =
+		new MiniClusterWithClientResource(
+			new MiniClusterResourceConfiguration.Builder()
+				.setNumberSlotsPerTaskManager(3)
+				.setNumberTaskManagers(1)
+				.build());
+
+	@Test
+	public void testJobWithSharePartitionNode() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+		DataStream<Integer> partitioned = env.fromElements(1, 2, 3, 4)
+			.partitionCustom(new TestPartitioner(), f -> f);
+		partitioned.addSink(new CollectSink("first"));
+		partitioned.addSink(new CollectSink("second")).setParallelism(2);
+
+		env.execute();
+
+		checkSinkResult("first-0", Arrays.asList(1, 2, 3, 4));
+		checkSinkResult("second-0", Arrays.asList(1, 3));
+		checkSinkResult("second-1", Arrays.asList(2, 4));
+	}
+
+	/** Asserts that the sink subtask {@code nameAndIndex} collected exactly {@code expected}. */
+	private void checkSinkResult(String nameAndIndex, List<Integer> expected) {
+		synchronized (CollectSink.resultLock) {
+			assertEquals(expected, CollectSink.result.get(nameAndIndex));
+		}
+	}
+
+	/** Stateful round-robin partitioner; it ignores the key. */
+	private static class TestPartitioner implements Partitioner<Integer> {
+		private int nextChannelToSendTo = -1;
+
+		@Override
+		public int partition(Integer key, int numPartitions) {
+			nextChannelToSendTo = (nextChannelToSendTo + 1) % numPartitions;
+			return nextChannelToSendTo;
+		}
+	}
+
+	/** Sink that records every value in a static map keyed by {@code "<name>-<subtaskIndex>"}. */
+	private static class CollectSink extends RichSinkFunction<Integer> {
+		private static final Object resultLock = new Object();
+		@GuardedBy("resultLock")
+		private static final Map<String, List<Integer>> result = new HashMap<>();
+
+		private final String name;
+
+		public CollectSink(String name) {
+			this.name = name;
+		}
+
+		@Override
+		public void invoke(Integer value, Context context) throws Exception {
+			String key = name + "-" + getRuntimeContext().getIndexOfThisSubtask();
+			synchronized (resultLock) {
+				result.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
+			}
+		}
+	}
+}