You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/06/01 09:24:55 UTC
[3/4] flink git commit: [runtime] Fix uncaught exception in Netty server pipeline
[runtime] Fix uncaught exception in Netty server pipeline
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/61821801
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/61821801
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/61821801
Branch: refs/heads/master
Commit: 61821801878ed7bcebbcc98405ba5b13be1cbc2a
Parents: f75c16b
Author: Ufuk Celebi <uc...@apache.org>
Authored: Thu May 28 15:57:39 2015 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Mon Jun 1 09:21:56 2015 +0200
----------------------------------------------------------------------
.../io/network/netty/PartitionRequestQueue.java | 19 ++++++++++++-------
1 file changed, 12 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/61821801/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
index d53a883..5301195 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
@@ -242,14 +242,19 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
- if (future.isSuccess()) {
- writeAndFlushNextMessageIfPossible(future.channel());
- }
- else if (future.cause() != null) {
- handleException(future.channel(), future.cause());
+ try {
+ if (future.isSuccess()) {
+ writeAndFlushNextMessageIfPossible(future.channel());
+ }
+ else if (future.cause() != null) {
+ handleException(future.channel(), future.cause());
+ }
+ else {
+ handleException(future.channel(), new IllegalStateException("Sending cancelled by user."));
+ }
}
- else {
- handleException(future.channel(), new IllegalStateException("Sending cancelled by user."));
+ catch (Throwable t) {
+ handleException(future.channel(), t);
}
}
}