You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/05/23 16:18:47 UTC

[flink] 07/08: [FLINK-6227][network] Introduce PartitionException to indicate restarting producer on JM side

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5e72b3419ea1bcc59c3e93c878bb49de7f0c503c
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Thu May 16 15:35:26 2019 +0800

    [FLINK-6227][network] Introduce PartitionException to indicate restarting producer on JM side
    
    The newly proposed PartitionException covers all cases in which consuming a partition fails and causes the consumer to fail; the JM then decides whether to restart the producer based on this exception.
    
    [FLINK-6227][network] (part 2) Make the current PartitionNotFoundException extend PartitionException
    
    This closes #8242.
---
 ...FoundException.java => PartitionException.java} | 19 +++++++++-------
 .../partition/PartitionNotFoundException.java      | 17 ++------------
 .../network/partition/consumer/InputChannel.java   |  4 ++++
 .../partition/consumer/SingleInputGateTest.java    | 26 ++++++++++++++++++++++
 4 files changed, 43 insertions(+), 23 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionException.java
similarity index 68%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionException.java
index 2f78816..75f1cf1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionException.java
@@ -21,24 +21,27 @@ package org.apache.flink.runtime.io.network.partition;
 import java.io.IOException;
 
 /**
- * Exception for failed partition requests due to non-existing partitions.
+ * Exception covering all scenarios in which consuming a partition fails
+ * and causes the consumer task to fail; the job master decides whether
+ * to restart the producer based on this exception.
  */
-public class PartitionNotFoundException extends IOException {
+public abstract class PartitionException extends IOException {
 
 	private static final long serialVersionUID = 0L;
 
 	private final ResultPartitionID partitionId;
 
-	public PartitionNotFoundException(ResultPartitionID partitionId) {
+	public PartitionException(String message, ResultPartitionID partitionId) {
+		this(message, partitionId, null);
+	}
+
+	public PartitionException(String message, ResultPartitionID partitionId, Throwable throwable) {
+		super(message, throwable);
+
 		this.partitionId = partitionId;
 	}
 
 	public ResultPartitionID getPartitionId() {
 		return partitionId;
 	}
-
-	@Override
-	public String getMessage() {
-		return "Partition " + partitionId + " not found.";
-	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java
index 2f78816..c4e4bd9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java
@@ -18,27 +18,14 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import java.io.IOException;
-
 /**
  * Exception for failed partition requests due to non-existing partitions.
  */
-public class PartitionNotFoundException extends IOException {
+public class PartitionNotFoundException extends PartitionException {
 
 	private static final long serialVersionUID = 0L;
 
-	private final ResultPartitionID partitionId;
-
 	public PartitionNotFoundException(ResultPartitionID partitionId) {
-		this.partitionId = partitionId;
-	}
-
-	public ResultPartitionID getPartitionId() {
-		return partitionId;
-	}
-
-	@Override
-	public String getMessage() {
-		return "Partition " + partitionId + " not found.";
+		super("Partition " + partitionId + " not found.", partitionId);
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index a08ecc2..c0a204b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -22,6 +22,7 @@ import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.PartitionException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 
@@ -176,6 +177,9 @@ public abstract class InputChannel {
 
 	/**
 	 * Checks for an error and rethrows it if one was reported.
+	 *
+	 * <p>Note: Any {@link PartitionException} instances should not be transformed,
+	 * so that they always remain visible in the task failure cause.
 	 */
 	protected void checkError() throws IOException {
 		final Throwable t = cause.get();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 93c3375..c159ce2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
+import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -52,6 +53,8 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createLocalInputChannel;
+import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
@@ -59,6 +62,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Mockito.mock;
@@ -520,6 +524,28 @@ public class SingleInputGateTest extends InputGateTestBase {
 		}
 	}
 
+	/**
+	 * Tests that if a {@link PartitionNotFoundException} is set on an {@link InputChannel},
+	 * it is thrown directly via {@link SingleInputGate#getNextBufferOrEvent()}. This confirms
+	 * that the {@link SingleInputGate} does not swallow or transform the original exception.
+	 */
+	@Test
+	public void testPartitionNotFoundExceptionWhileGetNextBuffer() throws Exception {
+		final SingleInputGate inputGate = createSingleInputGate(1);
+		final LocalInputChannel localChannel = createLocalInputChannel(inputGate, new ResultPartitionManager());
+		final ResultPartitionID partitionId = localChannel.getPartitionId();
+
+		inputGate.setInputChannel(partitionId.getPartitionId(), localChannel);
+		localChannel.setError(new PartitionNotFoundException(partitionId));
+		try {
+			inputGate.getNextBufferOrEvent();
+
+			fail("Should throw a PartitionNotFoundException.");
+		} catch (PartitionNotFoundException notFound) {
+			assertThat(partitionId, is(notFound.getPartitionId()));
+		}
+	}
+
 	// ---------------------------------------------------------------------------------------------
 
 	private void addUnknownInputChannel(