You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/12/09 13:20:43 UTC
flink git commit: [FLINK-5114] [network] Handle partition producer
state check for unregistered executions
Repository: flink
Updated Branches:
refs/heads/release-1.1 3ae6e9e09 -> 2b612f2d8
[FLINK-5114] [network] Handle partition producer state check for unregistered executions
This closes #2975.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2b612f2d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2b612f2d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2b612f2d
Branch: refs/heads/release-1.1
Commit: 2b612f2d8fa2493fa1d0d586bc0fe10afa9150ca
Parents: 3ae6e9e
Author: Ufuk Celebi <uc...@apache.org>
Authored: Thu Dec 8 23:48:39 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Fri Dec 9 14:20:08 2016 +0100
----------------------------------------------------------------------
.../executiongraph/IntermediateResult.java | 33 +++
.../runtime/io/network/NetworkEnvironment.java | 37 +--
.../netty/PartitionProducerStateChecker.java | 49 ++++
.../io/network/netty/PartitionStateChecker.java | 34 ---
.../partition/consumer/SingleInputGate.java | 10 +-
.../PartitionProducerDisposedException.java | 36 +++
.../flink/runtime/jobmanager/JobManager.scala | 74 ++++--
.../runtime/messages/JobManagerMessages.scala | 29 ++-
.../runtime/messages/TaskControlMessages.scala | 25 +-
.../flink/runtime/taskmanager/TaskManager.scala | 46 +++-
.../partition/InputGateConcurrentTest.java | 10 +-
.../partition/InputGateFairnessTest.java | 20 +-
.../consumer/LocalInputChannelTest.java | 6 +-
.../partition/consumer/SingleInputGateTest.java | 12 +-
.../partition/consumer/TestSingleInputGate.java | 4 +-
.../partition/consumer/UnionInputGateTest.java | 6 +-
.../runtime/jobmanager/JobManagerTest.java | 252 ++++++++++++++++---
.../runtime/taskmanager/TaskManagerTest.java | 22 +-
18 files changed, 520 insertions(+), 185 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
index 67b1fe0..6da2cd3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.executiongraph;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -34,6 +36,14 @@ public class IntermediateResult {
private final IntermediateResultPartition[] partitions;
+ /**
+ * Maps intermediate result partition IDs to a partition index. This is
+ * used for ID lookups of intermediate results. I didn't dare to change the
+ * partition connect logic in other places that is tightly coupled to the
+ * partitions being held as an array.
+ */
+ private final HashMap<IntermediateResultPartitionID, Integer> partitionLookupHelper = new HashMap<>();
+
private final int numParallelProducers;
private final AtomicInteger numberOfRunningProducers;
@@ -80,6 +90,7 @@ public class IntermediateResult {
}
partitions[partitionNumber] = partition;
+ partitionLookupHelper.put(partition.getPartitionId(), partitionNumber);
partitionsAssigned++;
}
@@ -95,6 +106,28 @@ public class IntermediateResult {
return partitions;
}
+ /**
+ * Returns the partition with the given ID.
+ *
+ * @param resultPartitionId ID of the partition to look up
+ * @throws NullPointerException If the partition ID is <code>null</code>
+ * @throws IllegalArgumentException Thrown if unknown partition ID
+ * @return Intermediate result partition with the given ID
+ */
+ public IntermediateResultPartition getPartitionById(IntermediateResultPartitionID resultPartitionId) {
+ // Looks up the partition number via the helper map and returns the
+ // partition. Currently, this happens infrequently enough that we could
+ // consider removing the map and scanning the partitions on every lookup.
+ // The lookup (currently) only happens when the producer of an intermediate
+ // result cannot be found via its registered execution.
+ Integer partitionNumber = partitionLookupHelper.get(checkNotNull(resultPartitionId, "IntermediateResultPartitionID"));
+ if (partitionNumber != null) {
+ return partitions[partitionNumber];
+ } else {
+ throw new IllegalArgumentException("Unknown intermediate result partition ID " + resultPartitionId);
+ }
+ }
+
public int getNumberOfAssignedPartitions() {
return partitionsAssigned;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index d3715ed..7bac93b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -28,14 +28,14 @@ import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
+import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionProducerState;
import org.apache.flink.runtime.messages.TaskMessages.FailTask;
import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
import org.apache.flink.runtime.taskmanager.Task;
@@ -82,7 +82,7 @@ public class NetworkEnvironment {
private ResultPartitionConsumableNotifier partitionConsumableNotifier;
- private PartitionStateChecker partitionStateChecker;
+ private PartitionProducerStateChecker partitionStateChecker;
private boolean isShutdown;
@@ -143,7 +143,7 @@ public class NetworkEnvironment {
return partitionConsumableNotifier;
}
- public PartitionStateChecker getPartitionStateChecker() {
+ public PartitionProducerStateChecker getPartitionProducerStateChecker() {
return partitionStateChecker;
}
@@ -196,7 +196,7 @@ public class NetworkEnvironment {
taskManagerGateway,
jobManagerTimeout);
- this.partitionStateChecker = new JobManagerPartitionStateChecker(
+ this.partitionStateChecker = new ActorGatewayPartitionProducerStateChecker(
jobManagerGateway, taskManagerGateway);
// ----- Network connections -----
@@ -474,28 +474,31 @@ public class NetworkEnvironment {
}
}
- private static class JobManagerPartitionStateChecker implements PartitionStateChecker {
+ private static class ActorGatewayPartitionProducerStateChecker implements PartitionProducerStateChecker {
private final ActorGateway jobManager;
private final ActorGateway taskManager;
- public JobManagerPartitionStateChecker(ActorGateway jobManager, ActorGateway taskManager) {
+ ActorGatewayPartitionProducerStateChecker(ActorGateway jobManager, ActorGateway taskManager) {
this.jobManager = jobManager;
this.taskManager = taskManager;
}
@Override
- public void triggerPartitionStateCheck(
- JobID jobId,
- ExecutionAttemptID executionAttemptID,
- IntermediateDataSetID resultId,
- ResultPartitionID partitionId) {
-
- RequestPartitionState msg = new RequestPartitionState(
- jobId, partitionId, executionAttemptID, resultId);
-
- jobManager.tell(msg, taskManager);
+ public void requestPartitionProducerState(
+ JobID jobId,
+ ExecutionAttemptID receiverExecutionId,
+ IntermediateDataSetID intermediateDataSetId,
+ ResultPartitionID resultPartitionId) {
+
+ jobManager.tell(
+ new RequestPartitionProducerState(
+ jobId,
+ receiverExecutionId,
+ intermediateDataSetId,
+ resultPartitionId),
+ taskManager);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionProducerStateChecker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionProducerStateChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionProducerStateChecker.java
new file mode 100644
index 0000000..18d92b1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionProducerStateChecker.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+
+/**
+ * Intermediate partition state checker to query the JobManager about the state
+ * of the producer of a result partition.
+ *
+ * <p>These checks are triggered when a partition request is answered with a
+ * PartitionNotFound event. This usually happens when the producer of that
+ * partition has not registered itself with the network stack or terminated.
+ */
+public interface PartitionProducerStateChecker {
+
+ /**
+ * Requests the execution state of the execution producing a result partition.
+ *
+ * @param jobId ID of the job the partition belongs to.
+ * @param receiverExecutionId The execution attempt ID of the task who triggers the request.
+ * @param intermediateDataSetId ID of the parent intermediate data set.
+ * @param resultPartitionId ID of the result partition to check. This identifies the producing execution and partition.
+ */
+ void requestPartitionProducerState(
+ JobID jobId, ExecutionAttemptID receiverExecutionId,
+ IntermediateDataSetID intermediateDataSetId,
+ ResultPartitionID resultPartitionId);
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionStateChecker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionStateChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionStateChecker.java
deleted file mode 100644
index ecbcdaa..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionStateChecker.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.netty;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-
-public interface PartitionStateChecker {
-
- void triggerPartitionStateCheck(
- JobID jobId,
- ExecutionAttemptID executionId,
- IntermediateDataSetID resultId,
- ResultPartitionID partitionId);
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 1550b0d..67bc8d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -33,7 +33,7 @@ import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -139,7 +139,7 @@ public class SingleInputGate implements InputGate {
private final BitSet channelsWithEndOfPartitionEvents;
/** The partition state checker to use for failed partition requests. */
- private final PartitionStateChecker partitionStateChecker;
+ private final PartitionProducerStateChecker partitionStateChecker;
/**
* Buffer pool for incoming buffers. Incoming data from remote channels is copied to buffers
@@ -172,7 +172,7 @@ public class SingleInputGate implements InputGate {
IntermediateDataSetID consumedResultId,
int consumedSubpartitionIndex,
int numberOfInputChannels,
- PartitionStateChecker partitionStateChecker,
+ PartitionProducerStateChecker partitionStateChecker,
IOMetricGroup metrics) {
this.owningTaskName = checkNotNull(owningTaskName);
@@ -510,7 +510,7 @@ public class SingleInputGate implements InputGate {
}
void triggerPartitionStateCheck(ResultPartitionID partitionId) {
- partitionStateChecker.triggerPartitionStateCheck(
+ partitionStateChecker.requestPartitionProducerState(
jobId,
executionId,
consumedResultId,
@@ -567,7 +567,7 @@ public class SingleInputGate implements InputGate {
final SingleInputGate inputGate = new SingleInputGate(
owningTaskName, jobId, executionId, consumedResultId, consumedSubpartitionIndex,
- icdd.length, networkEnvironment.getPartitionStateChecker(), metrics);
+ icdd.length, networkEnvironment.getPartitionProducerStateChecker(), metrics);
// Create the input channels. There is one input channel for each consumed partition.
final InputChannel[] inputChannels = new InputChannel[icdd.length];
http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/PartitionProducerDisposedException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/PartitionProducerDisposedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/PartitionProducerDisposedException.java
new file mode 100644
index 0000000..12f2433
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/PartitionProducerDisposedException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionProducerState;
+
+/**
+ * Exception returned to a TaskManager on {@link RequestPartitionProducerState}
+ * if the producer of a partition has been disposed.
+ */
+public class PartitionProducerDisposedException extends Exception {
+
+ public PartitionProducerDisposedException(ResultPartitionID resultPartitionID) {
+ super(String.format("Execution %s producing partition %s has already been disposed.",
+ resultPartitionID.getProducerId(),
+ resultPartitionID.getPartitionId()));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 2b455b7..d6d23d9 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -62,7 +62,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages._
import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
import org.apache.flink.runtime.messages.RegistrationMessages._
import org.apache.flink.runtime.messages.TaskManagerMessages.{Heartbeat, SendStackTrace}
-import org.apache.flink.runtime.messages.TaskMessages.{PartitionState, UpdateTaskExecutionState}
+import org.apache.flink.runtime.messages.TaskMessages.{PartitionProducerState, UpdateTaskExecutionState}
import org.apache.flink.runtime.messages.accumulators._
import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint, DeclineCheckpoint}
import org.apache.flink.runtime.messages.webmonitor.{InfoMessage, _}
@@ -839,28 +839,68 @@ class JobManager(
)
}
- case RequestPartitionState(jobId, partitionId, taskExecutionId, taskResultId) =>
- val state = currentJobs.get(jobId) match {
+ case RequestPartitionProducerState(
+ jobId,
+ receiverId,
+ intermediateDataSetId,
+ resultPartitionId) =>
+
+ currentJobs.get(jobId) match {
case Some((executionGraph, _)) =>
- val execution = executionGraph.getRegisteredExecutions.get(partitionId.getProducerId)
+ try {
+ // Find the execution attempt producing the intermediate result partition.
+ val execution = executionGraph
+ .getRegisteredExecutions
+ .get(resultPartitionId.getProducerId)
+
+ if (execution != null) {
+ // Common case for pipelined exchanges => producing execution is
+ // still active.
+ val success = (intermediateDataSetId, resultPartitionId, execution.getState)
+ sender ! decorateMessage(PartitionProducerState(receiverId, Left(success)))
+ } else {
+ // The producing execution might have terminated and been
+ // unregistered. We now look for the producing execution via the
+ // intermediate result itself.
+ val intermediateResult = executionGraph
+ .getAllIntermediateResults.get(intermediateDataSetId)
+
+ if (intermediateResult != null) {
+ // Try to find the producing execution
+ val producerExecution = intermediateResult
+ .getPartitionById(resultPartitionId.getPartitionId)
+ .getProducer
+ .getCurrentExecutionAttempt
+
+ if (producerExecution.getAttemptId() == resultPartitionId.getProducerId()) {
+ val success = (
+ intermediateDataSetId,
+ resultPartitionId,
+ producerExecution.getState)
+ sender ! decorateMessage(PartitionProducerState(receiverId, Left(success)))
+ } else {
+ val failure = new PartitionProducerDisposedException(resultPartitionId)
+ sender ! decorateMessage(PartitionProducerState(receiverId, Right(failure)))
+ }
+ } else {
+ val failure = new IllegalArgumentException("Intermediate data set with ID" +
+ s"$intermediateDataSetId not found.")
+ sender ! decorateMessage(PartitionProducerState(receiverId, Right(failure)))
+ }
+ }
+ } catch {
+ case e: Exception =>
+ val failure = new RuntimeException("Failed to look up execution state of " +
+ s"producer with ID ${resultPartitionId.getProducerId}.", e)
+ sender ! decorateMessage(PartitionProducerState(receiverId, Right(failure)))
+ }
- if (execution != null) execution.getState else null
case None =>
- // Nothing to do. This is not an error, because the request is received when a sending
- // task fails or is not yet available during a remote partition request.
- log.debug(s"Cannot find execution graph for job $jobId.")
+ val failure = new IllegalArgumentException(s"Job with ID $jobId not found.")
+ sender ! decorateMessage(PartitionProducerState(receiverId, Right(failure)))
- null
}
- sender ! decorateMessage(
- PartitionState(
- taskExecutionId,
- taskResultId,
- partitionId.getPartitionId,
- state)
- )
-
case RequestJobStatus(jobID) =>
currentJobs.get(jobID) match {
case Some((executionGraph,_)) =>
http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index 14f72b0..97006d2 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -127,21 +127,24 @@ object JobManagerMessages {
*/
case class NextInputSplit(splitData: Array[Byte])
+
/**
- * Requests the current state of the partition.
- *
- * The state of a partition is currently bound to the state of the producing execution.
- *
- * @param jobId The job ID of the job, which produces the partition.
- * @param partitionId The partition ID of the partition to request the state of.
- * @param taskExecutionId The execution attempt ID of the task requesting the partition state.
- * @param taskResultId The input gate ID of the task requesting the partition state.
- */
- case class RequestPartitionState(
+ * Requests the execution state of the execution producing a result partition.
+ *
+ * @param jobId ID of the job the partition belongs to.
+ * @param receiverExecutionId Execution ID of the task who triggers the request. Required to
+ * return an answer to the TaskManager, which needs the ID to route
+ * this to the receiver task.
+ * @param intermediateDataSetId ID of the parent intermediate data set.
+ * @param resultPartitionId ID of the result partition to check. This
+ * identifies the producing execution and
+ * partition.
+ */
+ case class RequestPartitionProducerState(
jobId: JobID,
- partitionId: ResultPartitionID,
- taskExecutionId: ExecutionAttemptID,
- taskResultId: IntermediateDataSetID)
+ receiverExecutionId: ExecutionAttemptID,
+ intermediateDataSetId: IntermediateDataSetID,
+ resultPartitionId: ResultPartitionID)
extends RequiresLeaderSessionID
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala
index 94762ee..e73d651 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala
@@ -21,8 +21,9 @@ package org.apache.flink.runtime.messages
import org.apache.flink.runtime.deployment.{InputChannelDeploymentDescriptor, TaskDeploymentDescriptor}
import org.apache.flink.runtime.execution.ExecutionState
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
-import org.apache.flink.runtime.jobgraph.{IntermediateDataSetID, IntermediateResultPartitionID}
-import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID
+import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionProducerState
import org.apache.flink.runtime.taskmanager.TaskExecutionState
/**
@@ -92,13 +93,19 @@ object TaskMessages {
// --------------------------------------------------------------------------
/**
- * Answer to a [[RequestPartitionState]] with the state of the respective partition.
- */
- case class PartitionState(
- taskExecutionId: ExecutionAttemptID,
- taskResultId: IntermediateDataSetID,
- partitionId: IntermediateResultPartitionID,
- state: ExecutionState)
+ * Answer to a [[RequestPartitionProducerState]] with the state of the partition producer.
+ *
+ * @param receiverExecutionId The execution attempt ID of the task who triggered the
+ * original request and should receive this update.
+ * @param result Either a successful or failed partition producer state check
+ * result. On success, this is a 3-tuple of intermediate data set ID
+ * (to identify the input gate), the partition ID (to identify the
+ * channel) and the producer state. On failure, this contains the
+ * failure cause.
+ */
+ case class PartitionProducerState(
+ receiverExecutionId: ExecutionAttemptID,
+ result: Either[(IntermediateDataSetID, ResultPartitionID, ExecutionState), Exception])
extends TaskMessage with RequiresLeaderSessionID
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 40ae234..a751865 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -24,7 +24,7 @@ import java.lang.reflect.Method
import java.net.{InetAddress, InetSocketAddress}
import java.util
import java.util.UUID
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{TimeUnit, TimeoutException}
import javax.management.ObjectName
import _root_.akka.actor._
@@ -42,11 +42,11 @@ import org.apache.flink.core.fs.FileSystem
import org.apache.flink.core.memory.{HeapMemorySegment, HybridMemorySegment, MemorySegmentFactory, MemoryType}
import org.apache.flink.metrics.{MetricGroup, Gauge => FlinkGauge}
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
-import org.apache.flink.runtime.clusterframework.messages.StopCluster
-import org.apache.flink.runtime.clusterframework.types.ResourceID
import org.apache.flink.runtime.akka.{AkkaUtils, DefaultQuarantineHandler, QuarantineMonitor}
import org.apache.flink.runtime.blob.{BlobCache, BlobClient, BlobService}
import org.apache.flink.runtime.broadcast.BroadcastVariableManager
+import org.apache.flink.runtime.clusterframework.messages.StopCluster
+import org.apache.flink.runtime.clusterframework.types.ResourceID
import org.apache.flink.runtime.deployment.{InputChannelDeploymentDescriptor, TaskDeploymentDescriptor}
import org.apache.flink.runtime.execution.ExecutionState
import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, FallbackLibraryCacheManager, LibraryCacheManager}
@@ -58,16 +58,17 @@ import org.apache.flink.runtime.io.disk.iomanager.{IOManager, IOManagerAsync}
import org.apache.flink.runtime.io.network.NetworkEnvironment
import org.apache.flink.runtime.io.network.netty.NettyConfig
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID
+import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException
import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService}
import org.apache.flink.runtime.memory.MemoryManager
import org.apache.flink.runtime.messages.Messages._
import org.apache.flink.runtime.messages.RegistrationMessages._
-import org.apache.flink.runtime.messages.StackTraceSampleMessages.{ResponseStackTraceSampleFailure, ResponseStackTraceSampleSuccess, SampleTaskStackTrace, StackTraceSampleMessages, TriggerStackTraceSample}
+import org.apache.flink.runtime.messages.StackTraceSampleMessages._
import org.apache.flink.runtime.messages.TaskManagerMessages._
import org.apache.flink.runtime.messages.TaskMessages._
import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, NotifyCheckpointComplete, TriggerCheckpoint}
-import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup
+import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
import org.apache.flink.runtime.process.ProcessReaper
import org.apache.flink.runtime.security.SecurityUtils
import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
@@ -80,7 +81,7 @@ import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ForkJoinPool
import scala.language.postfixOps
-import scala.util.{Failure, Success, Try}
+import scala.util.{Failure, Success}
/**
* The TaskManager is responsible for executing the individual tasks of a Flink job. It is
@@ -503,15 +504,38 @@ class TaskManager(
)
}
- case PartitionState(taskExecutionId, taskResultId, partitionId, state) =>
- Option(runningTasks.get(taskExecutionId)) match {
+ // Updates the partition producer state
+ case PartitionProducerState(receiverExecutionId, result) =>
+ Option(runningTasks.get(receiverExecutionId)) match {
case Some(task) =>
- task.onPartitionStateUpdate(taskResultId, partitionId, state)
+ try {
+ result match {
+ case Left((intermediateDataSetId, resultPartitionId, producerState)) =>
+ // Forward the state update to the task
+ task.onPartitionStateUpdate(
+ intermediateDataSetId,
+ resultPartitionId.getPartitionId,
+ producerState)
+
+ case Right(failure) =>
+ // Cancel or fail the execution
+ if (failure.isInstanceOf[PartitionProducerDisposedException]) {
+ log.info("Partition producer disposed. Cancelling " +
+ s"execution $receiverExecutionId.", failure)
+ task.cancelExecution()
+ } else {
+ task.failExternally(failure)
+ }
+ }
+ } catch {
+ case e: Exception => task.failExternally(e)
+ }
case None =>
- log.debug(s"Cannot find task $taskExecutionId to respond with partition state.")
+ log.debug(s"Cannot find task with ID $receiverExecutionId to forward partition " +
+ "state respond to.")
}
}
- }
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
index a5f4c7d..955f335 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
@@ -65,7 +65,7 @@ public class InputGateConcurrentTest {
new ExecutionAttemptID(),
new IntermediateDataSetID(),
0, numChannels,
- mock(PartitionStateChecker.class),
+ mock(PartitionProducerStateChecker.class),
new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
for (int i = 0; i < numChannels; i++) {
@@ -102,7 +102,7 @@ public class InputGateConcurrentTest {
new IntermediateDataSetID(),
0,
numChannels,
- mock(PartitionStateChecker.class),
+ mock(PartitionProducerStateChecker.class),
new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
for (int i = 0; i < numChannels; i++) {
@@ -152,7 +152,7 @@ public class InputGateConcurrentTest {
new IntermediateDataSetID(),
0,
numChannels,
- mock(PartitionStateChecker.class),
+ mock(PartitionProducerStateChecker.class),
new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
for (int i = 0, local = 0; i < numChannels; i++) {
@@ -192,7 +192,7 @@ public class InputGateConcurrentTest {
// ------------------------------------------------------------------------
private static abstract class Source {
-
+
abstract void addBuffer(Buffer buffer) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
index 192b0eb..c367018 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
@@ -92,7 +92,7 @@ public class InputGateFairnessTest {
new ExecutionAttemptID(),
new IntermediateDataSetID(),
0, numChannels,
- mock(PartitionStateChecker.class),
+ mock(PartitionProducerStateChecker.class),
new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
for (int i = 0; i < numChannels; i++) {
@@ -146,7 +146,7 @@ public class InputGateFairnessTest {
new ExecutionAttemptID(),
new IntermediateDataSetID(),
0, numChannels,
- mock(PartitionStateChecker.class),
+ mock(PartitionProducerStateChecker.class),
new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
for (int i = 0; i < numChannels; i++) {
@@ -197,7 +197,7 @@ public class InputGateFairnessTest {
new ExecutionAttemptID(),
new IntermediateDataSetID(),
0, numChannels,
- mock(PartitionStateChecker.class),
+ mock(PartitionProducerStateChecker.class),
new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
final ConnectionManager connManager = createDummyConnectionManager();
@@ -206,11 +206,11 @@ public class InputGateFairnessTest {
for (int i = 0; i < numChannels; i++) {
RemoteInputChannel channel = new RemoteInputChannel(
- gate, i, new ResultPartitionID(), mock(ConnectionID.class),
+ gate, i, new ResultPartitionID(), mock(ConnectionID.class),
connManager, new Tuple2<>(0, 0), new DummyIOMetricGroup());
channels[i] = channel;
-
+
for (int p = 0; p < buffersPerChannel; p++) {
channel.onBuffer(mockBuffer, p);
}
@@ -253,7 +253,7 @@ public class InputGateFairnessTest {
new ExecutionAttemptID(),
new IntermediateDataSetID(),
0, numChannels,
- mock(PartitionStateChecker.class),
+ mock(PartitionProducerStateChecker.class),
new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
final ConnectionManager connManager = createDummyConnectionManager();
@@ -335,7 +335,7 @@ public class InputGateFairnessTest {
partitions[i].onBuffer(buffer, sequenceNumbers[i]++);
}
}
-
+
// ------------------------------------------------------------------------
private static class FairnessVerifyingInputGate extends SingleInputGate {
@@ -352,11 +352,11 @@ public class InputGateFairnessTest {
IntermediateDataSetID consumedResultId,
int consumedSubpartitionIndex,
int numberOfInputChannels,
- PartitionStateChecker partitionStateChecker,
+ PartitionProducerStateChecker partitionProducerStateChecker,
IOMetricGroup metrics) {
super(owningTaskName, jobId, executionId, consumedResultId, consumedSubpartitionIndex,
- numberOfInputChannels, partitionStateChecker, metrics);
+ numberOfInputChannels, partitionProducerStateChecker, metrics);
try {
Field f = SingleInputGate.class.getDeclaredField("inputChannelsWithData");
http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 5d0a106..411f344 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
@@ -284,7 +284,7 @@ public class LocalInputChannelTest {
new IntermediateDataSetID(),
0,
1,
- mock(PartitionStateChecker.class),
+ mock(PartitionProducerStateChecker.class),
new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()
);
@@ -481,7 +481,7 @@ public class LocalInputChannelTest {
new IntermediateDataSetID(),
subpartitionIndex,
numberOfInputChannels,
- mock(PartitionStateChecker.class),
+ mock(PartitionProducerStateChecker.class),
new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
// Set buffer pool
http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index ec4b31d..f2fb2d9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -34,7 +34,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
@@ -74,7 +74,7 @@ public class SingleInputGateTest {
public void testBasicGetNextLogic() throws Exception {
// Setup
final SingleInputGate inputGate = new SingleInputGate(
- "Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 2, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+ "Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 2, mock(PartitionProducerStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
final TestInputChannel[] inputChannels = new TestInputChannel[]{
new TestInputChannel(inputGate, 0),
@@ -128,7 +128,7 @@ public class SingleInputGateTest {
// Setup reader with one local and one unknown input channel
final IntermediateDataSetID resultId = new IntermediateDataSetID();
- final SingleInputGate inputGate = new SingleInputGate("Test Task Name", new JobID(), new ExecutionAttemptID(), resultId, 0, 2, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+ final SingleInputGate inputGate = new SingleInputGate("Test Task Name", new JobID(), new ExecutionAttemptID(), resultId, 0, 2, mock(PartitionProducerStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
final BufferPool bufferPool = mock(BufferPool.class);
when(bufferPool.getNumberOfRequiredMemorySegments()).thenReturn(2);
@@ -183,7 +183,7 @@ public class SingleInputGateTest {
new IntermediateDataSetID(),
0,
1,
- mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+ mock(PartitionProducerStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
@@ -223,7 +223,7 @@ public class SingleInputGateTest {
new IntermediateDataSetID(),
0,
1,
- mock(PartitionStateChecker.class),
+ mock(PartitionProducerStateChecker.class),
new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
InputChannel unknown = new UnknownInputChannel(
@@ -314,7 +314,7 @@ public class SingleInputGateTest {
NetworkEnvironment netEnv = mock(NetworkEnvironment.class);
when(netEnv.getPartitionManager()).thenReturn(new ResultPartitionManager());
when(netEnv.getTaskEventDispatcher()).thenReturn(new TaskEventDispatcher());
- when(netEnv.getPartitionStateChecker()).thenReturn(mock(PartitionStateChecker.class));
+ when(netEnv.getPartitionProducerStateChecker()).thenReturn(mock(PartitionProducerStateChecker.class));
when(netEnv.getPartitionRequestInitialAndMaxBackoff()).thenReturn(new Tuple2<>(initialBackoff, maxBackoff));
when(netEnv.getConnectionManager()).thenReturn(new LocalConnectionManager());
http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
index 0749467..b1d9483 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
@@ -59,7 +59,7 @@ public class TestSingleInputGate {
new IntermediateDataSetID(),
0,
numberOfInputChannels,
- mock(PartitionStateChecker.class),
+ mock(PartitionProducerStateChecker.class),
new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
this.inputGate = spy(realGate);
http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
index faec77e..8cfda0f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.junit.Test;
@@ -43,8 +43,8 @@ public class UnionInputGateTest {
public void testBasicGetNextLogic() throws Exception {
// Setup
final String testTaskName = "Test Task";
- final SingleInputGate ig1 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 3, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
- final SingleInputGate ig2 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 5, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+ final SingleInputGate ig1 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 3, mock(PartitionProducerStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+ final SingleInputGate ig2 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 5, mock(PartitionProducerStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
final UnionInputGate union = new UnionInputGate(new SingleInputGate[]{ig1, ig2});
http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index b56bf29..e41982e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -48,12 +48,12 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
-import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
+import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionProducerState;
import org.apache.flink.runtime.messages.JobManagerMessages.StopJob;
import org.apache.flink.runtime.messages.JobManagerMessages.StoppingFailure;
import org.apache.flink.runtime.messages.JobManagerMessages.StoppingSuccess;
import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
-import org.apache.flink.runtime.messages.TaskMessages.PartitionState;
+import org.apache.flink.runtime.messages.TaskMessages.PartitionProducerState;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManager;
@@ -89,7 +89,6 @@ import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.
import static org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT;
import static org.apache.flink.runtime.testingUtils.TestingUtils.startTestingCluster;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -178,7 +177,7 @@ public class JobManagerTest extends TestLogger {
// Request the execution graph to get the runtime info
jobManagerGateway.tell(new RequestExecutionGraph(jid), testActorGateway);
- final ExecutionGraph eg = expectMsgClass(ExecutionGraphFound.class)
+ final ExecutionGraph eg = (ExecutionGraph) expectMsgClass(ExecutionGraphFound.class)
.executionGraph();
final ExecutionVertex vertex = eg.getJobVertex(sender.getID())
@@ -193,59 +192,236 @@ public class JobManagerTest extends TestLogger {
// - The test ----------------------------------------------------------------------
+ ExecutionAttemptID receiverId = new ExecutionAttemptID();
+
// 1. All execution states
- RequestPartitionState request = new RequestPartitionState(
- jid, partitionId, receiver, rid);
+ RequestPartitionProducerState request = new RequestPartitionProducerState(
+ jid, receiverId, rid, partitionId);
for (ExecutionState state : ExecutionState.values()) {
ExecutionGraphTestUtils.setVertexState(vertex, state);
- jobManagerGateway.tell(request, testActorGateway);
+ Future<?> futurePartitionState = jobManagerGateway
+ .ask(request, getRemainingTime());
- LeaderSessionMessage lsm = expectMsgClass(LeaderSessionMessage.class);
+ LeaderSessionMessage wrappedMsg = (LeaderSessionMessage) Await.result(futurePartitionState, getRemainingTime());
+ PartitionProducerState resp = (PartitionProducerState) (PartitionProducerState) wrappedMsg.message();
+ assertEquals(receiverId, resp.receiverExecutionId());
+ assertTrue("Responded with failure: " + resp, resp.result().isLeft());
+ assertEquals(state, resp.result().left().get()._3());
+ }
- assertEquals(PartitionState.class, lsm.message().getClass());
+ // 2. Non-existing execution
+ request = new RequestPartitionProducerState(jid, receiverId, rid, new ResultPartitionID());
- PartitionState resp = (PartitionState) lsm.message();
+ Future<?> futurePartitionState = jobManagerGateway.ask(request, getRemainingTime());
+ LeaderSessionMessage wrappedMsg = (LeaderSessionMessage) Await.result(futurePartitionState, getRemainingTime());
+ PartitionProducerState resp = (PartitionProducerState) wrappedMsg.message();
+ assertEquals(receiverId, resp.receiverExecutionId());
+ assertTrue("Responded with success: " + resp, resp.result().isRight());
+ assertTrue(resp.result().right().get() instanceof RuntimeException);
+ assertTrue(resp.result().right().get().getCause() instanceof IllegalArgumentException);
- assertEquals(request.taskExecutionId(), resp.taskExecutionId());
- assertEquals(request.taskResultId(), resp.taskResultId());
- assertEquals(request.partitionId().getPartitionId(), resp.partitionId());
- assertEquals(state, resp.state());
+ // 3. Non-existing job
+ request = new RequestPartitionProducerState(new JobID(), receiverId, rid, new ResultPartitionID());
+ futurePartitionState = jobManagerGateway.ask(request, getRemainingTime());
+ wrappedMsg = (LeaderSessionMessage) Await.result(futurePartitionState, getRemainingTime());
+ resp = (PartitionProducerState) wrappedMsg.message();
+ assertEquals(receiverId, resp.receiverExecutionId());
+ assertTrue("Responded with success: " + resp, resp.result().isRight());
+ assertTrue(resp.result().right().get() instanceof IllegalArgumentException);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
}
+ }
+ }
+ };
+ }};
+ }
- // 2. Non-existing execution
- request = new RequestPartitionState(jid, new ResultPartitionID(), receiver, rid);
+ /**
+ * Tests the JobManager response when the execution is not registered with
+ * the ExecutionGraph.
+ */
+ @Test
+ public void testRequestPartitionStateUnregisteredExecution() throws Exception {
+ new JavaTestKit(system) {{
+ new Within(duration("15 seconds")) {
+ @Override
+ protected void run() {
+ // Setup
+ TestingCluster cluster = null;
+
+ try {
+ cluster = startTestingCluster(4, 1, DEFAULT_AKKA_ASK_TIMEOUT());
+
+ final IntermediateDataSetID rid = new IntermediateDataSetID();
+
+ // Create a task
+ final JobVertex sender = new JobVertex("Sender");
+ sender.setParallelism(1);
+ sender.setInvokableClass(Tasks.NoOpInvokable.class); // just finish
+ sender.createAndAddResultDataSet(rid, PIPELINED);
- jobManagerGateway.tell(request, testActorGateway);
+ final JobVertex sender2 = new JobVertex("Blocking Sender");
+ sender2.setParallelism(1);
+ sender2.setInvokableClass(Tasks.BlockingNoOpInvokable.class); // just block
+ sender2.createAndAddResultDataSet(new IntermediateDataSetID(), PIPELINED);
- LeaderSessionMessage lsm = expectMsgClass(LeaderSessionMessage.class);
+ final JobGraph jobGraph = new JobGraph("Fast finishing producer test job", sender, sender2);
+ final JobID jid = jobGraph.getJobID();
- assertEquals(PartitionState.class, lsm.message().getClass());
+ final ActorGateway jobManagerGateway = cluster.getLeaderGateway(
+ TestingUtils.TESTING_DURATION());
- PartitionState resp = (PartitionState) lsm.message();
+ // we can set the leader session ID to null because we don't use this gateway to send messages
+ final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), null);
- assertEquals(request.taskExecutionId(), resp.taskExecutionId());
- assertEquals(request.taskResultId(), resp.taskResultId());
- assertEquals(request.partitionId().getPartitionId(), resp.partitionId());
- assertNull(resp.state());
+ // Submit the job and wait for all vertices to be running
+ jobManagerGateway.tell(
+ new SubmitJob(
+ jobGraph,
+ ListeningBehaviour.EXECUTION_RESULT),
+ testActorGateway);
+ expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
- // 3. Non-existing job
- request = new RequestPartitionState(
- new JobID(), new ResultPartitionID(), receiver, rid);
+ jobManagerGateway.tell(
+ new WaitForAllVerticesToBeRunningOrFinished(jid),
+ testActorGateway);
+
+ expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class);
+
+ Future<Object> egFuture = jobManagerGateway.ask(
+ new RequestExecutionGraph(jobGraph.getJobID()), remaining());
+
+ ExecutionGraphFound egFound = (ExecutionGraphFound) Await.result(egFuture, remaining());
+ ExecutionGraph eg = egFound.executionGraph();
+
+ ExecutionVertex vertex = eg.getJobVertex(sender.getID()).getTaskVertices()[0];
+ while (vertex.getExecutionState() != ExecutionState.FINISHED) {
+ Thread.sleep(1);
+ }
+
+ IntermediateResultPartition partition = vertex.getProducedPartitions()
+ .values().iterator().next();
+
+ ResultPartitionID partitionId = new ResultPartitionID(
+ partition.getPartitionId(),
+ vertex.getCurrentExecutionAttempt().getAttemptId());
+
+ // Producer finished, request state
+ ExecutionAttemptID receiverId = new ExecutionAttemptID();
+
+ Future<?> producerStateFuture = jobManagerGateway.ask(
+ new RequestPartitionProducerState(jid, receiverId, rid, partitionId), getRemainingTime());
+
+ LeaderSessionMessage wrappedMsg = (LeaderSessionMessage) Await.result(producerStateFuture, getRemainingTime());
+ PartitionProducerState resp = (PartitionProducerState) wrappedMsg.message();
+ assertEquals(receiverId, resp.receiverExecutionId());
+ assertTrue("Responded with failure: " + resp, resp.result().isLeft());
+ assertEquals(ExecutionState.FINISHED, resp.result().left().get()._3());
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+ };
+ }};
+ }
+
+ /**
+ * Tests the JobManager response when the execution is not registered with
+ * the ExecutionGraph anymore and a new execution attempt is available.
+ */
+ @Test
+ public void testRequestPartitionStateMoreRecentExecutionAttempt() throws Exception {
+ new JavaTestKit(system) {{
+ new Within(duration("15 seconds")) {
+ @Override
+ protected void run() {
+ // Setup
+ TestingCluster cluster = null;
+
+ try {
+ cluster = startTestingCluster(4, 1, DEFAULT_AKKA_ASK_TIMEOUT());
+
+ final IntermediateDataSetID rid = new IntermediateDataSetID();
+
+ // Create a task
+ final JobVertex sender = new JobVertex("Sender");
+ sender.setParallelism(1);
+ sender.setInvokableClass(Tasks.NoOpInvokable.class); // just finish
+ sender.createAndAddResultDataSet(rid, PIPELINED);
+
+ final JobVertex sender2 = new JobVertex("Blocking Sender");
+ sender2.setParallelism(1);
+ sender2.setInvokableClass(Tasks.BlockingNoOpInvokable.class); // just block
+ sender2.createAndAddResultDataSet(new IntermediateDataSetID(), PIPELINED);
- jobManagerGateway.tell(request, testActorGateway);
+ final JobGraph jobGraph = new JobGraph("Fast finishing producer test job", sender, sender2);
+ final JobID jid = jobGraph.getJobID();
+
+ final ActorGateway jobManagerGateway = cluster.getLeaderGateway(
+ TestingUtils.TESTING_DURATION());
+
+ // we can set the leader session ID to None because we don't use this gateway to send messages
+ final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), null);
+
+ // Submit the job and wait for all vertices to be running
+ jobManagerGateway.tell(
+ new SubmitJob(
+ jobGraph,
+ ListeningBehaviour.EXECUTION_RESULT),
+ testActorGateway);
+ expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
+
+ jobManagerGateway.tell(
+ new WaitForAllVerticesToBeRunningOrFinished(jid),
+ testActorGateway);
+
+ expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class);
+
+ Future<Object> egFuture = jobManagerGateway.ask(
+ new RequestExecutionGraph(jobGraph.getJobID()), remaining());
+
+ ExecutionGraphFound egFound = (ExecutionGraphFound) Await.result(egFuture, remaining());
+ ExecutionGraph eg = (ExecutionGraph) egFound.executionGraph();
+
+ ExecutionVertex vertex = eg.getJobVertex(sender.getID()).getTaskVertices()[0];
+ while (vertex.getExecutionState() != ExecutionState.FINISHED) {
+ Thread.sleep(1);
+ }
+
+ IntermediateResultPartition partition = vertex.getProducedPartitions()
+ .values().iterator().next();
+
+ ResultPartitionID partitionId = new ResultPartitionID(
+ partition.getPartitionId(),
+ vertex.getCurrentExecutionAttempt().getAttemptId());
- lsm = expectMsgClass(LeaderSessionMessage.class);
+ // Reset execution => new execution attempt
+ vertex.resetForNewExecution();
- assertEquals(PartitionState.class, lsm.message().getClass());
+ // Producer finished, request state
+ ExecutionAttemptID receiverId = new ExecutionAttemptID();
- resp = (PartitionState) lsm.message();
+ Object request = new RequestPartitionProducerState(jid, receiverId, rid, partitionId);
- assertEquals(request.taskExecutionId(), resp.taskExecutionId());
- assertEquals(request.taskResultId(), resp.taskResultId());
- assertEquals(request.partitionId().getPartitionId(), resp.partitionId());
- assertNull(resp.state());
+ Future<?> producerStateFuture = jobManagerGateway.ask(request, getRemainingTime());
+
+ LeaderSessionMessage wrappedMsg = (LeaderSessionMessage) Await.result(producerStateFuture, getRemainingTime());
+ PartitionProducerState resp = (PartitionProducerState) wrappedMsg.message();
+ assertEquals(receiverId, resp.receiverExecutionId());
+ assertTrue("Responded with success: " + resp, resp.result().isRight());
+ assertTrue(resp.result().right().get() instanceof PartitionProducerDisposedException);
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
@@ -366,11 +542,7 @@ public class JobManagerTest extends TestLogger {
}
/**
- system.dispatcher(),
- actorSystem.dispatcher(),
- * Tests that we can trigger a
- *
- * @throws Exception
+ * Tests that we can trigger a savepoint when periodic checkpointing is disabled.
*/
@Test
public void testSavepointWithDeactivatedPeriodicCheckpointing() throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 6a696a0..87fb24c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -54,6 +54,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.Tasks;
+import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.Messages;
import org.apache.flink.runtime.messages.RegistrationMessages;
import org.apache.flink.runtime.messages.StackTraceSampleMessages.ResponseStackTraceSampleFailure;
@@ -63,7 +64,6 @@ import org.apache.flink.runtime.messages.TaskManagerMessages;
import org.apache.flink.runtime.messages.TaskManagerMessages.FatalError;
import org.apache.flink.runtime.messages.TaskMessages;
import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
-import org.apache.flink.runtime.messages.TaskMessages.PartitionState;
import org.apache.flink.runtime.messages.TaskMessages.StopTask;
import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
@@ -81,10 +81,12 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
+import scala.Tuple3;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.duration.FiniteDuration;
+import scala.util.Left;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -100,7 +102,6 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import static org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
import static org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -1586,14 +1587,15 @@ public class TaskManagerTest extends TestLogger {
@Override
public void handleMessage(Object message) throws Exception {
- if (message instanceof RequestPartitionState) {
- final RequestPartitionState msg = (RequestPartitionState) message;
-
- PartitionState resp = new PartitionState(
- msg.taskExecutionId(),
- msg.taskResultId(),
- msg.partitionId().getPartitionId(),
- ExecutionState.RUNNING);
+ if (message instanceof JobManagerMessages.RequestPartitionProducerState) {
+ JobManagerMessages.RequestPartitionProducerState msg = (JobManagerMessages.RequestPartitionProducerState) message;
+
+ TaskMessages.PartitionProducerState resp = new TaskMessages.PartitionProducerState(
+ msg.receiverExecutionId(),
+ new Left<Tuple3<IntermediateDataSetID, ResultPartitionID, ExecutionState>, Exception>(new Tuple3<>(
+ msg.intermediateDataSetId(),
+ msg.resultPartitionId(),
+ ExecutionState.RUNNING)));
getSender().tell(decorateMessage(resp), getSelf());
}