You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jd...@apache.org on 2017/11/07 18:11:43 UTC
hive git commit: HIVE-17908: LLAP External client not correctly handling killTask for pending requests (Jason Dere, reviewed by Sergey Shelukhin)
Repository: hive
Updated Branches:
refs/heads/master a3e872822 -> 76485de6c
HIVE-17908: LLAP External client not correctly handling killTask for pending requests (Jason Dere, reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/76485de6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/76485de6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/76485de6
Branch: refs/heads/master
Commit: 76485de6cd381a9ae9e05ebced612704b1f2b3e4
Parents: a3e8728
Author: Jason Dere <jd...@hortonworks.com>
Authored: Tue Nov 7 10:10:06 2017 -0800
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Tue Nov 7 10:10:06 2017 -0800
----------------------------------------------------------------------
.../ext/LlapTaskUmbilicalExternalClient.java | 79 +++++++++++++++-----
1 file changed, 59 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/76485de6/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
index aa94e54..d97b156 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
@@ -33,6 +33,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -78,6 +79,7 @@ public class LlapTaskUmbilicalExternalClient implements Closeable {
private static ScheduledThreadPoolExecutor retryExecutor = new ScheduledThreadPoolExecutor(1);
+ private final Random rand = new Random();
private final LlapProtocolClientProxy communicator;
private volatile LlapTaskUmbilicalServer llapTaskUmbilicalServer;
private final Configuration conf;
@@ -86,7 +88,10 @@ public class LlapTaskUmbilicalExternalClient implements Closeable {
protected final Token<JobTokenIdentifier> sessionToken;
private LlapTaskUmbilicalExternalResponder responder = null;
private final long connectionTimeout;
+ private long baseDelay;
+ private int attemptNum = 0;
private volatile boolean closed = false;
+ private volatile boolean timeoutsDisabled = false;
private RequestInfo requestInfo;
List<TezEvent> tezEvents;
@@ -156,6 +161,9 @@ public class LlapTaskUmbilicalExternalClient implements Closeable {
this.responder = responder;
this.connectionTimeout = 3 * HiveConf.getTimeVar(conf,
HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ this.baseDelay = HiveConf.getTimeVar(conf,
+ HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MS,
+ TimeUnit.MILLISECONDS);
// Add support for configurable threads, however 1 should always be enough.
this.communicator = new LlapProtocolClientProxy(1, conf, llapToken);
this.communicator.init(conf);
@@ -231,6 +239,27 @@ public class LlapTaskUmbilicalExternalClient implements Closeable {
}
}
+ private void retrySubmission() {
+ attemptNum++;
+
+ // Don't retry immediately - use delay with exponential backoff
+ long retryDelay = determineRetryDelay();
+ LOG.info("Queueing fragment for resubmission {}, attempt {}, delay {}",
+ this.requestInfo.taskAttemptId, attemptNum, retryDelay);
+ disableTimeouts(); // Don't timeout because of retry delay
+ retryExecutor.schedule(
+ new Runnable() {
+ @Override
+ public void run() {
+ // Re-enable timeouts
+ enableTimeouts();
+ submitWork();
+ }
+ },
+ retryDelay,
+ TimeUnit.MILLISECONDS);
+ }
+
// Helper class to submit fragments to LLAP and retry rejected submissions.
static class SubmitWorkCallback implements LlapProtocolClientProxy.ExecuteRequestCallback<SubmitWorkResponseProto> {
private LlapTaskUmbilicalExternalClient client;
@@ -247,24 +276,9 @@ public class LlapTaskUmbilicalExternalClient implements Closeable {
String msg = "Fragment: " + fragmentId + " rejected. Server Busy.";
LOG.info(msg);
- // Retry rejected requests
- if (!client.closed) {
- // Update lastHeartbeat so we don't timeout during the retry
- client.setLastHeartbeat(System.currentTimeMillis());
- long retryDelay = HiveConf.getTimeVar(client.conf,
- HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MS,
- TimeUnit.MILLISECONDS);
- LOG.info("Queueing fragment for resubmission: " + fragmentId);
- final SubmitWorkCallback submitter = this;
- retryExecutor.schedule(
- new Runnable() {
- @Override
- public void run() {
- client.submitWork();
- }
- },
- retryDelay, TimeUnit.MILLISECONDS);
- }
+ // taskKill() should also be received during a rejected submission,
+ // we will let that logic handle retries.
+
return;
}
}
@@ -320,6 +334,29 @@ public class LlapTaskUmbilicalExternalClient implements Closeable {
this.requestInfo.lastHeartbeat.set(lastHeartbeat);
}
+ private boolean isTimedOut(long currentTime) {
+ if (timeoutsDisabled) {
+ return false;
+ }
+ return (currentTime - getLastHeartbeat() >= connectionTimeout);
+ }
+
+ private void enableTimeouts() {
+ setLastHeartbeat(System.currentTimeMillis());
+ timeoutsDisabled = false;
+ }
+
+ private void disableTimeouts() {
+ timeoutsDisabled = true;
+ }
+
+ private long determineRetryDelay() {
+ // Delay with exponential backoff
+ int maxDelay = (int) Math.min(baseDelay * Math.pow(2, attemptNum), 60000);
+ long retryDelay = rand.nextInt(maxDelay);
+ return retryDelay;
+ }
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
@@ -346,7 +383,7 @@ public class LlapTaskUmbilicalExternalClient implements Closeable {
for (Map.Entry<String, LlapTaskUmbilicalExternalClient> entry : umbilicalImpl.registeredClients.entrySet()) {
LlapTaskUmbilicalExternalClient client = entry.getValue();
- if (currentTime - client.getLastHeartbeat() >= client.connectionTimeout) {
+ if (client.isTimedOut(currentTime)) {
timedOutTasks.add(client);
}
}
@@ -491,7 +528,9 @@ public class LlapTaskUmbilicalExternalClient implements Closeable {
LlapTaskUmbilicalExternalClient client = registeredClients.get(taskAttemptIdString);
if (client != null) {
if (client.requestInfo.state == RequestState.PENDING) {
- LOG.debug("Ignoring task kill for {}, request is still in pending state", taskAttemptIdString);
+ // A task kill while the request is still in PENDING state means the request should be retried.
+ LOG.info("Received task kill for {} which is still in pending state. Retry submission.", taskAttemptIdString);
+ client.retrySubmission();
} else {
try {
LOG.error("Task killed - " + taskAttemptIdString);