You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jd...@apache.org on 2017/11/07 18:11:43 UTC

hive git commit: HIVE-17908: LLAP External client not correctly handling killTask for pending requests (Jason Dere, reviewed by Sergey Shelukhin)

Repository: hive
Updated Branches:
  refs/heads/master a3e872822 -> 76485de6c


HIVE-17908: LLAP External client not correctly handling killTask for pending requests (Jason Dere, reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/76485de6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/76485de6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/76485de6

Branch: refs/heads/master
Commit: 76485de6cd381a9ae9e05ebced612704b1f2b3e4
Parents: a3e8728
Author: Jason Dere <jd...@hortonworks.com>
Authored: Tue Nov 7 10:10:06 2017 -0800
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Tue Nov 7 10:10:06 2017 -0800

----------------------------------------------------------------------
 .../ext/LlapTaskUmbilicalExternalClient.java    | 79 +++++++++++++++-----
 1 file changed, 59 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/76485de6/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
index aa94e54..d97b156 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
@@ -33,6 +33,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -78,6 +79,7 @@ public class LlapTaskUmbilicalExternalClient implements Closeable {
 
   private static ScheduledThreadPoolExecutor retryExecutor = new ScheduledThreadPoolExecutor(1);
 
+  private final Random rand = new Random();
   private final LlapProtocolClientProxy communicator;
   private volatile LlapTaskUmbilicalServer llapTaskUmbilicalServer;
   private final Configuration conf;
@@ -86,7 +88,10 @@ public class LlapTaskUmbilicalExternalClient implements Closeable {
   protected final Token<JobTokenIdentifier> sessionToken;
   private LlapTaskUmbilicalExternalResponder responder = null;
   private final long connectionTimeout;
+  private long baseDelay;
+  private int attemptNum = 0;
   private volatile boolean closed = false;
+  private volatile boolean timeoutsDisabled = false;
   private RequestInfo requestInfo;
   List<TezEvent> tezEvents;
 
@@ -156,6 +161,9 @@ public class LlapTaskUmbilicalExternalClient implements Closeable {
     this.responder = responder;
     this.connectionTimeout = 3 * HiveConf.getTimeVar(conf,
         HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+    this.baseDelay = HiveConf.getTimeVar(conf,
+        HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MS,
+        TimeUnit.MILLISECONDS);
     // Add support for configurable threads, however 1 should always be enough.
     this.communicator = new LlapProtocolClientProxy(1, conf, llapToken);
     this.communicator.init(conf);
@@ -231,6 +239,27 @@ public class LlapTaskUmbilicalExternalClient implements Closeable {
     }
   }
 
+  private void retrySubmission() {
+    if (closed) {
+      return;  // Client already closed; don't queue a resubmission (the old retry path checked this too).
+    }
+    attemptNum++;
+    // Don't retry immediately - use delay with randomized exponential backoff
+    long retryDelay = determineRetryDelay();
+    LOG.info("Queueing fragment for resubmission {}, attempt {}, delay {}",
+        this.requestInfo.taskAttemptId, attemptNum, retryDelay);
+    disableTimeouts();  // Don't timeout because of retry delay
+    retryExecutor.schedule(new Runnable() {
+      @Override
+      public void run() {
+        enableTimeouts();  // Re-enable timeouts; this also refreshes the last heartbeat
+        if (!closed) {  // the client may have been closed while this retry was queued
+          submitWork();
+        }
+      }
+    }, retryDelay, TimeUnit.MILLISECONDS);
+  }
+
   // Helper class to submit fragments to LLAP and retry rejected submissions.
   static class SubmitWorkCallback implements LlapProtocolClientProxy.ExecuteRequestCallback<SubmitWorkResponseProto> {
     private LlapTaskUmbilicalExternalClient client;
@@ -247,24 +276,9 @@ public class LlapTaskUmbilicalExternalClient implements Closeable {
           String msg = "Fragment: " + fragmentId + " rejected. Server Busy.";
           LOG.info(msg);
 
-          // Retry rejected requests
-          if (!client.closed) {
-            // Update lastHeartbeat so we don't timeout during the retry
-            client.setLastHeartbeat(System.currentTimeMillis());
-            long retryDelay = HiveConf.getTimeVar(client.conf,
-                HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MS,
-                TimeUnit.MILLISECONDS);
-            LOG.info("Queueing fragment for resubmission: " + fragmentId);
-            final SubmitWorkCallback submitter = this;
-            retryExecutor.schedule(
-                new Runnable() {
-                  @Override
-                  public void run() {
-                    client.submitWork();
-                  }
-                },
-                retryDelay, TimeUnit.MILLISECONDS);
-          }
+          // taskKill() should also be received during a rejected submission,
+          // we will let that logic handle retries.
+
           return;
         }
       }
@@ -320,6 +334,29 @@ public class LlapTaskUmbilicalExternalClient implements Closeable {
     this.requestInfo.lastHeartbeat.set(lastHeartbeat);
   }
 
+  private boolean isTimedOut(long currentTime) {
+    // Never times out while timeoutsDisabled is set (e.g. during a retry delay); otherwise
+    // times out once connectionTimeout ms have passed since the last recorded heartbeat.
+    return !timeoutsDisabled
+        && currentTime - getLastHeartbeat() >= connectionTimeout;
+  }
+
+  private void enableTimeouts() {
+    this.setLastHeartbeat(System.currentTimeMillis());  // refresh first so a stale heartbeat can't trip a timeout
+    this.timeoutsDisabled = false;
+  }
+
+  private void disableTimeouts() {
+    this.timeoutsDisabled = true;  // isTimedOut() reports false until enableTimeouts() runs
+  }
+
+  private long determineRetryDelay() {
+    // Randomized exponential backoff: uniform in [0, min(baseDelay * 2^attemptNum, 60000)) ms.
+    int maxDelay = (int) Math.min(baseDelay * Math.pow(2, attemptNum), 60000);
+    // Random.nextInt() throws on a bound <= 0 (baseDelay configured as 0, or NaN cast to 0); retry at once then.
+    return maxDelay > 0 ? rand.nextInt(maxDelay) : 0L;
+  }
+
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
@@ -346,7 +383,7 @@ public class LlapTaskUmbilicalExternalClient implements Closeable {
 
       for (Map.Entry<String, LlapTaskUmbilicalExternalClient> entry : umbilicalImpl.registeredClients.entrySet()) {
         LlapTaskUmbilicalExternalClient client = entry.getValue();
-        if (currentTime - client.getLastHeartbeat() >= client.connectionTimeout) {
+        if (client.isTimedOut(currentTime)) {
           timedOutTasks.add(client);
         }
       }
@@ -491,7 +528,9 @@ public class LlapTaskUmbilicalExternalClient implements Closeable {
       LlapTaskUmbilicalExternalClient client = registeredClients.get(taskAttemptIdString);
       if (client != null) {
         if (client.requestInfo.state == RequestState.PENDING) {
-          LOG.debug("Ignoring task kill for {}, request is still in pending state", taskAttemptIdString);
+          // A task kill while the request is still in PENDING state means the request should be retried.
+          LOG.info("Received task kill for {} which is still in pending state. Retry submission.", taskAttemptIdString);
+          client.retrySubmission();
         } else {
           try {
             LOG.error("Task killed - " + taskAttemptIdString);