You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/05/10 18:30:50 UTC

[05/12] drill git commit: DRILL-3005: Ensure that only one outcome can occur for a QueryResult.

DRILL-3005: Ensure that only one outcome can occur for a QueryResult.

Includes removing the QueryResultHandler close listener, which should fix the problem in most cases.  For exceptional race cases, also adds an isTerminal condition, which ensures that only a single transition occurs.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/fb5e455c
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/fb5e455c
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/fb5e455c

Branch: refs/heads/merge_2015_05_09
Commit: fb5e455c77b720bb2f350acf5cd535ffd612c671
Parents: a392e53
Author: Jacques Nadeau <ja...@apache.org>
Authored: Sat May 9 10:44:45 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun May 10 09:24:21 2015 -0700

----------------------------------------------------------------------
 .../apache/drill/exec/rpc/user/QueryResultHandler.java | 13 +++++++++++++
 1 file changed, 13 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/fb5e455c/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
index 302be72..143d104 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
@@ -25,6 +25,7 @@ import io.netty.util.concurrent.GenericFutureListener;
 
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.exceptions.UserRemoteException;
@@ -279,6 +280,7 @@ public class QueryResultHandler {
     private final RemoteConnection connection;
     private final ChannelFuture closeFuture;
     private final ChannelClosedListener closeListener;
+    private final AtomicBoolean isTerminal = new AtomicBoolean(false);
 
     public SubmissionListener(RemoteConnection connection, UserResultsListener resultsListener) {
       super();
@@ -302,11 +304,22 @@ public class QueryResultHandler {
 
     @Override
     public void failed(RpcException ex) {
+      if (!isTerminal.compareAndSet(false, true)) {
+        return;
+      }
+
+      closeFuture.removeListener(closeListener);
       resultsListener.submissionFailed(UserException.systemError(ex).build());
+
     }
 
     @Override
     public void success(QueryId queryId, ByteBuf buf) {
+      if (!isTerminal.compareAndSet(false, true)) {
+        return;
+      }
+
+      closeFuture.removeListener(closeListener);
       resultsListener.queryIdArrived(queryId);
       if (logger.isDebugEnabled()) {
         logger.debug("Received QueryId {} successfully. Adding results listener {}.",