You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/14 14:48:52 UTC

[2/3] flink git commit: [FLINK-5553] [network] keep the original throwable in PartitionRequestClientHandler

[FLINK-5553] [network] keep the original throwable in PartitionRequestClientHandler

This way, when checking for a previous error in any input channel, we can throw
a meaningful exception instead of the unspecific
IllegalStateException("There has been an error in the channel.") thrown before.

Note that the original throwable (from an existing channel) may or may not(!)
have been printed by the InputGate yet. Any new input channel, however, did not
get the Throwable and must fail through the (now enhanced) fallback mechanism.

This closes #3299


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/af81bebd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/af81bebd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/af81bebd

Branch: refs/heads/master
Commit: af81bebd0fabc6390930689df131e72edab6995b
Parents: a91b6ff
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Mon Feb 13 16:30:59 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Feb 14 15:32:43 2017 +0100

----------------------------------------------------------------------
 .../netty/PartitionRequestClientHandler.java    | 27 +++++++++++++++-----
 .../netty/ClientTransportErrorHandlingTest.java |  3 ++-
 2 files changed, 22 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/af81bebd/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
index 52775d4..9f80abc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
@@ -42,18 +42,15 @@ import java.util.ArrayDeque;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
-import static org.apache.flink.util.Preconditions.checkState;
-
 class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
 
 	private static final Logger LOG = LoggerFactory.getLogger(PartitionRequestClientHandler.class);
 
 	private final ConcurrentMap<InputChannelID, RemoteInputChannel> inputChannels = new ConcurrentHashMap<InputChannelID, RemoteInputChannel>();
 
-	private final AtomicBoolean channelError = new AtomicBoolean(false);
+	private final AtomicReference<Throwable> channelError = new AtomicReference<Throwable>();
 
 	private final BufferListenerTask bufferListener = new BufferListenerTask();
 
@@ -73,8 +70,8 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
 	// Input channel/receiver registration
 	// ------------------------------------------------------------------------
 
-	void addInputChannel(RemoteInputChannel listener) {
-		checkState(!channelError.get(), "There has been an error in the channel.");
+	void addInputChannel(RemoteInputChannel listener) throws IOException {
+		checkError();
 
 		if (!inputChannels.containsKey(listener.getInputChannelId())) {
 			inputChannels.put(listener.getInputChannelId(), listener);
@@ -172,7 +169,7 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
 	}
 
 	private void notifyAllChannelsOfErrorAndClose(Throwable cause) {
-		if (channelError.compareAndSet(false, true)) {
+		if (channelError.compareAndSet(null, cause)) {
 			try {
 				for (RemoteInputChannel inputChannel : inputChannels.values()) {
 					inputChannel.onError(cause);
@@ -195,6 +192,22 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
 
 	// ------------------------------------------------------------------------
 
+	/**
+	 * Checks for an error and rethrows it if one was reported.
+	 */
+	private void checkError() throws IOException {
+		final Throwable t = channelError.get();
+
+		if (t != null) {
+			if (t instanceof IOException) {
+				throw (IOException) t;
+			}
+			else {
+				throw new IOException("There has been an error in the channel.", t);
+			}
+		}
+	}
+
 	@Override
 	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
 		super.channelReadComplete(ctx);

http://git-wip-us.apache.org/repos/asf/flink/blob/af81bebd/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
index ab96d4a..22e7754 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
@@ -389,7 +389,8 @@ public class ClientTransportErrorHandlingTest {
 		return new EmbeddedChannel(protocol.getClientChannelHandlers());
 	}
 
-	private RemoteInputChannel addInputChannel(PartitionRequestClientHandler clientHandler) {
+	private RemoteInputChannel addInputChannel(PartitionRequestClientHandler clientHandler)
+		throws IOException {
 		RemoteInputChannel rich = createRemoteInputChannel();
 		clientHandler.addInputChannel(rich);