You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/05/14 06:42:09 UTC

[4/8] drill git commit: DRILL-3074: Fix infinite loop issue in ReconnectingConnection

DRILL-3074: Fix infinite loop issue in ReconnectingConnection

Also use the remaining timeout in case of interruption


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/b1b75f6a
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/b1b75f6a
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/b1b75f6a

Branch: refs/heads/master
Commit: b1b75f6abc56ed1ba097e20bdce81e7992396180
Parents: 4ad4261
Author: vkorukanti <ve...@gmail.com>
Authored: Wed May 13 17:56:14 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Wed May 13 19:34:00 2015 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/rpc/BasicClient.java     | 17 +++++++++--------
 .../drill/exec/rpc/ReconnectingConnection.java     | 17 ++++++++++-------
 2 files changed, 19 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/b1b75f6a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
index 34758ef..a33b370 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
@@ -196,16 +196,15 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
       public void operationComplete(ChannelFuture future) throws Exception {
         boolean isInterrupted = false;
 
-        final long timeoutMills = 30000;
-
-        // We want to wait for at least 30 secs when interrupts occur. Establishing a connection fails/succeeds quickly,
+        // We want to wait for at least 120 secs when interrupts occur. Establishing a connection fails/succeeds quickly,
         // So there is no point propagating the interruption as failure immediately.
-        final long targetMillis = System.currentTimeMillis() + timeoutMills;
+        long remainingWaitTimeMills = 120000;
+        long startTime = System.currentTimeMillis();
 
         // logger.debug("Connection operation finished.  Success: {}", future.isSuccess());
         while(true) {
           try {
-            future.get(timeoutMills, TimeUnit.MILLISECONDS);
+            future.get(remainingWaitTimeMills, TimeUnit.MILLISECONDS);
             if (future.isSuccess()) {
               // send a handshake on the current thread. This is the only time we will send from within the event thread.
               // We can do this because the connection will not be backed up.
@@ -216,14 +215,16 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
             // logger.debug("Handshake queued for send.");
             break;
           } catch (final InterruptedException interruptEx) {
-            // Ignore the interrupt and continue to wait until targetMillis has elapsed.
+            remainingWaitTimeMills -= (System.currentTimeMillis() - startTime);
+            startTime = System.currentTimeMillis();
             isInterrupted = true;
-            final long wait = targetMillis - System.currentTimeMillis();
-            if (wait < 1) {
+            if (remainingWaitTimeMills < 1) {
               l.connectionFailed(FailureType.CONNECTION, interruptEx);
               break;
             }
+            // Ignore the interrupt and continue to wait until we elapse remainingWaitTimeMills.
           } catch (final Exception ex) {
+            logger.error("Failed to establish connection", ex);
             l.connectionFailed(FailureType.CONNECTION, ex);
             break;
           }

http://git-wip-us.apache.org/repos/asf/drill/blob/b1b75f6a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
index 12e0063..d62b6f2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
@@ -104,15 +104,15 @@ public abstract class ReconnectingConnection<CONNECTION_TYPE extends RemoteConne
     public void waitAndRun() {
       boolean isInterrupted = false;
 
-      final long timeoutMills = 30000;
-      // We want to wait for at least 30 secs when interrupts occur. Establishing a connection fails/succeeds quickly,
+      // We want to wait for at least 120 secs when interrupts occur. Establishing a connection fails/succeeds quickly,
       // So there is no point propagating the interruption as failure immediately.
-      final long targetMillis = System.currentTimeMillis() + timeoutMills;
+      long remainingWaitTimeMills = 120000;
+      long startTime = System.currentTimeMillis();
 
       while(true) {
         try {
           //        logger.debug("Waiting for connection.");
-          CONNECTION_TYPE connection = this.get(timeoutMills, TimeUnit.MILLISECONDS);
+          CONNECTION_TYPE connection = this.get(remainingWaitTimeMills, TimeUnit.MILLISECONDS);
 
           if (connection == null) {
             //          logger.debug("Connection failed.");
@@ -123,15 +123,18 @@ public abstract class ReconnectingConnection<CONNECTION_TYPE extends RemoteConne
           }
           break;
         } catch (final InterruptedException interruptEx) {
-          // Ignore the interrupt and continue to wait until targetMillis has elapsed.
+          remainingWaitTimeMills -= (System.currentTimeMillis() - startTime);
+          startTime = System.currentTimeMillis();
           isInterrupted = true;
-          final long wait = targetMillis - System.currentTimeMillis();
-          if (wait < 1) {
+          if (remainingWaitTimeMills < 1) {
             cmd.connectionFailed(FailureType.CONNECTION, interruptEx);
             break;
           }
+          // Ignore the interrupt and continue to wait until we elapse remainingWaitTimeMills.
         } catch (final ExecutionException | TimeoutException ex) {
+          logger.error("Failed to establish connection", ex);
           cmd.connectionFailed(FailureType.CONNECTION, ex);
+          break;
         }
       }