You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@flink.apache.org by sr...@apache.org on 2019/07/18 08:39:38 UTC
[flink] branch release-1.9 updated: [FLINK-13249][runtime] Fix handling of partition producer responses by running them with the task's executor
This is an automated email from the ASF dual-hosted git repository.
srichter pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new 3eff638 [FLINK-13249][runtime] Fix handling of partition producer responses by running them with the task's executor
3eff638 is described below
commit 3eff6387b5f6716dee5c17b71b10c08760b946cc
Author: Stefan Richter <s....@data-artisans.com>
AuthorDate: Thu Jul 18 10:37:22 2019 +0200
[FLINK-13249][runtime] Fix handling of partition producer responses by running them with the task's executor
Fixes the problem described in FLINK-13249 by ensuring that processing the partition producer response never blocks a Netty thread and is instead always executed by the task's executor.
(cherry picked from commit 23bd23b325f15c726fcc48c76eb252fa2ed2fe59)
---
.../partition/PartitionProducerStateProvider.java | 9 +++++----
.../io/network/partition/consumer/SingleInputGate.java | 6 +++---
.../java/org/apache/flink/runtime/taskmanager/Task.java | 16 ++++++++++------
.../partition/consumer/SingleInputGateBuilder.java | 6 +-----
.../org/apache/flink/runtime/taskmanager/TaskTest.java | 8 ++++----
5 files changed, 23 insertions(+), 22 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionProducerStateProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionProducerStateProvider.java
index 8bbdaa5..5785095 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionProducerStateProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionProducerStateProvider.java
@@ -22,7 +22,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.types.Either;
-import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
/**
* Request execution state of partition producer, the response accepts state check callbacks.
@@ -34,11 +34,12 @@ public interface PartitionProducerStateProvider {
* @param intermediateDataSetId ID of the parent intermediate data set.
* @param resultPartitionId ID of the result partition to check. This
* identifies the producing execution and partition.
- * @return a future with response handle.
+ * @param responseConsumer consumer for the response handle.
*/
- CompletableFuture<? extends ResponseHandle> requestPartitionProducerState(
+ void requestPartitionProducerState(
IntermediateDataSetID intermediateDataSetId,
- ResultPartitionID resultPartitionId);
+ ResultPartitionID resultPartitionId,
+ Consumer<? super ResponseHandle> responseConsumer);
/**
* Result of state query, accepts state check callbacks.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index bd75262..534078d9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -601,8 +601,8 @@ public class SingleInputGate extends InputGate {
void triggerPartitionStateCheck(ResultPartitionID partitionId) {
partitionProducerStateProvider.requestPartitionProducerState(
consumedResultId,
- partitionId)
- .thenAccept(responseHandle -> {
+ partitionId,
+ ((PartitionProducerStateProvider.ResponseHandle responseHandle) -> {
boolean isProducingState = new RemoteChannelStateChecker(partitionId, owningTaskName)
.isProducerReadyOrAbortConsumption(responseHandle);
if (isProducingState) {
@@ -612,7 +612,7 @@ public class SingleInputGate extends InputGate {
responseHandle.failConsumption(t);
}
}
- });
+ }));
}
private void queueChannel(InputChannel channel) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index d4e1d8a..12049f2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -99,6 +99,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Consumer;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -1080,18 +1081,21 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid
// ------------------------------------------------------------------------
@Override
- public CompletableFuture<PartitionProducerStateResponseHandle> requestPartitionProducerState(
+ public void requestPartitionProducerState(
final IntermediateDataSetID intermediateDataSetId,
- final ResultPartitionID resultPartitionId) {
+ final ResultPartitionID resultPartitionId,
+ Consumer<? super ResponseHandle> responseConsumer) {
+
final CompletableFuture<ExecutionState> futurePartitionState =
partitionProducerStateChecker.requestPartitionProducerState(
jobId,
intermediateDataSetId,
resultPartitionId);
- final CompletableFuture<PartitionProducerStateResponseHandle> result =
- futurePartitionState.handleAsync(PartitionProducerStateResponseHandle::new, executor);
- FutureUtils.assertNoException(result);
- return result;
+
+ FutureUtils.assertNoException(
+ futurePartitionState
+ .handle(PartitionProducerStateResponseHandle::new)
+ .thenAcceptAsync(responseConsumer, executor));
}
// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
index 956bad9..944cc07 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
@@ -21,23 +21,19 @@ package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
-import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider.ResponseHandle;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
import org.apache.flink.util.function.SupplierWithException;
import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
/**
* Utility class to encapsulate the logic of building a {@link SingleInputGate} instance.
*/
public class SingleInputGateBuilder {
- private static final CompletableFuture<ResponseHandle> NO_OP_PRODUCER_CHECKER_RESULT = new CompletableFuture<>();
-
- public static final PartitionProducerStateProvider NO_OP_PRODUCER_CHECKER = (dsid, id) -> NO_OP_PRODUCER_CHECKER_RESULT;
+ public static final PartitionProducerStateProvider NO_OP_PRODUCER_CHECKER = (dsid, id, consumer) -> {};
private final IntermediateDataSetID intermediateDataSetID = new IntermediateDataSetID();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index ee78963..5879f52 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -656,7 +656,7 @@ public class TaskTest extends TestLogger {
final CompletableFuture<ExecutionState> promise = new CompletableFuture<>();
when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), eq(resultId), eq(partitionId))).thenReturn(promise);
- task.requestPartitionProducerState(resultId, partitionId).thenAccept(checkResult ->
+ task.requestPartitionProducerState(resultId, partitionId, checkResult ->
assertThat(remoteChannelStateChecker.isProducerReadyOrAbortConsumption(checkResult), is(false))
);
@@ -680,7 +680,7 @@ public class TaskTest extends TestLogger {
final CompletableFuture<ExecutionState> promise = new CompletableFuture<>();
when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), eq(resultId), eq(partitionId))).thenReturn(promise);
- task.requestPartitionProducerState(resultId, partitionId).thenAccept(checkResult ->
+ task.requestPartitionProducerState(resultId, partitionId, checkResult ->
assertThat(remoteChannelStateChecker.isProducerReadyOrAbortConsumption(checkResult), is(false))
);
@@ -711,7 +711,7 @@ public class TaskTest extends TestLogger {
CompletableFuture<ExecutionState> promise = new CompletableFuture<>();
when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), eq(resultId), eq(partitionId))).thenReturn(promise);
- task.requestPartitionProducerState(resultId, partitionId).thenAccept(checkResult -> {
+ task.requestPartitionProducerState(resultId, partitionId, checkResult -> {
if (remoteChannelStateChecker.isProducerReadyOrAbortConsumption(checkResult)) {
callCount.incrementAndGet();
}
@@ -749,7 +749,7 @@ public class TaskTest extends TestLogger {
CompletableFuture<ExecutionState> promise = new CompletableFuture<>();
when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), eq(resultId), eq(partitionId))).thenReturn(promise);
- task.requestPartitionProducerState(resultId, partitionId).thenAccept(checkResult -> {
+ task.requestPartitionProducerState(resultId, partitionId, checkResult -> {
if (remoteChannelStateChecker.isProducerReadyOrAbortConsumption(checkResult)) {
callCount.incrementAndGet();
}