You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by am...@apache.org on 2017/06/25 01:32:27 UTC

[5/6] drill git commit: DRILL-5599: Notify StatusHandler that batch sending has failed even if channel is still open

DRILL-5599: Notify StatusHandler that batch sending has failed even if channel is still open

close #857


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/7e6571aa
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/7e6571aa
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/7e6571aa

Branch: refs/heads/master
Commit: 7e6571aa5d4c58185dbfa131de99354ea7dc6b4e
Parents: dd55b5c
Author: Arina Ielchiieva <ar...@gmail.com>
Authored: Tue Jun 20 12:18:27 2017 +0300
Committer: Aman Sinha <as...@maprtech.com>
Committed: Sat Jun 24 09:42:34 2017 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/rpc/RequestIdMap.java | 21 +++++++++++---------
 1 file changed, 12 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/7e6571aa/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java
index a9c3012..804834c 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -47,7 +47,7 @@ class RequestIdMap {
   private final IntObjectHashMap<RpcOutcome<?>> map;
 
   public RequestIdMap() {
-    map = new IntObjectHashMap<RpcOutcome<?>>();
+    map = new IntObjectHashMap<>();
   }
 
   void channelClosed(Throwable ex) {
@@ -82,7 +82,7 @@ class RequestIdMap {
   public <V> ChannelListenerWithCoordinationId createNewRpcListener(RpcOutcomeListener<V> handler, Class<V> clazz,
       RemoteConnection connection) {
     final int i = lastCoordinationId.incrementAndGet();
-    final RpcListener<V> future = new RpcListener<V>(handler, clazz, i, connection);
+    final RpcListener<V> future = new RpcListener<>(handler, clazz, i, connection);
     final Object old;
     synchronized (map) {
       Preconditions.checkArgument(isOpen.get(),
@@ -111,13 +111,16 @@ class RequestIdMap {
 
     @Override
     public void operationComplete(ChannelFuture future) throws Exception {
-
       if (!future.isSuccess()) {
-        removeFromMap(coordinationId);
-        if (future.channel().isActive()) {
-          throw new RpcException("Future failed");
-        } else {
-          setException(new ChannelClosedException());
+        try {
+          removeFromMap(coordinationId);
+        } finally {
+          final Throwable cause = future.cause();
+          if (future.channel().isActive()) {
+            setException(cause == null ? new RpcException("Unknown ChannelFuture operation failure") : cause);
+          } else {
+            setException(cause == null ? new ChannelClosedException() : new ChannelClosedException(cause));
+          }
         }
       }
     }