You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/12/07 16:14:08 UTC
[1/2] flink git commit: [FLINK-5274] [network] Handle reader release in LocalInputChannel
Repository: flink
Updated Branches:
refs/heads/master 684defbf3 -> 4410c04a6
[FLINK-5274] [network] Handle reader release in LocalInputChannel
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/555a6879
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/555a6879
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/555a6879
Branch: refs/heads/master
Commit: 555a68793ecb3a244dfbf50a615c6d5e15c9efe4
Parents: 684defb
Author: Ufuk Celebi <uc...@apache.org>
Authored: Wed Dec 7 13:39:18 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Dec 7 17:13:59 2016 +0100
----------------------------------------------------------------------
.../partition/consumer/LocalInputChannel.java | 13 ++++++
.../consumer/LocalInputChannelTest.java | 47 ++++++++++++++++++++
2 files changed, 60 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/555a6879/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index c9e0179..4e14e93 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.runtime.event.TaskEvent;
+import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
@@ -179,6 +180,18 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
}
Buffer next = subpartitionView.getNextBuffer();
+
+ if (next == null) {
+ if (subpartitionView.isReleased()) {
+ throw new CancelTaskException("Consumed partition " + subpartitionView + " has been released.");
+ } else {
+ // This means there is a bug in the buffer availability
+ // notifications.
+ throw new IllegalStateException("Consumed partition has no buffers available. " +
+ "Number of received buffer notifications is " + numBuffersAvailable + ".");
+ }
+ }
+
long remaining = numBuffersAvailable.decrementAndGet();
if (remaining >= 0) {
http://git-wip-us.apache.org/repos/asf/flink/blob/555a6879/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 83da3b1..35ed4c3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -347,6 +347,53 @@ public class LocalInputChannelTest {
requester.join();
}
+ /**
+ * Tests that a {@code null} buffer returned by the subpartition view is handled explicitly instead of leading to NPEs:
+ * {@link IllegalStateException} if the view is not released, {@link CancelTaskException} if it has been released.
+ */
+ @Test
+ public void testGetNextAfterPartitionReleased() throws Exception {
+ ResultSubpartitionView reader = mock(ResultSubpartitionView.class);
+ SingleInputGate gate = mock(SingleInputGate.class);
+ ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
+
+ when(partitionManager.createSubpartitionView(
+ any(ResultPartitionID.class),
+ anyInt(),
+ any(BufferProvider.class),
+ any(BufferAvailabilityListener.class))).thenReturn(reader);
+
+ LocalInputChannel channel = new LocalInputChannel(
+ gate,
+ 0,
+ new ResultPartitionID(),
+ partitionManager,
+ new TaskEventDispatcher(),
+ new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+
+ channel.requestSubpartition(0);
+
+ // Null buffer but view not released -> expect IllegalStateException
+ when(reader.getNextBuffer()).thenReturn(null);
+ when(reader.isReleased()).thenReturn(false);
+
+ try {
+ channel.getNextBuffer();
+ fail("Did not throw expected IllegalStateException");
+ } catch (IllegalStateException ignored) { // expected
+ }
+
+ // Null buffer and view released -> expect CancelTaskException
+ when(reader.getNextBuffer()).thenReturn(null);
+ when(reader.isReleased()).thenReturn(true);
+
+ try {
+ channel.getNextBuffer();
+ fail("Did not throw expected CancelTaskException");
+ } catch (CancelTaskException ignored) { // expected
+ }
+ }
+
// ---------------------------------------------------------------------------------------------
private LocalInputChannel createLocalInputChannel(
[2/2] flink git commit: [FLINK-5275] [execgraph] Give more detailed error message if InputChannel deployment fails
Posted by uc...@apache.org.
[FLINK-5275] [execgraph] Give more detailed error message if InputChannel deployment fails
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4410c04a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4410c04a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4410c04a
Branch: refs/heads/master
Commit: 4410c04a68c7b247bb3d7113e5f40f2a9c2165af
Parents: 555a687
Author: Ufuk Celebi <uc...@apache.org>
Authored: Wed Dec 7 13:48:25 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Dec 7 17:14:00 2016 +0100
----------------------------------------------------------------------
.../InputChannelDeploymentDescriptor.java | 17 +++++++++++++++--
1 file changed, 15 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4410c04a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
index 9b3ce5f..9bf3bd5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
@@ -131,8 +131,21 @@ public class InputChannelDeploymentDescriptor implements Serializable {
else if (allowLazyDeployment) {
// The producing task might not have registered the partition yet
partitionLocation = ResultPartitionLocation.createUnknown();
- } else {
- throw new ExecutionGraphException("Trying to eagerly schedule a task whose inputs are not ready.");
+ }
+ else if (producerState == ExecutionState.CANCELING
+ || producerState == ExecutionState.CANCELED
+ || producerState == ExecutionState.FAILED) {
+ String msg = "Trying to schedule a task whose inputs were canceled or failed. " +
+ "The producer is in state " + producerState + ".";
+ throw new ExecutionGraphException(msg);
+ }
+ else {
+ String msg = String.format("Trying to eagerly schedule a task whose inputs " +
+ "are not ready (partition consumable? %s, producer state: %s, producer slot: %s).",
+ consumedPartition.isConsumable(),
+ producerState,
+ producerSlot);
+ throw new ExecutionGraphException(msg);
}
final ResultPartitionID consumedPartitionId = new ResultPartitionID(