You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by am...@apache.org on 2017/06/25 01:32:27 UTC
[5/6] drill git commit: DRILL-5599: Notify StatusHandler that batch sending has failed even if channel is still open
DRILL-5599: Notify StatusHandler that batch sending has failed even if channel is still open
close #857
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/7e6571aa
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/7e6571aa
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/7e6571aa
Branch: refs/heads/master
Commit: 7e6571aa5d4c58185dbfa131de99354ea7dc6b4e
Parents: dd55b5c
Author: Arina Ielchiieva <ar...@gmail.com>
Authored: Tue Jun 20 12:18:27 2017 +0300
Committer: Aman Sinha <as...@maprtech.com>
Committed: Sat Jun 24 09:42:34 2017 -0700
----------------------------------------------------------------------
.../org/apache/drill/exec/rpc/RequestIdMap.java | 21 +++++++++++---------
1 file changed, 12 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/7e6571aa/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java
index a9c3012..804834c 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -47,7 +47,7 @@ class RequestIdMap {
private final IntObjectHashMap<RpcOutcome<?>> map;
public RequestIdMap() {
- map = new IntObjectHashMap<RpcOutcome<?>>();
+ map = new IntObjectHashMap<>();
}
void channelClosed(Throwable ex) {
@@ -82,7 +82,7 @@ class RequestIdMap {
public <V> ChannelListenerWithCoordinationId createNewRpcListener(RpcOutcomeListener<V> handler, Class<V> clazz,
RemoteConnection connection) {
final int i = lastCoordinationId.incrementAndGet();
- final RpcListener<V> future = new RpcListener<V>(handler, clazz, i, connection);
+ final RpcListener<V> future = new RpcListener<>(handler, clazz, i, connection);
final Object old;
synchronized (map) {
Preconditions.checkArgument(isOpen.get(),
@@ -111,13 +111,16 @@ class RequestIdMap {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
-
if (!future.isSuccess()) {
- removeFromMap(coordinationId);
- if (future.channel().isActive()) {
- throw new RpcException("Future failed");
- } else {
- setException(new ChannelClosedException());
+ try {
+ removeFromMap(coordinationId);
+ } finally {
+ final Throwable cause = future.cause();
+ if (future.channel().isActive()) {
+ setException(cause == null ? new RpcException("Unknown ChannelFuture operation failure") : cause);
+ } else {
+ setException(cause == null ? new ChannelClosedException() : new ChannelClosedException(cause));
+ }
}
}
}