You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/05/10 18:30:50 UTC
[05/12] drill git commit: DRILL-3005: Ensure that only one outcome
can occur for a QueryResult.
DRILL-3005: Ensure that only one outcome can occur for a QueryResult.
Includes removing QueryResultHandler close listener which should fix most times. For exceptional race cases, also added isTerminal condition which ensures only a single transition.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/fb5e455c
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/fb5e455c
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/fb5e455c
Branch: refs/heads/merge_2015_05_09
Commit: fb5e455c77b720bb2f350acf5cd535ffd612c671
Parents: a392e53
Author: Jacques Nadeau <ja...@apache.org>
Authored: Sat May 9 10:44:45 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun May 10 09:24:21 2015 -0700
----------------------------------------------------------------------
.../apache/drill/exec/rpc/user/QueryResultHandler.java | 13 +++++++++++++
1 file changed, 13 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/fb5e455c/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
index 302be72..143d104 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
@@ -25,6 +25,7 @@ import io.netty.util.concurrent.GenericFutureListener;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.exceptions.UserRemoteException;
@@ -279,6 +280,7 @@ public class QueryResultHandler {
private final RemoteConnection connection;
private final ChannelFuture closeFuture;
private final ChannelClosedListener closeListener;
+ private final AtomicBoolean isTerminal = new AtomicBoolean(false);
public SubmissionListener(RemoteConnection connection, UserResultsListener resultsListener) {
super();
@@ -302,11 +304,22 @@ public class QueryResultHandler {
@Override
public void failed(RpcException ex) {
+ if (!isTerminal.compareAndSet(false, true)) {
+ return;
+ }
+
+ closeFuture.removeListener(closeListener);
resultsListener.submissionFailed(UserException.systemError(ex).build());
+
}
@Override
public void success(QueryId queryId, ByteBuf buf) {
+ if (!isTerminal.compareAndSet(false, true)) {
+ return;
+ }
+
+ closeFuture.removeListener(closeListener);
resultsListener.queryIdArrived(queryId);
if (logger.isDebugEnabled()) {
logger.debug("Received QueryId {} successfully. Adding results listener {}.",