You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/05/22 06:16:59 UTC

[flink] branch release-1.11 updated (3eb1075 -> f3268aa)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 3eb1075  [FLINK-17823][network] Resolve the race condition while releasing RemoteInputChannel
     new d8693f6  [FLINK-17805][hotfix][network] Fix/update/rename InputProcessorUtil.createCheckpointedInputGatePair method
     new f3268aa  [FLINK-17805][network] Fix ArrayIndexOutOfBound for rotated input gate indexes

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/io/CheckpointedInputGate.java          |  5 ++
 .../streaming/runtime/io/InputProcessorUtil.java   | 20 ++++++-
 .../runtime/tasks/MultipleInputStreamTask.java     |  2 +-
 .../runtime/tasks/TwoInputStreamTask.java          |  2 +-
 .../runtime/io/InputProcessorUtilTest.java         | 61 ++++++++++++++++++++++
 .../streaming/runtime/io/MockIndexedInputGate.java | 12 ++++-
 6 files changed, 96 insertions(+), 6 deletions(-)


[flink] 01/02: [FLINK-17805][hotfix][network] Fix/update/rename InputProcessorUtil.createCheckpointedInputGatePair method

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d8693f6000d925cc3485bf2bf0537defddb68986
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Tue May 19 09:24:05 2020 +0200

    [FLINK-17805][hotfix][network] Fix/update/rename InputProcessorUtil.createCheckpointedInputGatePair method
---
 .../java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java  | 2 +-
 .../apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java   | 2 +-
 .../org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java    | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
