You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/12/12 16:42:57 UTC
[1/2] flink git commit: [FLINK-5114] [network] Handle partition
producer state check for unregistered executions
Repository: flink
Updated Branches:
refs/heads/master 47db9cb1a -> a078666d4
http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 8177bf7..7953ceb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -25,6 +25,8 @@ import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.execution.CancelTaskException;
@@ -37,7 +39,7 @@ import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
@@ -46,6 +48,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.messages.TaskManagerMessages;
import org.apache.flink.runtime.messages.TaskMessages;
@@ -68,6 +71,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -258,14 +262,14 @@ public class TaskTest extends TestLogger {
// mock a network manager that rejects registration
ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class);
- PartitionStateChecker partitionStateChecker = mock(PartitionStateChecker.class);
+ PartitionProducerStateChecker partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
Executor executor = mock(Executor.class);
NetworkEnvironment network = mock(NetworkEnvironment.class);
when(network.getResultPartitionManager()).thenReturn(partitionManager);
when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
doThrow(new RuntimeException("buffers")).when(network).registerTask(any(Task.class));
- Task task = createTask(TestInvokableCorrect.class, libCache, network, consumableNotifier, partitionStateChecker, executor);
+ Task task = createTask(TestInvokableCorrect.class, libCache, network, consumableNotifier, partitionProducerStateChecker, executor);
task.registerExecutionListener(listener);
@@ -544,7 +548,7 @@ public class TaskTest extends TestLogger {
// Set the mock input gate
setInputGate(task, inputGate);
- // Expected task state for each partition state
+ // Expected task state for each producer state
final Map<ExecutionState, ExecutionState> expected = new HashMap<>(ExecutionState.values().length);
// Fail the task for unexpected states
@@ -556,7 +560,7 @@ public class TaskTest extends TestLogger {
expected.put(ExecutionState.SCHEDULED, ExecutionState.RUNNING);
expected.put(ExecutionState.DEPLOYING, ExecutionState.RUNNING);
expected.put(ExecutionState.FINISHED, ExecutionState.RUNNING);
-
+
expected.put(ExecutionState.CANCELED, ExecutionState.CANCELING);
expected.put(ExecutionState.CANCELING, ExecutionState.CANCELING);
expected.put(ExecutionState.FAILED, ExecutionState.CANCELING);
@@ -564,7 +568,7 @@ public class TaskTest extends TestLogger {
for (ExecutionState state : ExecutionState.values()) {
setState(task, ExecutionState.RUNNING);
- task.onPartitionStateUpdate(resultId, partitionId.getPartitionId(), state);
+ task.onPartitionStateUpdate(resultId, partitionId, state);
ExecutionState newTaskState = task.getExecutionState();
@@ -575,6 +579,126 @@ public class TaskTest extends TestLogger {
}
/**
+ * Tests the task's reaction to each way the partition producer state check future can complete.
+ */
+ @Test
+ public void testTriggerPartitionStateUpdate() throws Exception {
+ IntermediateDataSetID resultId = new IntermediateDataSetID();
+ ResultPartitionID partitionId = new ResultPartitionID();
+
+ LibraryCacheManager libCache = mock(LibraryCacheManager.class);
+ when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
+
+ PartitionProducerStateChecker partitionChecker = mock(PartitionProducerStateChecker.class);
+
+ ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class);
+ NetworkEnvironment network = mock(NetworkEnvironment.class);
+ when(network.getResultPartitionManager()).thenReturn(mock(ResultPartitionManager.class));
+ when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
+ when(network.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class)))
+ .thenReturn(mock(TaskKvStateRegistry.class));
+
+ createTask(InvokableBlockingInInvoke.class, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
+
+ // Test every completion branch of the partition producer state check
+
+ {
+ // Reset latches
+ createQueuesAndActors();
+
+ // PartitionProducerDisposedException
+ Task task = createTask(InvokableBlockingInInvoke.class, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
+
+ FlinkCompletableFuture<ExecutionState> promise = new FlinkCompletableFuture<>();
+ when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), eq(resultId), eq(partitionId))).thenReturn(promise);
+
+ task.triggerPartitionProducerStateCheck(task.getJobID(), resultId, partitionId);
+
+ promise.completeExceptionally(new PartitionProducerDisposedException(partitionId));
+ assertEquals(ExecutionState.CANCELING, task.getExecutionState());
+ }
+
+ {
+ // Reset latches
+ createQueuesAndActors();
+
+ // Any other exception
+ Task task = createTask(InvokableBlockingInInvoke.class, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
+
+ FlinkCompletableFuture<ExecutionState> promise = new FlinkCompletableFuture<>();
+ when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), eq(resultId), eq(partitionId))).thenReturn(promise);
+
+ task.triggerPartitionProducerStateCheck(task.getJobID(), resultId, partitionId);
+
+ promise.completeExceptionally(new RuntimeException("Any other exception"));
+
+ assertEquals(ExecutionState.FAILED, task.getExecutionState());
+ }
+
+ {
+ // Reset latches
+ createQueuesAndActors();
+
+ // TimeoutException is handled specially => retry
+ Task task = createTask(InvokableBlockingInInvoke.class, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
+ SingleInputGate inputGate = mock(SingleInputGate.class);
+ when(inputGate.getConsumedResultId()).thenReturn(resultId);
+
+ try {
+ task.startTaskThread();
+ awaitLatch.await();
+
+ setInputGate(task, inputGate);
+
+ FlinkCompletableFuture<ExecutionState> promise = new FlinkCompletableFuture<>();
+ when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), eq(resultId), eq(partitionId))).thenReturn(promise);
+
+ task.triggerPartitionProducerStateCheck(task.getJobID(), resultId, partitionId);
+
+ promise.completeExceptionally(new TimeoutException());
+
+ assertEquals(ExecutionState.RUNNING, task.getExecutionState());
+
+ verify(inputGate, times(1)).retriggerPartitionRequest(eq(partitionId.getPartitionId()));
+ } finally {
+ task.getExecutingThread().interrupt();
+ task.getExecutingThread().join();
+ }
+ }
+
+ {
+ // Reset latches
+ createQueuesAndActors();
+
+ // Success
+ Task task = createTask(InvokableBlockingInInvoke.class, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
+ SingleInputGate inputGate = mock(SingleInputGate.class);
+ when(inputGate.getConsumedResultId()).thenReturn(resultId);
+
+ try {
+ task.startTaskThread();
+ awaitLatch.await();
+
+ setInputGate(task, inputGate);
+
+ FlinkCompletableFuture<ExecutionState> promise = new FlinkCompletableFuture<>();
+ when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), eq(resultId), eq(partitionId))).thenReturn(promise);
+
+ task.triggerPartitionProducerStateCheck(task.getJobID(), resultId, partitionId);
+
+ promise.complete(ExecutionState.RUNNING);
+
+ assertEquals(ExecutionState.RUNNING, task.getExecutionState());
+
+ verify(inputGate, times(1)).retriggerPartitionRequest(eq(partitionId.getPartitionId()));
+ } finally {
+ task.getExecutingThread().interrupt();
+ task.getExecutingThread().join();
+ }
+ }
+ }
+
+ /**
* Tests that interrupt happens via watch dog if canceller is stuck in cancel.
* Task cancellation blocks the task canceller. Interrupt after cancel via
* cancellation watch dog.
@@ -753,7 +877,7 @@ public class TaskTest extends TestLogger {
ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class);
- PartitionStateChecker partitionStateChecker = mock(PartitionStateChecker.class);
+ PartitionProducerStateChecker partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
Executor executor = mock(Executor.class);
NetworkEnvironment network = mock(NetworkEnvironment.class);
when(network.getResultPartitionManager()).thenReturn(partitionManager);
@@ -761,7 +885,7 @@ public class TaskTest extends TestLogger {
when(network.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class)))
.thenReturn(mock(TaskKvStateRegistry.class));
- return createTask(invokable, libCache, network, consumableNotifier, partitionStateChecker, executor, config, execConfig);
+ return createTask(invokable, libCache, network, consumableNotifier, partitionProducerStateChecker, executor, config, execConfig);
}
private Task createTask(
@@ -769,9 +893,9 @@ public class TaskTest extends TestLogger {
LibraryCacheManager libCache,
NetworkEnvironment networkEnvironment,
ResultPartitionConsumableNotifier consumableNotifier,
- PartitionStateChecker partitionStateChecker,
+ PartitionProducerStateChecker partitionProducerStateChecker,
Executor executor) throws IOException {
- return createTask(invokable, libCache, networkEnvironment, consumableNotifier, partitionStateChecker, executor, new Configuration(), new ExecutionConfig());
+ return createTask(invokable, libCache, networkEnvironment, consumableNotifier, partitionProducerStateChecker, executor, new Configuration(), new ExecutionConfig());
}
private Task createTask(
@@ -779,7 +903,7 @@ public class TaskTest extends TestLogger {
LibraryCacheManager libCache,
NetworkEnvironment networkEnvironment,
ResultPartitionConsumableNotifier consumableNotifier,
- PartitionStateChecker partitionStateChecker,
+ PartitionProducerStateChecker partitionProducerStateChecker,
Executor executor,
Configuration taskManagerConfig,
ExecutionConfig execConfig) throws IOException {
@@ -837,7 +961,7 @@ public class TaskTest extends TestLogger {
new TaskManagerRuntimeInfo("localhost", taskManagerConfig, System.getProperty("java.io.tmpdir")),
mock(TaskMetricGroup.class),
consumableNotifier,
- partitionStateChecker,
+ partitionProducerStateChecker,
executor);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
index 6e96400..291fd5f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
@@ -39,7 +39,7 @@ import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
@@ -157,9 +157,8 @@ public class BlockingCheckpointsTest {
"localhost", new Configuration(), EnvironmentInformation.getTemporaryFileDirectory()),
new UnregisteredTaskMetricsGroup(),
mock(ResultPartitionConsumableNotifier.class),
- mock(PartitionStateChecker.class),
+ mock(PartitionProducerStateChecker.class),
Executors.directExecutor());
-
}
// ------------------------------------------------------------------------
@@ -297,4 +296,4 @@ public class BlockingCheckpointsTest {
@Override
protected void cancelTask() {}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index 1f79384..6cde30f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -35,7 +35,7 @@ import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
@@ -180,7 +180,7 @@ public class InterruptSensitiveRestoreTest {
"localhost", new Configuration(), EnvironmentInformation.getTemporaryFileDirectory()),
new UnregisteredTaskMetricsGroup(),
mock(ResultPartitionConsumableNotifier.class),
- mock(PartitionStateChecker.class),
+ mock(PartitionProducerStateChecker.class),
mock(Executor.class));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 3fe8a37..d04c456 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -38,7 +38,7 @@ import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -256,7 +256,7 @@ public class StreamTaskTest {
ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class);
- PartitionStateChecker partitionStateChecker = mock(PartitionStateChecker.class);
+ PartitionProducerStateChecker partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
Executor executor = mock(Executor.class);
NetworkEnvironment network = mock(NetworkEnvironment.class);
@@ -303,7 +303,7 @@ public class StreamTaskTest {
new TaskManagerRuntimeInfo("localhost", taskManagerConfig, System.getProperty("java.io.tmpdir")),
new UnregisteredTaskMetricsGroup(),
consumableNotifier,
- partitionStateChecker,
+ partitionProducerStateChecker,
executor);
}
[2/2] flink git commit: [FLINK-5114] [network] Handle partition
producer state check for unregistered executions
Posted by uc...@apache.org.
[FLINK-5114] [network] Handle partition producer state check for unregistered executions
This closes #2912.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a078666d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a078666d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a078666d
Branch: refs/heads/master
Commit: a078666d42ab4dae01dedaa50d55343ce141fcb8
Parents: 47db9cb
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Nov 22 16:15:04 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Mon Dec 12 17:39:16 2016 +0100
----------------------------------------------------------------------
.../executiongraph/IntermediateResult.java | 37 ++-
.../runtime/io/network/PartitionState.java | 64 -----
.../netty/PartitionProducerStateChecker.java | 52 +++++
.../io/network/netty/PartitionStateChecker.java | 35 ---
.../partition/consumer/SingleInputGate.java | 13 +-
.../PartitionProducerDisposedException.java | 36 +++
...torGatewayPartitionProducerStateChecker.java | 66 ++++++
.../ActorGatewayPartitionStateChecker.java | 67 ------
.../apache/flink/runtime/taskmanager/Task.java | 121 ++++++----
.../flink/runtime/taskmanager/TaskActions.java | 20 +-
.../flink/runtime/jobmanager/JobManager.scala | 70 ++++--
.../runtime/messages/JobManagerMessages.scala | 24 +-
.../flink/runtime/taskmanager/TaskManager.scala | 6 +-
.../partition/InputGateConcurrentTest.java | 4 -
.../partition/InputGateFairnessTest.java | 8 +-
.../consumer/LocalInputChannelTest.java | 3 -
.../partition/consumer/SingleInputGateTest.java | 6 +-
.../partition/consumer/TestSingleInputGate.java | 2 -
.../partition/consumer/UnionInputGateTest.java | 5 +-
.../runtime/jobmanager/JobManagerTest.java | 234 ++++++++++++++++---
.../runtime/taskmanager/TaskAsyncCallTest.java | 6 +-
.../runtime/taskmanager/TaskManagerTest.java | 14 +-
.../flink/runtime/taskmanager/TaskStopTest.java | 4 +-
.../flink/runtime/taskmanager/TaskTest.java | 148 +++++++++++-
.../runtime/tasks/BlockingCheckpointsTest.java | 7 +-
.../tasks/InterruptSensitiveRestoreTest.java | 4 +-
.../streaming/runtime/tasks/StreamTaskTest.java | 6 +-
27 files changed, 698 insertions(+), 364 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
index c2c19d1..313272c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.executiongraph;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -34,6 +36,14 @@ public class IntermediateResult {
private final IntermediateResultPartition[] partitions;
+ /**
+ * Maps intermediate result partition IDs to a partition index. This is
+ * used for ID lookups of intermediate results. The partitions themselves
+ * stay in an array because the partition connect logic elsewhere is
+ * tightly coupled to that array layout.
+ */
+ private final HashMap<IntermediateResultPartitionID, Integer> partitionLookupHelper = new HashMap<>();
+
private final int numParallelProducers;
private final AtomicInteger numberOfRunningProducers;
@@ -54,10 +64,12 @@ public class IntermediateResult {
this.id = checkNotNull(id);
this.producer = checkNotNull(producer);
- this.partitions = new IntermediateResultPartition[numParallelProducers];
+
checkArgument(numParallelProducers >= 1);
this.numParallelProducers = numParallelProducers;
+ this.partitions = new IntermediateResultPartition[numParallelProducers];
+
this.numberOfRunningProducers = new AtomicInteger(numParallelProducers);
// we do not set the intermediate result partitions here, because we let them be initialized by
@@ -80,6 +92,7 @@ public class IntermediateResult {
}
partitions[partitionNumber] = partition;
+ partitionLookupHelper.put(partition.getPartitionId(), partitionNumber);
partitionsAssigned++;
}
@@ -95,6 +108,28 @@ public class IntermediateResult {
return partitions;
}
+ /**
+ * Returns the partition with the given ID.
+ *
+ * @param resultPartitionId ID of the partition to look up
+ * @throws NullPointerException If the partition ID is <code>null</code>
+ * @throws IllegalArgumentException If the partition ID is unknown
+ * @return Intermediate result partition with the given ID
+ */
+ public IntermediateResultPartition getPartitionById(IntermediateResultPartitionID resultPartitionId) {
+ // Looks up the partition number via the helper map and returns the
+ // partition. Currently, this happens infrequently enough that we could
+ // consider removing the map and scanning the partitions on every lookup.
+ // The lookup (currently) only happens when the producer of an intermediate
+ // result cannot be found via its registered execution.
+ Integer partitionNumber = partitionLookupHelper.get(checkNotNull(resultPartitionId, "IntermediateResultPartitionID"));
+ if (partitionNumber != null) {
+ return partitions[partitionNumber];
+ } else {
+ throw new IllegalArgumentException("Unknown intermediate result partition ID " + resultPartitionId);
+ }
+ }
+
public int getNumberOfAssignedPartitions() {
return partitionsAssigned;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionState.java
deleted file mode 100644
index 59357fc..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionState.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network;
-
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nullable;
-import java.io.Serializable;
-
-/**
- * Contains information about the state of a result partition.
- */
-public class PartitionState implements Serializable {
-
- private static final long serialVersionUID = -4693651272083825031L;
-
- private final IntermediateDataSetID intermediateDataSetID;
- private final IntermediateResultPartitionID intermediateResultPartitionID;
- private final ExecutionState executionState;
-
- public PartitionState(
- IntermediateDataSetID intermediateDataSetID,
- IntermediateResultPartitionID intermediateResultPartitionID,
- @Nullable ExecutionState executionState) {
-
- this.intermediateDataSetID = Preconditions.checkNotNull(intermediateDataSetID);
- this.intermediateResultPartitionID = Preconditions.checkNotNull(intermediateResultPartitionID);
- this.executionState = executionState;
- }
-
- public IntermediateDataSetID getIntermediateDataSetID() {
- return intermediateDataSetID;
- }
-
- public IntermediateResultPartitionID getIntermediateResultPartitionID() {
- return intermediateResultPartitionID;
- }
-
- /**
- * Returns the execution state of the partition producer or <code>null</code> if it is not available.
- */
- public ExecutionState getExecutionState() {
- return executionState;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionProducerStateChecker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionProducerStateChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionProducerStateChecker.java
new file mode 100644
index 0000000..d0b7e1e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionProducerStateChecker.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+
+/**
+ * Intermediate partition state checker to query the JobManager about the state
+ * of the producer of a result partition.
+ *
+ * <p>These checks are triggered when a partition request is answered with a
+ * PartitionNotFound event. This usually happens when the producer of that
+ * partition has not registered itself with the network stack or terminated.
+ */
+public interface PartitionProducerStateChecker {
+
+ /**
+ * Requests the execution state of the execution producing a result partition.
+ *
+ * @param jobId ID of the job the partition belongs to.
+ * @param intermediateDataSetId ID of the parent intermediate data set.
+ * @param resultPartitionId ID of the result partition to check. This
+ * identifies the producing execution and partition.
+ *
+ * @return Future holding the execution state of the producing execution.
+ */
+ Future<ExecutionState> requestPartitionProducerState(
+ JobID jobId,
+ IntermediateDataSetID intermediateDataSetId,
+ ResultPartitionID resultPartitionId);
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionStateChecker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionStateChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionStateChecker.java
deleted file mode 100644
index 949f426..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionStateChecker.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.netty;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.io.network.PartitionState;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-
-public interface PartitionStateChecker {
- Future<PartitionState> requestPartitionState(
- JobID jobId,
- ExecutionAttemptID executionId,
- IntermediateDataSetID resultId,
- ResultPartitionID partitionId);
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index b4d8d2c..d546559 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -108,9 +108,6 @@ public class SingleInputGate implements InputGate {
/** The job ID of the owning task. */
private final JobID jobId;
- /** The execution attempt ID of the owning task. */
- private final ExecutionAttemptID executionId;
-
/**
* The ID of the consumed intermediate result. Each input gate consumes partitions of the
* intermediate result specified by this ID. This ID also identifies the input gate at the
@@ -168,7 +165,6 @@ public class SingleInputGate implements InputGate {
public SingleInputGate(
String owningTaskName,
JobID jobId,
- ExecutionAttemptID executionId,
IntermediateDataSetID consumedResultId,
int consumedSubpartitionIndex,
int numberOfInputChannels,
@@ -177,7 +173,6 @@ public class SingleInputGate implements InputGate {
this.owningTaskName = checkNotNull(owningTaskName);
this.jobId = checkNotNull(jobId);
- this.executionId = checkNotNull(executionId);
this.consumedResultId = checkNotNull(consumedResultId);
@@ -530,11 +525,7 @@ public class SingleInputGate implements InputGate {
}
void triggerPartitionStateCheck(ResultPartitionID partitionId) {
- taskActions.triggerPartitionStateCheck(
- jobId,
- executionId,
- consumedResultId,
- partitionId);
+ taskActions.triggerPartitionProducerStateCheck(jobId, consumedResultId, partitionId);
}
private void queueChannel(InputChannel channel) {
@@ -587,7 +578,7 @@ public class SingleInputGate implements InputGate {
final InputChannelDeploymentDescriptor[] icdd = checkNotNull(igdd.getInputChannelDeploymentDescriptors());
final SingleInputGate inputGate = new SingleInputGate(
- owningTaskName, jobId, executionId, consumedResultId, consumedSubpartitionIndex,
+ owningTaskName, jobId, consumedResultId, consumedSubpartitionIndex,
icdd.length, taskActions, metrics);
// Create the input channels. There is one input channel for each consumed partition.
http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/PartitionProducerDisposedException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/PartitionProducerDisposedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/PartitionProducerDisposedException.java
new file mode 100644
index 0000000..12f2433
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/PartitionProducerDisposedException.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionProducerState;
+
+/**
+ * Exception returned to a TaskManager on {@link RequestPartitionProducerState}
+ * if the producer of a partition has been disposed.
+ */
+public class PartitionProducerDisposedException extends Exception {
+
+ // Instances travel from the JobManager to a TaskManager inside a Status.Failure
+ // reply, so pin the serialization version instead of relying on the computed one.
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Creates the exception for a partition whose producer has been disposed.
+ *
+ * @param resultPartitionID ID of the affected partition; it identifies both the
+ * producing execution attempt and the partition itself.
+ */
+ public PartitionProducerDisposedException(ResultPartitionID resultPartitionID) {
+ super(String.format("Execution %s producing partition %s has already been disposed.",
+ resultPartitionID.getProducerId(),
+ resultPartitionID.getPartitionId()));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionProducerStateChecker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionProducerStateChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionProducerStateChecker.java
new file mode 100644
index 0000000..5c229a9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionProducerStateChecker.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.util.Preconditions;
+import scala.concurrent.duration.FiniteDuration;
+import scala.reflect.ClassTag$;
+
+/**
+ * {@link PartitionProducerStateChecker} that asks the job manager, through an
+ * {@link ActorGateway}, for the execution state of a result partition's producer.
+ */
+public class ActorGatewayPartitionProducerStateChecker implements PartitionProducerStateChecker {
+
+ /** Gateway to the job manager that answers the producer state requests. */
+ private final ActorGateway jobManager;
+
+ /** Timeout applied to every request sent to the job manager. */
+ private final FiniteDuration timeout;
+
+ /**
+ * Creates a checker that sends its requests to the given job manager.
+ *
+ * @param jobManager gateway to the job manager; must not be null
+ * @param timeout timeout for each request; must not be null
+ */
+ public ActorGatewayPartitionProducerStateChecker(ActorGateway jobManager, FiniteDuration timeout) {
+ this.jobManager = Preconditions.checkNotNull(jobManager);
+ this.timeout = Preconditions.checkNotNull(timeout);
+ }
+
+ /**
+ * Sends a {@link JobManagerMessages.RequestPartitionProducerState} to the job manager
+ * and returns its reply as a future narrowed to {@link ExecutionState}.
+ */
+ @Override
+ public Future<ExecutionState> requestPartitionProducerState(
+ JobID jobId,
+ IntermediateDataSetID intermediateDataSetId,
+ ResultPartitionID resultPartitionId) {
+
+ JobManagerMessages.RequestPartitionProducerState request =
+ new JobManagerMessages.RequestPartitionProducerState(jobId, intermediateDataSetId, resultPartitionId);
+
+ scala.concurrent.Future<Object> reply = jobManager.ask(request, timeout);
+
+ // mapTo fails the resulting future if the reply is not an ExecutionState.
+ scala.concurrent.Future<ExecutionState> producerState =
+ reply.mapTo(ClassTag$.MODULE$.<ExecutionState>apply(ExecutionState.class));
+
+ return new FlinkFuture<>(producerState);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionStateChecker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionStateChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionStateChecker.java
deleted file mode 100644
index efa6ec3..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionStateChecker.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.io.network.PartitionState;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.util.Preconditions;
-import scala.concurrent.duration.FiniteDuration;
-import scala.reflect.ClassTag$;
-
-/**
- * This implementation uses {@link ActorGateway} to trigger the partition state check at the job
- * manager.
- */
-public class ActorGatewayPartitionStateChecker implements PartitionStateChecker {
-
- private final ActorGateway jobManager;
- private final FiniteDuration timeout;
-
- public ActorGatewayPartitionStateChecker(ActorGateway jobManager, FiniteDuration timeout) {
- this.jobManager = Preconditions.checkNotNull(jobManager);
- this.timeout = Preconditions.checkNotNull(timeout);
- }
-
- @Override
- public Future<PartitionState> requestPartitionState(
- JobID jobId,
- ExecutionAttemptID executionAttemptId,
- IntermediateDataSetID resultId,
- ResultPartitionID partitionId) {
- JobManagerMessages.RequestPartitionState msg = new JobManagerMessages.RequestPartitionState(
- jobId,
- partitionId,
- executionAttemptId,
- resultId);
-
- scala.concurrent.Future<PartitionState> futureResponse = jobManager
- .ask(msg, timeout)
- .mapTo(ClassTag$.MODULE$.<PartitionState>apply(PartitionState.class));
-
- return new FlinkFuture<>(futureResponse);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 184c3b1..a1fb35e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -46,20 +46,19 @@ import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import org.apache.flink.runtime.io.network.PartitionState;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
+import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -206,8 +205,8 @@ public class Task implements Runnable, TaskActions {
/** Parent group for all metrics of this task */
private final TaskMetricGroup metrics;
- /** Partition state checker to request partition states from */
- private final PartitionStateChecker partitionStateChecker;
+ /** Partition producer state checker to request partition states from */
+ private final PartitionProducerStateChecker partitionProducerStateChecker;
/** Executor to run future callbacks */
private final Executor executor;
@@ -271,7 +270,7 @@ public class Task implements Runnable, TaskActions {
TaskManagerRuntimeInfo taskManagerConfig,
TaskMetricGroup metricGroup,
ResultPartitionConsumableNotifier resultPartitionConsumableNotifier,
- PartitionStateChecker partitionStateChecker,
+ PartitionProducerStateChecker partitionProducerStateChecker,
Executor executor) {
Preconditions.checkNotNull(jobInformation);
@@ -321,7 +320,7 @@ public class Task implements Runnable, TaskActions {
this.taskExecutionStateListeners = new CopyOnWriteArrayList<>();
this.metrics = metricGroup;
- this.partitionStateChecker = Preconditions.checkNotNull(partitionStateChecker);
+ this.partitionProducerStateChecker = Preconditions.checkNotNull(partitionProducerStateChecker);
this.executor = Preconditions.checkNotNull(executor);
// create the reader and writer structures
@@ -1036,32 +1035,37 @@ public class Task implements Runnable, TaskActions {
// ------------------------------------------------------------------------
@Override
- public void triggerPartitionStateCheck(
+ public void triggerPartitionProducerStateCheck(
JobID jobId,
- ExecutionAttemptID executionId,
- final IntermediateDataSetID resultId,
- final ResultPartitionID partitionId) {
- org.apache.flink.runtime.concurrent.Future<PartitionState> futurePartitionState = partitionStateChecker.requestPartitionState(
- jobId,
- executionId,
- resultId,
- partitionId);
-
- futurePartitionState.handleAsync(new BiFunction<PartitionState, Throwable, Void>() {
+ final IntermediateDataSetID intermediateDataSetId,
+ final ResultPartitionID resultPartitionId) {
+
+ org.apache.flink.runtime.concurrent.Future<ExecutionState> futurePartitionState =
+ partitionProducerStateChecker.requestPartitionProducerState(
+ jobId,
+ intermediateDataSetId,
+ resultPartitionId);
+
+ futurePartitionState.handleAsync(new BiFunction<ExecutionState, Throwable, Void>() {
@Override
- public Void apply(PartitionState partitionState, Throwable throwable) {
+ public Void apply(ExecutionState executionState, Throwable throwable) {
try {
- if (partitionState != null) {
+ if (executionState != null) {
onPartitionStateUpdate(
- partitionState.getIntermediateDataSetID(),
- partitionState.getIntermediateResultPartitionID(),
- partitionState.getExecutionState());
+ intermediateDataSetId,
+ resultPartitionId,
+ executionState);
} else if (throwable instanceof TimeoutException) {
// our request timed out, assume we're still running and try again
onPartitionStateUpdate(
- resultId,
- partitionId.getPartitionId(),
+ intermediateDataSetId,
+ resultPartitionId,
ExecutionState.RUNNING);
+ } else if (throwable instanceof PartitionProducerDisposedException) {
+ String msg = String.format("Producer %s of partition %s disposed. Cancelling execution.",
+ resultPartitionId.getProducerId(), resultPartitionId.getPartitionId());
+ LOG.info(msg, throwable);
+ cancelExecution();
} else {
failExternally(throwable);
}
@@ -1183,41 +1187,58 @@ public class Task implements Runnable, TaskActions {
/**
* Answer to a partition state check issued after a failed partition request.
*/
- public void onPartitionStateUpdate(
- IntermediateDataSetID resultId,
- IntermediateResultPartitionID partitionId,
- ExecutionState partitionState) throws IOException, InterruptedException {
+ @VisibleForTesting
+ void onPartitionStateUpdate(
+ IntermediateDataSetID intermediateDataSetId,
+ ResultPartitionID resultPartitionId,
+ ExecutionState producerState) throws IOException, InterruptedException {
if (executionState == ExecutionState.RUNNING) {
- final SingleInputGate inputGate = inputGatesById.get(resultId);
+ final SingleInputGate inputGate = inputGatesById.get(intermediateDataSetId);
if (inputGate != null) {
- if (partitionState == ExecutionState.RUNNING ||
- partitionState == ExecutionState.FINISHED ||
- partitionState == ExecutionState.SCHEDULED ||
- partitionState == ExecutionState.DEPLOYING) {
+ if (producerState == ExecutionState.SCHEDULED
+ || producerState == ExecutionState.DEPLOYING
+ || producerState == ExecutionState.RUNNING
+ || producerState == ExecutionState.FINISHED) {
// Retrigger the partition request
- inputGate.retriggerPartitionRequest(partitionId);
- }
- else if (partitionState == ExecutionState.CANCELED
- || partitionState == ExecutionState.CANCELING
- || partitionState == ExecutionState.FAILED) {
+ inputGate.retriggerPartitionRequest(resultPartitionId.getPartitionId());
+
+ } else if (producerState == ExecutionState.CANCELING
+ || producerState == ExecutionState.CANCELED
+ || producerState == ExecutionState.FAILED) {
+
+ // The producing execution has been canceled or failed. We
+ // don't need to re-trigger the request since it cannot
+ // succeed.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cancelling task {} after the producer of partition {} with attempt ID {} has entered state {}.",
+ taskNameWithSubtask,
+ resultPartitionId.getPartitionId(),
+ resultPartitionId.getProducerId(),
+ producerState);
+ }
cancelExecution();
+ } else {
+ // Any other execution state is unexpected. Currently, only
+ // state CREATED is left out of the checked states. If we
+ // see a producer in this state, something went wrong with
+ // scheduling in topological order.
+ String msg = String.format("Producer with attempt ID %s of partition %s in unexpected state %s.",
+ resultPartitionId.getProducerId(),
+ resultPartitionId.getPartitionId(),
+ producerState);
+
+ failExternally(new IllegalStateException(msg));
}
- else {
- failExternally(new IllegalStateException("Received unexpected partition state "
- + partitionState + " for partition request. This is a bug."));
- }
- }
- else {
- failExternally(new IllegalStateException("Received partition state for " +
- "unknown input gate " + resultId + ". This is a bug."));
+ } else {
+ failExternally(new IllegalStateException("Received partition producer state for " +
+ "unknown input gate " + intermediateDataSetId + "."));
}
- }
- else {
- LOG.debug("Ignoring partition state notification for not running task.");
+ } else {
+ LOG.debug("Task {} ignored a partition producer state notification, because it's not running.", taskNameWithSubtask);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskActions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskActions.java
index 4f12691..f7650d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskActions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskActions.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.taskmanager;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -29,21 +28,20 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
public interface TaskActions {
/**
- * Check the partition state of the given partition.
+ * Check the execution state of the execution producing a result partition.
*
- * @param jobId of the partition
- * @param executionId of the partition
- * @param resultId of the partition
- * @param partitionId of the partition
+ * @param jobId ID of the job the partition belongs to.
+ * @param intermediateDataSetId ID of the parent intermediate data set.
+ * @param resultPartitionId ID of the result partition to check. This
+ * identifies the producing execution and partition.
*/
- void triggerPartitionStateCheck(
+ void triggerPartitionProducerStateCheck(
JobID jobId,
- ExecutionAttemptID executionId,
- IntermediateDataSetID resultId,
- ResultPartitionID partitionId);
+ IntermediateDataSetID intermediateDataSetId,
+ ResultPartitionID resultPartitionId);
/**
- * Fail the owning task with the given throwawble.
+ * Fail the owning task with the given throwable.
*
* @param cause of the failure
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 1dfd3db..8c686cd 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -50,8 +50,7 @@ import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
import org.apache.flink.runtime.executiongraph._
import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceID, InstanceManager}
-import org.apache.flink.runtime.io.network.PartitionState
-import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus}
+import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID}
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener
import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway
@@ -78,7 +77,7 @@ import org.apache.flink.runtime.security.SecurityUtils.SecurityConfiguration
import org.apache.flink.runtime.taskmanager.TaskManager
import org.apache.flink.runtime.util._
import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
-import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
+import org.apache.flink.runtime.{FlinkActor, JobException, LeaderSessionMessageFilter, LogMessages}
import org.apache.flink.util.{ConfigurationUtil, InstantiationUtil, NetUtils}
import org.jboss.netty.channel.ChannelException
@@ -935,27 +934,58 @@ class JobManager(
)
}
- case RequestPartitionState(jobId, partitionId, taskExecutionId, taskResultId) =>
- val state = currentJobs.get(jobId) match {
+ case RequestPartitionProducerState(jobId, intermediateDataSetId, resultPartitionId) =>
+ currentJobs.get(jobId) match {
case Some((executionGraph, _)) =>
- val execution = executionGraph.getRegisteredExecutions.get(partitionId.getProducerId)
+ try {
+ // Find the execution attempt producing the intermediate result partition.
+ val execution = executionGraph
+ .getRegisteredExecutions
+ .get(resultPartitionId.getProducerId)
+
+ if (execution != null) {
+ // Common case for pipelined exchanges => producing execution is
+ // still active.
+ sender ! decorateMessage(execution.getState)
+ } else {
+ // The producing execution might have terminated and been
+ // unregistered. We now look for the producing execution via the
+ // intermediate result itself.
+ val intermediateResult = executionGraph
+ .getAllIntermediateResults.get(intermediateDataSetId)
+
+ if (intermediateResult != null) {
+ // Try to find the producing execution
+ val producerExecution = intermediateResult
+ .getPartitionById(resultPartitionId.getPartitionId)
+ .getProducer
+ .getCurrentExecutionAttempt
+
+ if (producerExecution.getAttemptId() == resultPartitionId.getProducerId()) {
+ sender ! decorateMessage(producerExecution.getState)
+ } else {
+ val cause = new PartitionProducerDisposedException(resultPartitionId)
+ sender ! decorateMessage(Status.Failure(cause))
+ }
+ } else {
+ val cause = new IllegalArgumentException(
+ s"Intermediate data set with ID $intermediateDataSetId not found.")
+ sender ! decorateMessage(Status.Failure(cause))
+ }
+ }
+ } catch {
+ case e: Exception =>
+ sender ! decorateMessage(
+ Status.Failure(new RuntimeException("Failed to look up execution state of " +
+ s"producer with ID ${resultPartitionId.getProducerId}.", e)))
+ }
- if (execution != null) execution.getState else null
case None =>
- // Nothing to do. This is not an error, because the request is received when a sending
- // task fails or is not yet available during a remote partition request.
- log.debug(s"Cannot find execution graph for job $jobId.")
+ sender ! decorateMessage(
+ Status.Failure(new IllegalArgumentException(s"Job with ID $jobId not found.")))
- null
}
- sender ! decorateMessage(
- new PartitionState(
- taskResultId,
- partitionId.getPartitionId,
- state)
- )
-
case RequestJobStatus(jobID) =>
currentJobs.get(jobID) match {
case Some((executionGraph,_)) =>
@@ -1059,7 +1089,7 @@ class JobManager(
taskManagerMap.get(taskManagerActorRef) match {
case Some(instanceId) => handleTaskManagerTerminated(taskManagerActorRef, instanceId)
case None => log.debug("Received terminated message for task manager " +
- s"${taskManagerActorRef} which is not " +
+ s"$taskManagerActorRef which is not " +
"connected to this job manager.")
}
@@ -2092,7 +2122,7 @@ object JobManager {
def sleepBeforeRetry() : Unit = {
if (maxSleepBetweenRetries > 0) {
val sleepTime = (Math.random() * maxSleepBetweenRetries).asInstanceOf[Long]
- LOG.info(s"Retrying after bind exception. Sleeping for ${sleepTime} ms.")
+ LOG.info(s"Retrying after bind exception. Sleeping for $sleepTime ms.")
Thread.sleep(sleepTime)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index 3d72f1a..65819f4 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -157,20 +157,18 @@ object JobManagerMessages {
case class NextInputSplit(splitData: Array[Byte])
/**
- * Requests the current state of the partition.
- *
- * The state of a partition is currently bound to the state of the producing execution.
- *
- * @param jobId The job ID of the job, which produces the partition.
- * @param partitionId The partition ID of the partition to request the state of.
- * @param taskExecutionId The execution attempt ID of the task requesting the partition state.
- * @param taskResultId The input gate ID of the task requesting the partition state.
- */
- case class RequestPartitionState(
+ * Requests the execution state of the execution producing a result partition.
+ *
+ * @param jobId ID of the job the partition belongs to.
+ * @param intermediateDataSetId ID of the parent intermediate data set.
+ * @param resultPartitionId ID of the result partition to check. This
+ * identifies the producing execution and
+ * partition.
+ */
+ case class RequestPartitionProducerState(
jobId: JobID,
- partitionId: ResultPartitionID,
- taskExecutionId: ExecutionAttemptID,
- taskResultId: IntermediateDataSetID)
+ intermediateDataSetId: IntermediateDataSetID,
+ resultPartitionId: ResultPartitionID)
extends RequiresLeaderSessionID
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 271578f..41d3077 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -49,7 +49,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode
import org.apache.flink.runtime.io.disk.iomanager.{IOManager, IOManagerAsync}
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool
import org.apache.flink.runtime.io.network.{LocalConnectionManager, NetworkEnvironment, TaskEventDispatcher}
-import org.apache.flink.runtime.io.network.netty.{NettyConfig, NettyConnectionManager, PartitionStateChecker}
+import org.apache.flink.runtime.io.network.netty.{NettyConfig, NettyConnectionManager, PartitionProducerStateChecker}
import org.apache.flink.runtime.io.network.partition.{ResultPartitionConsumableNotifier, ResultPartitionManager}
import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService}
import org.apache.flink.runtime.memory.MemoryManager
@@ -181,7 +181,7 @@ class TaskManager(
private var connectionUtils: Option[(
CheckpointResponder,
- PartitionStateChecker,
+ PartitionProducerStateChecker,
ResultPartitionConsumableNotifier,
TaskManagerConnection)] = None
@@ -916,7 +916,7 @@ class TaskManager(
val taskManagerConnection = new ActorGatewayTaskManagerConnection(taskManagerGateway)
- val partitionStateChecker = new ActorGatewayPartitionStateChecker(
+ val partitionStateChecker = new ActorGatewayPartitionProducerStateChecker(
jobManagerGateway,
config.timeout)
http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
index 6570679..8cae04c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
@@ -60,7 +59,6 @@ public class InputGateConcurrentTest {
final SingleInputGate gate = new SingleInputGate(
"Test Task Name",
new JobID(),
- new ExecutionAttemptID(),
new IntermediateDataSetID(),
0, numChannels,
mock(TaskActions.class),
@@ -96,7 +94,6 @@ public class InputGateConcurrentTest {
final SingleInputGate gate = new SingleInputGate(
"Test Task Name",
new JobID(),
- new ExecutionAttemptID(),
new IntermediateDataSetID(),
0,
numChannels,
@@ -146,7 +143,6 @@ public class InputGateConcurrentTest {
final SingleInputGate gate = new SingleInputGate(
"Test Task Name",
new JobID(),
- new ExecutionAttemptID(),
new IntermediateDataSetID(),
0,
numChannels,
http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
index b35612a..7e1d792 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
@@ -87,7 +86,6 @@ public class InputGateFairnessTest {
SingleInputGate gate = new FairnessVerifyingInputGate(
"Test Task Name",
new JobID(),
- new ExecutionAttemptID(),
new IntermediateDataSetID(),
0, numChannels,
mock(TaskActions.class),
@@ -141,7 +139,6 @@ public class InputGateFairnessTest {
SingleInputGate gate = new FairnessVerifyingInputGate(
"Test Task Name",
new JobID(),
- new ExecutionAttemptID(),
new IntermediateDataSetID(),
0, numChannels,
mock(TaskActions.class),
@@ -192,7 +189,6 @@ public class InputGateFairnessTest {
SingleInputGate gate = new FairnessVerifyingInputGate(
"Test Task Name",
new JobID(),
- new ExecutionAttemptID(),
new IntermediateDataSetID(),
0, numChannels,
mock(TaskActions.class),
@@ -248,7 +244,6 @@ public class InputGateFairnessTest {
SingleInputGate gate = new FairnessVerifyingInputGate(
"Test Task Name",
new JobID(),
- new ExecutionAttemptID(),
new IntermediateDataSetID(),
0, numChannels,
mock(TaskActions.class),
@@ -346,14 +341,13 @@ public class InputGateFairnessTest {
public FairnessVerifyingInputGate(
String owningTaskName,
JobID jobId,
- ExecutionAttemptID executionId,
IntermediateDataSetID consumedResultId,
int consumedSubpartitionIndex,
int numberOfInputChannels,
TaskActions taskActions,
TaskIOMetricGroup metrics) {
- super(owningTaskName, jobId, executionId, consumedResultId, consumedSubpartitionIndex,
+ super(owningTaskName, jobId, consumedResultId, consumedSubpartitionIndex,
numberOfInputChannels, taskActions, metrics);
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 35ed4c3..37ec751 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -22,7 +22,6 @@ import com.google.common.collect.Lists;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.execution.CancelTaskException;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -283,7 +282,6 @@ public class LocalInputChannelTest {
final SingleInputGate gate = new SingleInputGate(
"test task name",
new JobID(),
- new ExecutionAttemptID(),
new IntermediateDataSetID(),
0,
1,
@@ -481,7 +479,6 @@ public class LocalInputChannelTest {
this.inputGate = new SingleInputGate(
"Test Name",
new JobID(),
- new ExecutionAttemptID(),
new IntermediateDataSetID(),
subpartitionIndex,
numberOfInputChannels,
http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 7cae362..a25b8d5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -73,7 +73,7 @@ public class SingleInputGateTest {
public void testBasicGetNextLogic() throws Exception {
// Setup
final SingleInputGate inputGate = new SingleInputGate(
- "Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 2, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ "Test Task Name", new JobID(), new IntermediateDataSetID(), 0, 2, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
final TestInputChannel[] inputChannels = new TestInputChannel[]{
new TestInputChannel(inputGate, 0),
@@ -127,7 +127,7 @@ public class SingleInputGateTest {
// Setup reader with one local and one unknown input channel
final IntermediateDataSetID resultId = new IntermediateDataSetID();
- final SingleInputGate inputGate = new SingleInputGate("Test Task Name", new JobID(), new ExecutionAttemptID(), resultId, 0, 2, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ final SingleInputGate inputGate = new SingleInputGate("Test Task Name", new JobID(), resultId, 0, 2, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
final BufferPool bufferPool = mock(BufferPool.class);
when(bufferPool.getNumberOfRequiredMemorySegments()).thenReturn(2);
@@ -178,7 +178,6 @@ public class SingleInputGateTest {
SingleInputGate inputGate = new SingleInputGate(
"t1",
new JobID(),
- new ExecutionAttemptID(),
new IntermediateDataSetID(),
0,
1,
@@ -218,7 +217,6 @@ public class SingleInputGateTest {
final SingleInputGate inputGate = new SingleInputGate(
"InputGate",
new JobID(),
- new ExecutionAttemptID(),
new IntermediateDataSetID(),
0,
1,
http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
index 126a96e..fe3b087 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
@@ -55,7 +54,6 @@ public class TestSingleInputGate {
SingleInputGate realGate = new SingleInputGate(
"Test Task Name",
new JobID(),
- new ExecutionAttemptID(),
new IntermediateDataSetID(),
0,
numberOfInputChannels,
http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
index 84ec202..c05df0a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.taskmanager.TaskActions;
@@ -43,8 +42,8 @@ public class UnionInputGateTest {
public void testBasicGetNextLogic() throws Exception {
// Setup
final String testTaskName = "Test Task";
- final SingleInputGate ig1 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 3, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
- final SingleInputGate ig2 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 5, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ final SingleInputGate ig1 = new SingleInputGate(testTaskName, new JobID(), new IntermediateDataSetID(), 0, 3, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ final SingleInputGate ig2 = new SingleInputGate(testTaskName, new JobID(), new IntermediateDataSetID(), 0, 5, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
final UnionInputGate union = new UnionInputGate(new SingleInputGate[]{ig1, ig2});
http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index f941c24..6d8c70b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -38,7 +38,6 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.io.network.PartitionState;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -56,7 +55,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
import org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
import org.apache.flink.runtime.messages.JobManagerMessages.CancellationResponse;
import org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
-import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
+import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionProducerState;
import org.apache.flink.runtime.messages.JobManagerMessages.StopJob;
import org.apache.flink.runtime.messages.JobManagerMessages.StoppingFailure;
import org.apache.flink.runtime.messages.JobManagerMessages.StoppingSuccess;
@@ -116,7 +115,6 @@ import static org.apache.flink.runtime.testingUtils.TestingUtils.startTestingClu
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -221,49 +219,227 @@ public class JobManagerTest {
// - The test ----------------------------------------------------------------------
// 1. All execution states
- RequestPartitionState request = new RequestPartitionState(
- jid, partitionId, receiver, rid);
+ RequestPartitionProducerState request = new RequestPartitionProducerState(
+ jid, rid, partitionId);
for (ExecutionState state : ExecutionState.values()) {
ExecutionGraphTestUtils.setVertexState(vertex, state);
- Future<PartitionState> futurePartitionState = jobManagerGateway
+ Future<ExecutionState> futurePartitionState = jobManagerGateway
.ask(request, getRemainingTime())
- .mapTo(ClassTag$.MODULE$.<PartitionState>apply(PartitionState.class));
+ .mapTo(ClassTag$.MODULE$.<ExecutionState>apply(ExecutionState.class));
- PartitionState resp = Await.result(futurePartitionState, getRemainingTime());
-
- assertEquals(request.taskResultId(), resp.getIntermediateDataSetID());
- assertEquals(request.partitionId().getPartitionId(), resp.getIntermediateResultPartitionID());
- assertEquals(state, resp.getExecutionState());
+ ExecutionState resp = Await.result(futurePartitionState, getRemainingTime());
+ assertEquals(state, resp);
}
// 2. Non-existing execution
- request = new RequestPartitionState(jid, new ResultPartitionID(), receiver, rid);
+ request = new RequestPartitionProducerState(jid, rid, new ResultPartitionID());
+
+ Future<?> futurePartitionState = jobManagerGateway.ask(request, getRemainingTime());
+ try {
+ Await.result(futurePartitionState, getRemainingTime());
+ fail("Did not fail with expected RuntimeException");
+ } catch (RuntimeException e) {
+ assertEquals(IllegalArgumentException.class, e.getCause().getClass());
+ }
- Future<PartitionState> futurePartitionState = jobManagerGateway
- .ask(request, getRemainingTime())
- .mapTo(ClassTag$.MODULE$.<PartitionState>apply(PartitionState.class));
+ // 3. Non-existing job
+ request = new RequestPartitionProducerState(new JobID(), rid, new ResultPartitionID());
+ futurePartitionState = jobManagerGateway.ask(request, getRemainingTime());
+
+ try {
+ Await.result(futurePartitionState, getRemainingTime());
+ fail("Did not fail with expected IllegalArgumentException");
+ } catch (IllegalArgumentException ignored) {
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+ };
+ }};
+ }
- PartitionState resp = Await.result(futurePartitionState, getRemainingTime());
+ /**
+ * Tests the JobManager response when the execution is not registered with
+ * the ExecutionGraph.
+ */
+ @Test
+ public void testRequestPartitionStateUnregisteredExecution() throws Exception {
+ new JavaTestKit(system) {{
+ new Within(duration("15 seconds")) {
+ @Override
+ protected void run() {
+ // Setup
+ TestingCluster cluster = null;
- assertEquals(request.taskResultId(), resp.getIntermediateDataSetID());
- assertEquals(request.partitionId().getPartitionId(), resp.getIntermediateResultPartitionID());
- assertNull(resp.getExecutionState());
+ try {
+ cluster = startTestingCluster(4, 1, DEFAULT_AKKA_ASK_TIMEOUT());
- // 3. Non-existing job
- request = new RequestPartitionState(
- new JobID(), new ResultPartitionID(), receiver, rid);
+ final IntermediateDataSetID rid = new IntermediateDataSetID();
+
+ // Create a task
+ final JobVertex sender = new JobVertex("Sender");
+ sender.setParallelism(1);
+ sender.setInvokableClass(Tasks.NoOpInvokable.class); // just finish
+ sender.createAndAddResultDataSet(rid, PIPELINED);
+
+ final JobVertex sender2 = new JobVertex("Blocking Sender");
+ sender2.setParallelism(1);
+ sender2.setInvokableClass(Tasks.BlockingNoOpInvokable.class); // just block
+ sender2.createAndAddResultDataSet(new IntermediateDataSetID(), PIPELINED);
+
+ final JobGraph jobGraph = new JobGraph("Fast finishing producer test job", sender, sender2);
+ final JobID jid = jobGraph.getJobID();
+
+ final ActorGateway jobManagerGateway = cluster.getLeaderGateway(
+ TestingUtils.TESTING_DURATION());
+
+ // we can set the leader session ID to None because we don't use this gateway to send messages
+ final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), null);
+
+ // Submit the job and wait for all vertices to be running
+ jobManagerGateway.tell(
+ new SubmitJob(
+ jobGraph,
+ ListeningBehaviour.EXECUTION_RESULT),
+ testActorGateway);
+ expectMsgClass(JobSubmitSuccess.class);
+
+ jobManagerGateway.tell(
+ new WaitForAllVerticesToBeRunningOrFinished(jid),
+ testActorGateway);
+
+ expectMsgClass(AllVerticesRunning.class);
+
+ Future<Object> egFuture = jobManagerGateway.ask(
+ new RequestExecutionGraph(jobGraph.getJobID()), remaining());
+
+ ExecutionGraphFound egFound = (ExecutionGraphFound) Await.result(egFuture, remaining());
+ ExecutionGraph eg = (ExecutionGraph) egFound.executionGraph();
+
+ ExecutionVertex vertex = eg.getJobVertex(sender.getID()).getTaskVertices()[0];
+ while (vertex.getExecutionState() != ExecutionState.FINISHED) {
+ Thread.sleep(1);
+ }
+
+ IntermediateResultPartition partition = vertex.getProducedPartitions()
+ .values().iterator().next();
+
+ ResultPartitionID partitionId = new ResultPartitionID(
+ partition.getPartitionId(),
+ vertex.getCurrentExecutionAttempt().getAttemptId());
- futurePartitionState = jobManagerGateway
+ // Producer finished, request state
+ Object request = new RequestPartitionProducerState(jid, rid, partitionId);
+
+ Future<ExecutionState> producerStateFuture = jobManagerGateway
.ask(request, getRemainingTime())
- .mapTo(ClassTag$.MODULE$.<PartitionState>apply(PartitionState.class));
+ .mapTo(ClassTag$.MODULE$.<ExecutionState>apply(ExecutionState.class));
+
+ assertEquals(ExecutionState.FINISHED, Await.result(producerStateFuture, getRemainingTime()));
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+ };
+ }};
+ }
+
+ /**
+ * Tests the JobManager response when the execution is not registered with
+ * the ExecutionGraph anymore and a new execution attempt is available.
+ */
+ @Test
+ public void testRequestPartitionStateMoreRecentExecutionAttempt() throws Exception {
+ new JavaTestKit(system) {{
+ new Within(duration("15 seconds")) {
+ @Override
+ protected void run() {
+ // Setup
+ TestingCluster cluster = null;
+
+ try {
+ cluster = startTestingCluster(4, 1, DEFAULT_AKKA_ASK_TIMEOUT());
+
+ final IntermediateDataSetID rid = new IntermediateDataSetID();
+
+ // Create a task
+ final JobVertex sender = new JobVertex("Sender");
+ sender.setParallelism(1);
+ sender.setInvokableClass(Tasks.NoOpInvokable.class); // just finish
+ sender.createAndAddResultDataSet(rid, PIPELINED);
- resp = Await.result(futurePartitionState, getRemainingTime());
+ final JobVertex sender2 = new JobVertex("Blocking Sender");
+ sender2.setParallelism(1);
+ sender2.setInvokableClass(Tasks.BlockingNoOpInvokable.class); // just block
+ sender2.createAndAddResultDataSet(new IntermediateDataSetID(), PIPELINED);
- assertEquals(request.taskResultId(), resp.getIntermediateDataSetID());
- assertEquals(request.partitionId().getPartitionId(), resp.getIntermediateResultPartitionID());
- assertNull(resp.getExecutionState());
+ final JobGraph jobGraph = new JobGraph("Fast finishing producer test job", sender, sender2);
+ final JobID jid = jobGraph.getJobID();
+
+ final ActorGateway jobManagerGateway = cluster.getLeaderGateway(
+ TestingUtils.TESTING_DURATION());
+
+ // we can set the leader session ID to None because we don't use this gateway to send messages
+ final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), null);
+
+ // Submit the job and wait for all vertices to be running
+ jobManagerGateway.tell(
+ new SubmitJob(
+ jobGraph,
+ ListeningBehaviour.EXECUTION_RESULT),
+ testActorGateway);
+ expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
+
+ jobManagerGateway.tell(
+ new WaitForAllVerticesToBeRunningOrFinished(jid),
+ testActorGateway);
+
+ expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class);
+
+ Future<Object> egFuture = jobManagerGateway.ask(
+ new RequestExecutionGraph(jobGraph.getJobID()), remaining());
+
+ ExecutionGraphFound egFound = (ExecutionGraphFound) Await.result(egFuture, remaining());
+ ExecutionGraph eg = (ExecutionGraph) egFound.executionGraph();
+
+ ExecutionVertex vertex = eg.getJobVertex(sender.getID()).getTaskVertices()[0];
+ while (vertex.getExecutionState() != ExecutionState.FINISHED) {
+ Thread.sleep(1);
+ }
+
+ IntermediateResultPartition partition = vertex.getProducedPartitions()
+ .values().iterator().next();
+
+ ResultPartitionID partitionId = new ResultPartitionID(
+ partition.getPartitionId(),
+ vertex.getCurrentExecutionAttempt().getAttemptId());
+
+ // Reset execution => new execution attempt
+ vertex.resetForNewExecution();
+
+ // Producer finished, request state
+ Object request = new JobManagerMessages.RequestPartitionProducerState(jid, rid, partitionId);
+
+ Future<?> producerStateFuture = jobManagerGateway.ask(request, getRemainingTime());
+
+ try {
+ Await.result(producerStateFuture, getRemainingTime());
+ fail("Did not fail with expected Exception");
+ } catch (PartitionProducerDisposedException ignored) {
+ }
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index e37467b..a7ffa1a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -35,7 +35,7 @@ import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -145,7 +145,7 @@ public class TaskAsyncCallTest {
ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class);
- PartitionStateChecker partitionStateChecker = mock(PartitionStateChecker.class);
+ PartitionProducerStateChecker partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
Executor executor = mock(Executor.class);
NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class);
when(networkEnvironment.getResultPartitionManager()).thenReturn(partitionManager);
@@ -191,7 +191,7 @@ public class TaskAsyncCallTest {
new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")),
mock(TaskMetricGroup.class),
consumableNotifier,
- partitionStateChecker,
+ partitionProducerStateChecker,
executor);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index fd9ff05..5ccf8a9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -49,7 +49,6 @@ import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.network.ConnectionID;
-import org.apache.flink.runtime.io.network.PartitionState;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -102,7 +101,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import static org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
+import static org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionProducerState;
import static org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -1579,15 +1578,8 @@ public class TaskManagerTest extends TestLogger {
@Override
public void handleMessage(Object message) throws Exception {
- if (message instanceof RequestPartitionState) {
- final RequestPartitionState msg = (RequestPartitionState) message;
-
- PartitionState resp = new PartitionState(
- msg.taskResultId(),
- msg.partitionId().getPartitionId(),
- ExecutionState.RUNNING);
-
- getSender().tell(decorateMessage(resp), getSelf());
+ if (message instanceof RequestPartitionProducerState) {
+ getSender().tell(decorateMessage(ExecutionState.RUNNING), getSelf());
}
else if (message instanceof TaskMessages.UpdateTaskExecutionState) {
final TaskExecutionState msg = ((TaskMessages.UpdateTaskExecutionState) message)
http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
index d80dab3..ae7d0b8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
@@ -95,7 +95,7 @@ public class TaskStopTest {
tmRuntimeInfo,
mock(TaskMetricGroup.class),
mock(ResultPartitionConsumableNotifier.class),
- mock(PartitionStateChecker.class),
+ mock(PartitionProducerStateChecker.class),
mock(Executor.class));
Field f = task.getClass().getDeclaredField("invokable");
f.setAccessible(true);