You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2019/07/18 08:39:38 UTC

[flink] branch release-1.9 updated: [FLINK-13249][runtime] Fix handling of partition producer responses by running them with the task's executor

This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 3eff638  [FLINK-13249][runtime] Fix handling of partition producer responses by running them with the task's executor
3eff638 is described below

commit 3eff6387b5f6716dee5c17b71b10c08760b946cc
Author: Stefan Richter <s....@data-artisans.com>
AuthorDate: Thu Jul 18 10:37:22 2019 +0200

    [FLINK-13249][runtime] Fix handling of partition producer responses by running them with the task's executor
    
    Fixes the problem in FLINK-13249 by ensuring that processing the partition producer response is not blocking any netty thread, but is always executed by the task's executor.
    
    (cherry picked from commit 23bd23b325f15c726fcc48c76eb252fa2ed2fe59)
---
 .../partition/PartitionProducerStateProvider.java        |  9 +++++----
 .../io/network/partition/consumer/SingleInputGate.java   |  6 +++---
 .../java/org/apache/flink/runtime/taskmanager/Task.java  | 16 ++++++++++------
 .../partition/consumer/SingleInputGateBuilder.java       |  6 +-----
 .../org/apache/flink/runtime/taskmanager/TaskTest.java   |  8 ++++----
 5 files changed, 23 insertions(+), 22 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionProducerStateProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionProducerStateProvider.java
index 8bbdaa5..5785095 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionProducerStateProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionProducerStateProvider.java
@@ -22,7 +22,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.types.Either;
 
-import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
 
 /**
  * Request execution state of partition producer, the response accepts state check callbacks.
@@ -34,11 +34,12 @@ public interface PartitionProducerStateProvider {
 	 * @param intermediateDataSetId ID of the parent intermediate data set.
 	 * @param resultPartitionId ID of the result partition to check. This
 	 * identifies the producing execution and partition.
-	 * @return a future with response handle.
+	 * @param responseConsumer consumer for the response handle.
 	 */
-	CompletableFuture<? extends ResponseHandle> requestPartitionProducerState(
+	void requestPartitionProducerState(
 		IntermediateDataSetID intermediateDataSetId,
-		ResultPartitionID resultPartitionId);
+		ResultPartitionID resultPartitionId,
+		Consumer<? super ResponseHandle> responseConsumer);
 
 	/**
 	 * Result of state query, accepts state check callbacks.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index bd75262..534078d9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -601,8 +601,8 @@ public class SingleInputGate extends InputGate {
 	void triggerPartitionStateCheck(ResultPartitionID partitionId) {
 		partitionProducerStateProvider.requestPartitionProducerState(
 			consumedResultId,
-			partitionId)
-			.thenAccept(responseHandle -> {
+			partitionId,
+			((PartitionProducerStateProvider.ResponseHandle responseHandle) -> {
 				boolean isProducingState = new RemoteChannelStateChecker(partitionId, owningTaskName)
 					.isProducerReadyOrAbortConsumption(responseHandle);
 				if (isProducingState) {
@@ -612,7 +612,7 @@ public class SingleInputGate extends InputGate {
 						responseHandle.failConsumption(t);
 					}
 				}
-			});
+			}));
 	}
 
 	private void queueChannel(InputChannel channel) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index d4e1d8a..12049f2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -99,6 +99,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Consumer;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -1080,18 +1081,21 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid
 	// ------------------------------------------------------------------------
 
 	@Override
-	public CompletableFuture<PartitionProducerStateResponseHandle> requestPartitionProducerState(
+	public void requestPartitionProducerState(
 			final IntermediateDataSetID intermediateDataSetId,
-			final ResultPartitionID resultPartitionId) {
+			final ResultPartitionID resultPartitionId,
+			Consumer<? super ResponseHandle> responseConsumer) {
+
 		final CompletableFuture<ExecutionState> futurePartitionState =
 			partitionProducerStateChecker.requestPartitionProducerState(
 				jobId,
 				intermediateDataSetId,
 				resultPartitionId);
-		final CompletableFuture<PartitionProducerStateResponseHandle> result =
-			futurePartitionState.handleAsync(PartitionProducerStateResponseHandle::new, executor);
-		FutureUtils.assertNoException(result);
-		return result;
+
+		FutureUtils.assertNoException(
+			futurePartitionState
+				.handle(PartitionProducerStateResponseHandle::new)
+				.thenAcceptAsync(responseConsumer, executor));
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
index 956bad9..944cc07 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
@@ -21,23 +21,19 @@ package org.apache.flink.runtime.io.network.partition.consumer;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
-import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider.ResponseHandle;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
 import org.apache.flink.util.function.SupplierWithException;
 
 import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
 
 /**
  * Utility class to encapsulate the logic of building a {@link SingleInputGate} instance.
  */
 public class SingleInputGateBuilder {
 
-	private static final CompletableFuture<ResponseHandle> NO_OP_PRODUCER_CHECKER_RESULT = new CompletableFuture<>();
-
-	public static final PartitionProducerStateProvider NO_OP_PRODUCER_CHECKER = (dsid, id) -> NO_OP_PRODUCER_CHECKER_RESULT;
+	public static final PartitionProducerStateProvider NO_OP_PRODUCER_CHECKER = (dsid, id, consumer) -> {};
 
 	private final IntermediateDataSetID intermediateDataSetID = new IntermediateDataSetID();
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index ee78963..5879f52 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -656,7 +656,7 @@ public class TaskTest extends TestLogger {
 			final CompletableFuture<ExecutionState> promise = new CompletableFuture<>();
 			when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), eq(resultId), eq(partitionId))).thenReturn(promise);
 
-			task.requestPartitionProducerState(resultId, partitionId).thenAccept(checkResult ->
+			task.requestPartitionProducerState(resultId, partitionId, checkResult ->
 				assertThat(remoteChannelStateChecker.isProducerReadyOrAbortConsumption(checkResult), is(false))
 			);
 
@@ -680,7 +680,7 @@ public class TaskTest extends TestLogger {
 			final CompletableFuture<ExecutionState> promise = new CompletableFuture<>();
 			when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), eq(resultId), eq(partitionId))).thenReturn(promise);
 
-			task.requestPartitionProducerState(resultId, partitionId).thenAccept(checkResult ->
+			task.requestPartitionProducerState(resultId, partitionId, checkResult ->
 				assertThat(remoteChannelStateChecker.isProducerReadyOrAbortConsumption(checkResult), is(false))
 			);
 
@@ -711,7 +711,7 @@ public class TaskTest extends TestLogger {
 				CompletableFuture<ExecutionState> promise = new CompletableFuture<>();
 				when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), eq(resultId), eq(partitionId))).thenReturn(promise);
 
-				task.requestPartitionProducerState(resultId, partitionId).thenAccept(checkResult -> {
+				task.requestPartitionProducerState(resultId, partitionId, checkResult -> {
 					if (remoteChannelStateChecker.isProducerReadyOrAbortConsumption(checkResult)) {
 						callCount.incrementAndGet();
 					}
@@ -749,7 +749,7 @@ public class TaskTest extends TestLogger {
 				CompletableFuture<ExecutionState> promise = new CompletableFuture<>();
 				when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), eq(resultId), eq(partitionId))).thenReturn(promise);
 
-				task.requestPartitionProducerState(resultId, partitionId).thenAccept(checkResult -> {
+				task.requestPartitionProducerState(resultId, partitionId, checkResult -> {
 					if (remoteChannelStateChecker.isProducerReadyOrAbortConsumption(checkResult)) {
 						callCount.incrementAndGet();
 					}