index 18462b0..f1bf043 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
@@ -66,7 +66,7 @@ public class InputProcessorUtil {
 	 * @return a pair of {@link CheckpointedInputGate} created for two corresponding
 	 * {@link InputGate}s supplied as parameters.
 	 */
-	public static CheckpointedInputGate[] createCheckpointedInputGatePair(
+	public static CheckpointedInputGate[] createCheckpointedMultipleInputGate(
 			AbstractInvokable toNotifyOnCheckpoint,
 			StreamConfig config,
 			ChannelStateWriter channelStateWriter,
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
index f6c4bb9..15f620b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
@@ -91,7 +91,7 @@ public class MultipleInputStreamTask<OUT> extends StreamTask<OUT, MultipleInputS
 			headOperator instanceof InputSelectable ? (InputSelectable) headOperator : null,
 			inputGates.length);
 
-		CheckpointedInputGate[] checkpointedInputGates = InputProcessorUtil.createCheckpointedInputGatePair(
+		CheckpointedInputGate[] checkpointedInputGates = InputProcessorUtil.createCheckpointedMultipleInputGate(
 			this,
 			getConfiguration(),
 			getChannelStateWriter(),
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 18155d0..c39d9e4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -54,7 +54,7 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends AbstractTwoInputStreamTas
 			headOperator instanceof InputSelectable ? (InputSelectable) headOperator : null);
 
 		// create an input instance for each input
-		CheckpointedInputGate[] checkpointedInputGates = InputProcessorUtil.createCheckpointedInputGatePair(
+		CheckpointedInputGate[] checkpointedInputGates = InputProcessorUtil.createCheckpointedMultipleInputGate(
 			this,
 			getConfiguration(),
 			getChannelStateWriter(),


[flink] 02/02: [FLINK-17805][network] Fix ArrayIndexOutOfBound for rotated input gate indexes

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f3268aa8fab4647667f4cf726738ac5d1f7d1af7
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Sat May 16 12:45:42 2020 +0200

    [FLINK-17805][network] Fix ArrayIndexOutOfBound for rotated input gate indexes
    
    It's possible that indexes of passed InputGates are not monotonic - that left
    input has higher input gate index. This commit fixes an ArrayIndexOutOfBound caused
    by this.
---
 .../runtime/io/CheckpointedInputGate.java          |  5 ++
 .../streaming/runtime/io/InputProcessorUtil.java   | 18 ++++++-
 .../runtime/io/InputProcessorUtilTest.java         | 61 ++++++++++++++++++++++
 .../streaming/runtime/io/MockIndexedInputGate.java | 12 ++++-
 4 files changed, 93 insertions(+), 3 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
index 4aeffc2..578cf21 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
@@ -228,4 +228,9 @@ public class CheckpointedInputGate implements PullingAsyncDataInput<BufferOrEven
 	public InputChannel getChannel(int channelIndex) {
 		return inputGate.getChannel(channelIndex);
 	}
+
+	@VisibleForTesting
+	CheckpointBarrierHandler getCheckpointBarrierHandler() {
+		return barrierHandler;
+	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
index f1bf043..5dbfc02 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
@@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
 
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.stream.IntStream;
@@ -79,11 +80,26 @@ public class InputProcessorUtil {
 			unionedInputGates[i] = InputGateUtil.createInputGate(inputGates[i].toArray(new IndexedInputGate[0]));
 		}
 
+		IntStream numberOfInputChannelsPerGate =
+			Arrays
+				.stream(inputGates)
+				.flatMap(collection -> collection.stream())
+				.sorted(Comparator.comparingInt(IndexedInputGate::getGateIndex))
+				.mapToInt(InputGate::getNumberOfInputChannels);
+
 		Map<InputGate, Integer> inputGateToChannelIndexOffset = generateInputGateToChannelIndexOffsetMap(unionedInputGates);
+		// Note that numberOfInputChannelsPerGate and inputGateToChannelIndexOffset have a bit different
+		// indexing and purposes.
+		//
+		// The numberOfInputChannelsPerGate is indexed based on flattened input gates, and sorted based on GateIndex,
+		// so that it can be used in combination with InputChannelInfo class.
+		//
+		// The inputGateToChannelIndexOffset is based upon unioned input gates and it's use for translating channel
+		// indexes from perspective of UnionInputGate to perspective of SingleInputGate.
 
 		CheckpointBarrierHandler barrierHandler = createCheckpointBarrierHandler(
 			config,
-			Arrays.stream(inputGates).flatMapToInt(collection -> collection.stream().mapToInt(InputGate::getNumberOfInputChannels)),
+			numberOfInputChannelsPerGate,
 			channelStateWriter,
 			taskName,
 			generateChannelIndexToInputGateMap(unionedInputGates),
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/InputProcessorUtilTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/InputProcessorUtilTest.java
index d7e0b3a..8a511ef 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/InputProcessorUtilTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/InputProcessorUtilTest.java
@@ -18,15 +18,34 @@
 
 package org.apache.flink.streaming.runtime.io;
 
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
+import org.apache.flink.runtime.checkpoint.channel.MockChannelStateWriter;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.buffer.BufferReceivedListener;
+import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.util.MockStreamTask;
+import org.apache.flink.streaming.util.MockStreamTaskBuilder;
 
 import org.junit.Test;
 
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests for the behaviors of the {@link InputProcessorUtil}.
@@ -58,4 +77,46 @@ public class InputProcessorUtilTest {
 		assertEquals(0, inputGateToChannelIndexOffsetMap.get(ig1).intValue());
 		assertEquals(3, inputGateToChannelIndexOffsetMap.get(ig2).intValue());
 	}
+
+	@Test
+	public void testCreateCheckpointedMultipleInputGate() throws Exception {
+		try (CloseableRegistry registry = new CloseableRegistry()) {
+			MockEnvironment environment = new MockEnvironmentBuilder().build();
+			MockStreamTask streamTask = new MockStreamTaskBuilder(environment).build();
+			StreamConfig streamConfig = new StreamConfig(environment.getJobConfiguration());
+			streamConfig.setCheckpointMode(CheckpointingMode.EXACTLY_ONCE);
+			streamConfig.setUnalignedCheckpointsEnabled(true);
+
+			// First input gate has index larger than the second
+			Collection<IndexedInputGate>[] inputGates = new Collection[] {
+				Collections.singletonList(new MockIndexedInputGate(1, 4)),
+				Collections.singletonList(new MockIndexedInputGate(0, 2)),
+			};
+
+			CheckpointedInputGate[] checkpointedMultipleInputGate = InputProcessorUtil.createCheckpointedMultipleInputGate(
+				streamTask,
+				streamConfig,
+				new MockChannelStateWriter(),
+				environment.getMetricGroup().getIOMetricGroup(),
+				streamTask.getName(),
+				inputGates);
+			for (CheckpointedInputGate checkpointedInputGate : checkpointedMultipleInputGate) {
+				registry.registerCloseable(checkpointedInputGate);
+			}
+
+			CheckpointBarrierHandler barrierHandler = checkpointedMultipleInputGate[0].getCheckpointBarrierHandler();
+			assertTrue(barrierHandler.getBufferReceivedListener().isPresent());
+			BufferReceivedListener bufferReceivedListener = barrierHandler.getBufferReceivedListener().get();
+
+			List<IndexedInputGate> allInputGates = Arrays.stream(inputGates).flatMap(gates -> gates.stream()).collect(Collectors.toList());
+			for (IndexedInputGate inputGate : allInputGates) {
+				for (int channelId = 0; channelId < inputGate.getNumberOfInputChannels(); channelId++) {
+					bufferReceivedListener.notifyBarrierReceived(
+						new CheckpointBarrier(1, 42, CheckpointOptions.forCheckpointWithDefaultLocation(true, true)),
+						new InputChannelInfo(inputGate.getGateIndex(), channelId));
+				}
+			}
+			assertTrue(barrierHandler.getAllBarriersReceivedFuture(1).isDone());
+		}
+	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java
index 0a9992e..a04ec55 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java
@@ -33,8 +33,16 @@ import java.util.concurrent.ExecutorService;
  * Mock {@link IndexedInputGate}.
  */
 public class MockIndexedInputGate extends IndexedInputGate {
+	private final int gateIndex;
+	private final int numberOfInputChannels;
 
 	public MockIndexedInputGate() {
+		this(0, 1);
+	}
+
+	public MockIndexedInputGate(int gateIndex, int numberOfInputChannels) {
+		this.gateIndex = gateIndex;
+		this.numberOfInputChannels = numberOfInputChannels;
 	}
 
 	@Override
@@ -56,7 +64,7 @@ public class MockIndexedInputGate extends IndexedInputGate {
 
 	@Override
 	public int getNumberOfInputChannels() {
-		return 1;
+		return numberOfInputChannels;
 	}
 
 	@Override
@@ -93,6 +101,6 @@ public class MockIndexedInputGate extends IndexedInputGate {
 
 	@Override
 	public int getGateIndex() {
-		return 0;
+		return gateIndex;
 	}
 }