You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/06/01 09:24:55 UTC

[3/4] flink git commit: [runtime] Fix uncaught exception in Netty server pipeline

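[runtime] Fix uncaught exception in Netty server pipeline

The body of operationComplete(ChannelFuture) in PartitionRequestQueue is now
wrapped in a try/catch block, so that any Throwable thrown while processing
the completed future is forwarded to handleException(...) instead of going
uncaught.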


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/61821801
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/61821801
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/61821801

Branch: refs/heads/master
Commit: 61821801878ed7bcebbcc98405ba5b13be1cbc2a
Parents: f75c16b
Author: Ufuk Celebi <uc...@apache.org>
Authored: Thu May 28 15:57:39 2015 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Mon Jun 1 09:21:56 2015 +0200

----------------------------------------------------------------------
 .../io/network/netty/PartitionRequestQueue.java  | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/61821801/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
index d53a883..5301195 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
@@ -242,14 +242,19 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 
 		@Override
 		public void operationComplete(ChannelFuture future) throws Exception {
-			if (future.isSuccess()) {
-				writeAndFlushNextMessageIfPossible(future.channel());
-			}
-			else if (future.cause() != null) {
-				handleException(future.channel(), future.cause());
+			try {
+				if (future.isSuccess()) {
+					writeAndFlushNextMessageIfPossible(future.channel());
+				}
+				else if (future.cause() != null) {
+					handleException(future.channel(), future.cause());
+				}
+				else {
+					handleException(future.channel(), new IllegalStateException("Sending cancelled by user."));
+				}
 			}
-			else {
-				handleException(future.channel(), new IllegalStateException("Sending cancelled by user."));
+			catch (Throwable t) {
+				handleException(future.channel(), t);
 			}
 		}
 	}