You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/05/23 16:18:48 UTC

[flink] 08/08: [FLINK-12458][network] Introduce PartitionConnectionException for unreachable producer

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 49e1832056483d17f540a515cc5be7be1654dd48
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Wed May 22 14:45:46 2019 +0800

    [FLINK-12458][network] Introduce PartitionConnectionException for unreachable producer
    
    If the consumer cannot establish a connection to the remote task executor while requesting a remote subpartition, this might indicate that the remote task executor is not reachable.
    We wrap this connection exception in the newly introduced PartitionConnectionException, which also extends PartitionException, so that the job master can decide whether to
    restart the upstream region to re-produce the partition data.
    
    This closes #8509.
---
 .../consumer/PartitionConnectionException.java}    | 35 +++++++---------------
 .../partition/consumer/RemoteInputChannel.java     |  8 +++--
 .../io/network/TestingConnectionManager.java       |  4 ++-
 .../partition/consumer/RemoteInputChannelTest.java | 24 +++++++++++++++
 4 files changed, 43 insertions(+), 28 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TestingConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/PartitionConnectionException.java
similarity index 54%
copy from flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TestingConnectionManager.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/PartitionConnectionException.java
index 822314d..4713dfd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TestingConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/PartitionConnectionException.java
@@ -16,35 +16,20 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.io.network;
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.runtime.io.network.partition.PartitionException;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 
 /**
- * A dummy implementation of the {@link ConnectionManager} which is mainly used for creating
- * {@link PartitionRequestClient} instance in tests.
+ * Exception for failed partition requests due to connection failure
+ * with unreachable producer.
  */
-public class TestingConnectionManager implements ConnectionManager {
-
-	@Override
-	public void start() {}
-
-	@Override
-	public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) {
-		return new TestingPartitionRequestClient();
-	}
+public class PartitionConnectionException extends PartitionException {
 
-	@Override
-	public void closeOpenChannelConnections(ConnectionID connectionId) {}
+	private static final long serialVersionUID = 0L;
 
-	@Override
-	public int getNumberOfActiveConnections() {
-		return 0;
+	public PartitionConnectionException(ResultPartitionID partitionId, Throwable throwable) {
+		super("Connection for partition " + partitionId + " not reachable.", partitionId, throwable);
 	}
-
-	@Override
-	public int getDataPort() {
-		return -1;
-	}
-
-	@Override
-	public void shutdown() {}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index fabc495..2d174ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -161,8 +161,12 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
 	public void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException {
 		if (partitionRequestClient == null) {
 			// Create a client and request the partition
-			partitionRequestClient = connectionManager
-				.createPartitionRequestClient(connectionId);
+			try {
+				partitionRequestClient = connectionManager.createPartitionRequestClient(connectionId);
+			} catch (IOException e) {
+				// IOExceptions indicate that we could not open a connection to the remote TaskExecutor
+				throw new PartitionConnectionException(partitionId, e);
+			}
 
 			partitionRequestClient.requestSubpartition(partitionId, subpartitionIndex, this, 0);
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TestingConnectionManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TestingConnectionManager.java
index 822314d..c23b3c2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TestingConnectionManager.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TestingConnectionManager.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.io.network;
 
+import java.io.IOException;
+
 /**
  * A dummy implementation of the {@link ConnectionManager} which is mainly used for creating
  * {@link PartitionRequestClient} instance in tests.
@@ -28,7 +30,7 @@ public class TestingConnectionManager implements ConnectionManager {
 	public void start() {}
 
 	@Override
-	public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) {
+	public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) throws IOException {
 		return new TestingPartitionRequestClient();
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 24d256e..0fdebf0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -1016,6 +1016,23 @@ public class RemoteInputChannelTest {
 		}
 	}
 
+	/**
+	 * Tests that an {@link IOException} thrown by {@link ConnectionManager#createPartitionRequestClient(ConnectionID)}
+	 * is wrapped into a {@link PartitionConnectionException} during
+	 * {@link RemoteInputChannel#requestSubpartition(int)}.
+	 */
+	@Test
+	public void testPartitionConnectionExceptionWhileRequestingPartition() throws Exception {
+		final RemoteInputChannel inputChannel = InputChannelTestUtils.createRemoteInputChannel(
+			createSingleInputGate(1), 0, new TestingExceptionConnectionManager());
+		try {
+			inputChannel.requestSubpartition(0);
+			fail("Expected PartitionConnectionException.");
+		} catch (PartitionConnectionException ex) {
+			assertThat(inputChannel.getPartitionId(), is(ex.getPartitionId()));
+		}
+	}
+
 	// ---------------------------------------------------------------------------------------------
 
 	private RemoteInputChannel createRemoteInputChannel(SingleInputGate inputGate)
@@ -1179,4 +1196,11 @@ public class RemoteInputChannelTest {
 			ExceptionUtils.rethrowException(throwable);
 		}
 	}
+
+	private static final class TestingExceptionConnectionManager extends TestingConnectionManager {
+		@Override
+		public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) throws IOException {
+			throw new IOException("");
+		}
+	}
 }