You are viewing a plain-text version of this content. (The canonical link to the original message was not preserved in this plain-text conversion.)
Posted to commits@hive.apache.org by jd...@apache.org on 2017/07/15 01:12:05 UTC
hive git commit: HIVE-16926: LlapTaskUmbilicalExternalClient should not start new umbilical server for every fragment request (Jason Dere, reviewed by Siddharth Seth)
Repository: hive
Updated Branches:
refs/heads/master c27764c0d -> c713eeec0
HIVE-16926: LlapTaskUmbilicalExternalClient should not start new umbilical server for every fragment request (Jason Dere, reviewed by Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c713eeec
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c713eeec
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c713eeec
Branch: refs/heads/master
Commit: c713eeec0d5f66aae8180fed8ebada8ce7b5d487
Parents: c27764c
Author: Jason Dere <jd...@hortonworks.com>
Authored: Fri Jul 14 18:11:06 2017 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Fri Jul 14 18:11:06 2017 -0700
----------------------------------------------------------------------
.../hadoop/hive/llap/LlapBaseRecordReader.java | 89 ++--
.../apache/hadoop/hive/llap/SubmitWorkInfo.java | 16 +-
.../ext/LlapTaskUmbilicalExternalClient.java | 487 ++++++++++---------
.../helpers/LlapTaskUmbilicalServer.java | 44 +-
.../hadoop/hive/llap/LlapBaseInputFormat.java | 2 -
.../ql/udf/generic/GenericUDTFGetSplits.java | 20 +-
6 files changed, 367 insertions(+), 291 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/c713eeec/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
index 7fff147..cb38839 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
@@ -58,6 +58,7 @@ public class LlapBaseRecordReader<V extends WritableComparable> implements Recor
protected final long timeout;
protected final Closeable client;
private final Closeable socket;
+ private boolean closed = false;
public LlapBaseRecordReader(InputStream in, Schema schema,
Class<V> clazz, JobConf job, Closeable client, Closeable socket) {
@@ -78,27 +79,31 @@ public class LlapBaseRecordReader<V extends WritableComparable> implements Recor
}
@Override
- public void close() throws IOException {
- Exception caughtException = null;
- try {
- din.close();
- } catch (Exception err) {
- LOG.error("Error closing input stream:" + err.getMessage(), err);
- caughtException = err;
- }
- // Don't close the socket - the stream already does that if needed.
+ public synchronized void close() throws IOException {
+ if (!closed) {
+ closed = true;
- if (client != null) {
+ Exception caughtException = null;
try {
- client.close();
+ din.close();
} catch (Exception err) {
- LOG.error("Error closing client:" + err.getMessage(), err);
- caughtException = (caughtException == null ? err : caughtException);
+ LOG.error("Error closing input stream:" + err.getMessage(), err);
+ caughtException = err;
+ }
+ // Don't close the socket - the stream already does that if needed.
+
+ if (client != null) {
+ try {
+ client.close();
+ } catch (Exception err) {
+ LOG.error("Error closing client:" + err.getMessage(), err);
+ caughtException = (caughtException == null ? err : caughtException);
+ }
}
- }
- if (caughtException != null) {
- throw new IOException("Exception during close: " + caughtException.getMessage(), caughtException);
+ if (caughtException != null) {
+ throw new IOException("Exception during close: " + caughtException.getMessage(), caughtException);
+ }
}
}
@@ -156,28 +161,40 @@ public class LlapBaseRecordReader<V extends WritableComparable> implements Recor
return false;
}
} catch (IOException io) {
- if (Thread.interrupted()) {
- // Either we were interrupted by one of:
- // 1. handleEvent(), in which case there is a reader (error) event waiting for us in the queue
- // 2. Some other unrelated cause which interrupted us, in which case there may not be a reader event coming.
- // Either way we should not try to block trying to read the reader events queue.
- if (readerEvents.isEmpty()) {
- // Case 2.
- throw io;
- } else {
- // Case 1. Fail the reader, sending back the error we received from the reader event.
- ReaderEvent event = getReaderEvent();
- switch (event.getEventType()) {
- case ERROR:
- throw new IOException("Received reader event error: " + event.getMessage(), io);
- default:
- throw new IOException("Got reader event type " + event.getEventType()
- + ", expected error event", io);
+ try {
+ if (Thread.interrupted()) {
+ // Either we were interrupted by one of:
+ // 1. handleEvent(), in which case there is a reader (error) event waiting for us in the queue
+ // 2. Some other unrelated cause which interrupted us, in which case there may not be a reader event coming.
+ // Either way we should not try to block trying to read the reader events queue.
+ if (readerEvents.isEmpty()) {
+ // Case 2.
+ throw io;
+ } else {
+ // Case 1. Fail the reader, sending back the error we received from the reader event.
+ ReaderEvent event = getReaderEvent();
+ switch (event.getEventType()) {
+ case ERROR:
+ throw new IOException("Received reader event error: " + event.getMessage(), io);
+ default:
+ throw new IOException("Got reader event type " + event.getEventType()
+ + ", expected error event", io);
+ }
}
+ } else {
+ // If we weren't interrupted, just propagate the error
+ throw io;
+ }
+ } finally {
+ // The external client handling umbilical responses and the connection to read the incoming
+ // data are not coupled. Calling close() here to make sure an error in one will cause the
+ // other to be closed as well.
+ try {
+ close();
+ } catch (Exception err) {
+ // Don't propagate errors from close() since this will lose the original error above.
+ LOG.error("Closing RecordReader due to error and hit another error during close()", err);
}
- } else {
- // If we weren't interrupted, just propagate the error
- throw io;
}
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/c713eeec/llap-client/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java b/llap-client/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java
index 95b0ffc..3ae37dc 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java
@@ -41,9 +41,10 @@ public class SubmitWorkInfo implements Writable {
private int vertexParallelism;
public SubmitWorkInfo(ApplicationId fakeAppId, long creationTime,
- int vertexParallelism, byte[] vertexSpec, byte[] vertexSpecSignature) {
+ int vertexParallelism, byte[] vertexSpec, byte[] vertexSpecSignature,
+ Token<JobTokenIdentifier> token) {
this.fakeAppId = fakeAppId;
- this.token = createJobToken();
+ this.token = token;
this.creationTime = creationTime;
this.vertexSpec = vertexSpec;
this.vertexSpecSignature = vertexSpecSignature;
@@ -126,17 +127,6 @@ public class SubmitWorkInfo implements Writable {
return submitWorkInfo;
}
-
- private Token<JobTokenIdentifier> createJobToken() {
- String tokenIdentifier = fakeAppId.toString();
- JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(
- tokenIdentifier));
- Token<JobTokenIdentifier> sessionToken = new Token<JobTokenIdentifier>(identifier,
- new JobTokenSecretManager());
- sessionToken.setService(identifier.getJobId());
- return sessionToken;
- }
-
public byte[] getVertexBinary() {
return vertexSpec;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/c713eeec/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
index 406bdda..4304b52 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.hive.llap.ext;
import org.apache.hadoop.io.Writable;
import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol.TezAttemptArray;
@@ -59,7 +61,6 @@ import org.apache.hadoop.hive.llap.tezplugins.helpers.LlapTaskUmbilicalServer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.service.AbstractService;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -71,26 +72,62 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class LlapTaskUmbilicalExternalClient extends AbstractService implements Closeable {
+public class LlapTaskUmbilicalExternalClient implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(LlapTaskUmbilicalExternalClient.class);
+ private static ScheduledThreadPoolExecutor retryExecutor = new ScheduledThreadPoolExecutor(1);
+
private final LlapProtocolClientProxy communicator;
private volatile LlapTaskUmbilicalServer llapTaskUmbilicalServer;
private final Configuration conf;
- private final LlapTaskUmbilicalProtocol umbilical;
protected final String tokenIdentifier;
protected final Token<JobTokenIdentifier> sessionToken;
-
- private final ConcurrentMap<String, PendingEventData> pendingEvents = new ConcurrentHashMap<>();
- private final ConcurrentMap<String, TaskHeartbeatInfo> registeredTasks= new ConcurrentHashMap<String, TaskHeartbeatInfo>();
private LlapTaskUmbilicalExternalResponder responder = null;
- private final ScheduledThreadPoolExecutor timer;
private final long connectionTimeout;
private volatile boolean closed = false;
+ private RequestInfo requestInfo;
+ List<TezEvent> tezEvents;
+
+ // Using a shared instance of the umbilical server.
+ private static class SharedUmbilicalServer {
+ LlapTaskUmbilicalExternalImpl umbilicalProtocol;
+ LlapTaskUmbilicalServer llapTaskUmbilicalServer;
+
+ private volatile static SharedUmbilicalServer instance;
+ private static final Object lock = new Object();
+
+ static SharedUmbilicalServer getInstance(Configuration conf) {
+ SharedUmbilicalServer value = instance;
+ if (value == null) {
+ synchronized (lock) {
+ if (instance == null) {
+ instance = new SharedUmbilicalServer(conf);
+ }
+ value = instance;
+ }
+ }
+ return value;
+ }
- private static class TaskHeartbeatInfo {
+ private SharedUmbilicalServer(Configuration conf) {
+ try {
+ umbilicalProtocol = new LlapTaskUmbilicalExternalImpl(conf);
+ llapTaskUmbilicalServer = new LlapTaskUmbilicalServer(conf, umbilicalProtocol, 1);
+ } catch (Exception err) {
+ throw new ExceptionInInitializerError(err);
+ }
+ }
+ }
+
+ private enum RequestState {
+ PENDING, RUNNING
+ };
+
+ private static class RequestInfo {
+ RequestState state;
+ final SubmitWorkRequestProto request;
final QueryIdentifierProto queryIdentifierProto;
final String taskAttemptId;
final String hostname;
@@ -98,7 +135,10 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
final int port;
final AtomicLong lastHeartbeat = new AtomicLong();
- public TaskHeartbeatInfo(QueryIdentifierProto queryIdentifierProto, String taskAttemptId, String hostname, int port) {
+ public RequestInfo(SubmitWorkRequestProto request, QueryIdentifierProto queryIdentifierProto,
+ String taskAttemptId, String hostname, int port) {
+ this.state = RequestState.PENDING;
+ this.request = request;
this.queryIdentifierProto = queryIdentifierProto;
this.taskAttemptId = taskAttemptId;
this.hostname = hostname;
@@ -107,26 +147,13 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
}
}
- private static class PendingEventData {
- final TaskHeartbeatInfo heartbeatInfo;
- final List<TezEvent> tezEvents;
-
- public PendingEventData(TaskHeartbeatInfo heartbeatInfo, List<TezEvent> tezEvents) {
- this.heartbeatInfo = heartbeatInfo;
- this.tezEvents = tezEvents;
- }
- }
-
public LlapTaskUmbilicalExternalClient(Configuration conf, String tokenIdentifier,
Token<JobTokenIdentifier> sessionToken, LlapTaskUmbilicalExternalResponder responder,
Token<LlapTokenIdentifier> llapToken) {
- super(LlapTaskUmbilicalExternalClient.class.getName());
this.conf = conf;
- this.umbilical = new LlapTaskUmbilicalExternalImpl();
this.tokenIdentifier = tokenIdentifier;
this.sessionToken = sessionToken;
this.responder = responder;
- this.timer = new ScheduledThreadPoolExecutor(1);
this.connectionTimeout = 3 * HiveConf.getTimeVar(conf,
HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
// Add support for configurable threads, however 1 should always be enough.
@@ -134,37 +161,18 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
this.communicator.init(conf);
}
- @Override
- public void serviceStart() throws IOException {
- // If we use a single server for multiple external clients, then consider using more than one handler.
- int numHandlers = 1;
- llapTaskUmbilicalServer = new LlapTaskUmbilicalServer(conf, umbilical, numHandlers, tokenIdentifier, sessionToken);
- communicator.start();
- }
-
- @Override
- public void serviceStop() throws Exception {
- if (closed) {
- throw new IllegalStateException("Client has already been closed");
- }
- closed = true;
-
- // Check if the request is registered - if so we can cancel the request
- for (Map.Entry<String, TaskHeartbeatInfo> taskEntry : registeredTasks.entrySet()) {
- terminateRequest(taskEntry.getValue());
+ private void terminateRequest() {
+ if (closed || requestInfo == null) {
+ LOG.warn("No current request to terminate");
+ return;
}
- registeredTasks.clear();
- scheduleClientForCleanup(this);
- }
-
- private void terminateRequest(TaskHeartbeatInfo thi) {
TerminateFragmentRequestProto.Builder builder = TerminateFragmentRequestProto.newBuilder();
- builder.setQueryIdentifier(thi.queryIdentifierProto);
- builder.setFragmentIdentifierString(thi.taskAttemptId);
+ builder.setQueryIdentifier(requestInfo.queryIdentifierProto);
+ builder.setFragmentIdentifierString(requestInfo.taskAttemptId);
- final String taskAttemptId = thi.taskAttemptId;
- communicator.sendTerminateFragment(builder.build(), thi.hostname, thi.port,
+ final String taskAttemptId = requestInfo.taskAttemptId;
+ communicator.sendTerminateFragment(builder.build(), requestInfo.hostname, requestInfo.port,
new LlapProtocolClientProxy.ExecuteRequestCallback<TerminateFragmentResponseProto>() {
@Override
@@ -181,16 +189,8 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
});
}
- private void doShutdown() throws IOException {
- llapTaskUmbilicalServer.shutdownServer();
- timer.shutdown();
- if (this.communicator != null) {
- this.communicator.stop();
- }
- }
-
public InetSocketAddress getAddress() {
- return llapTaskUmbilicalServer.getAddress();
+ return SharedUmbilicalServer.getInstance(conf).llapTaskUmbilicalServer.getAddress();
}
@@ -213,151 +213,137 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
vertex.getVertexIndex(), request.getFragmentNumber(), request.getAttemptNumber());
final String fragmentId = attemptId.toString();
- final TaskHeartbeatInfo thi = new TaskHeartbeatInfo(queryIdentifierProto, fragmentId, llapHost, llapPort);
- pendingEvents.putIfAbsent(
- fragmentId, new PendingEventData(thi, Lists.<TezEvent>newArrayList()));
+ this.requestInfo = new RequestInfo(request, queryIdentifierProto, fragmentId, llapHost, llapPort);
- // Setup timer task to check for hearbeat timeouts
- timer.scheduleAtFixedRate(new HeartbeatCheckTask(),
- connectionTimeout, connectionTimeout, TimeUnit.MILLISECONDS);
+ this.tezEvents = Lists.<TezEvent>newArrayList();
+ registerClient();
// Send out the actual SubmitWorkRequest
- communicator.sendSubmitWork(request, llapHost, llapPort,
- new LlapProtocolClientProxy.ExecuteRequestCallback<SubmitWorkResponseProto>() {
-
- @Override
- public void setResponse(SubmitWorkResponseProto response) {
- if (response.hasSubmissionState()) {
- if (response.getSubmissionState().equals(SubmissionStateProto.REJECTED)) {
- String msg = "Fragment: " + fragmentId + " rejected. Server Busy.";
- LOG.info(msg);
- if (responder != null) {
- Throwable err = new RuntimeException(msg);
- responder.submissionFailed(fragmentId, err);
- }
- return;
- }
- }
- if (response.hasUniqueNodeId()) {
- thi.uniqueNodeId = response.getUniqueNodeId();
- }
- }
+ final LlapTaskUmbilicalExternalClient client = this;
+ communicator.start();
+ submitWork();
+ }
- @Override
- public void indicateError(Throwable t) {
- String msg = "Failed to submit: " + fragmentId;
- LOG.error(msg, t);
- Throwable err = new RuntimeException(msg, t);
- responder.submissionFailed(fragmentId, err);
- }
- });
+ private void submitWork() {
+ if (!closed) {
+ communicator.sendSubmitWork(requestInfo.request,
+ requestInfo.hostname, requestInfo.port, new SubmitWorkCallback(this));
+ }
}
- private void updateHeartbeatInfo(String taskAttemptId) {
- int updateCount = 0;
+ // Helper class to submit fragments to LLAP and retry rejected submissions.
+ static class SubmitWorkCallback implements LlapProtocolClientProxy.ExecuteRequestCallback<SubmitWorkResponseProto> {
+ private LlapTaskUmbilicalExternalClient client;
- PendingEventData pendingEventData = pendingEvents.get(taskAttemptId);
- if (pendingEventData != null) {
- pendingEventData.heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis());
- updateCount++;
+ public SubmitWorkCallback(LlapTaskUmbilicalExternalClient client) {
+ this.client = client;
}
- TaskHeartbeatInfo heartbeatInfo = registeredTasks.get(taskAttemptId);
- if (heartbeatInfo != null) {
- heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis());
- updateCount++;
+ @Override
+ public void setResponse(SubmitWorkResponseProto response) {
+ if (response.hasSubmissionState()) {
+ if (response.getSubmissionState().equals(SubmissionStateProto.REJECTED)) {
+ String fragmentId = this.client.requestInfo.taskAttemptId;
+ String msg = "Fragment: " + fragmentId + " rejected. Server Busy.";
+ LOG.info(msg);
+
+ // Retry rejected requests
+ if (!client.closed) {
+ // Update lastHeartbeat so we don't timeout during the retry
+ client.setLastHeartbeat(System.currentTimeMillis());
+ long retryDelay = HiveConf.getTimeVar(client.conf,
+ HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MS,
+ TimeUnit.MILLISECONDS);
+ LOG.info("Queueing fragment for resubmission: " + fragmentId);
+ final SubmitWorkCallback submitter = this;
+ retryExecutor.schedule(
+ new Runnable() {
+ @Override
+ public void run() {
+ client.submitWork();
+ }
+ },
+ retryDelay, TimeUnit.MILLISECONDS);
+ }
+ return;
+ }
+ }
+ if (response.hasUniqueNodeId()) {
+ client.requestInfo.uniqueNodeId = response.getUniqueNodeId();
+ }
}
- if (updateCount == 0) {
- LOG.warn("No tasks found for heartbeat from taskAttemptId " + taskAttemptId);
+ @Override
+ public void indicateError(Throwable t) {
+ String fragmentId = this.client.requestInfo.taskAttemptId;
+ String msg = "Failed to submit: " + fragmentId;
+ LOG.error(msg, t);
+ Throwable err = new RuntimeException(msg, t);
+ client.unregisterClient();
+ client.responder.submissionFailed(fragmentId, err);
}
}
- private void updateHeartbeatInfo(
- String hostname, String uniqueId, int port, TezAttemptArray tasks) {
- int updateCount = 0;
- HashSet<TezTaskAttemptID> attempts = new HashSet<>();
- for (Writable w : tasks.get()) {
- attempts.add((TezTaskAttemptID)w);
+ @Override
+ public void close() {
+ if (!closed) {
+ terminateRequest();
+ unregisterClient();
}
+ }
- String error = "";
- for (String key : pendingEvents.keySet()) {
- PendingEventData pendingEventData = pendingEvents.get(key);
- if (pendingEventData != null) {
- TaskHeartbeatInfo thi = pendingEventData.heartbeatInfo;
- String thiUniqueId = thi.uniqueNodeId;
- if (thi.hostname.equals(hostname) && thi.port == port
- && (thiUniqueId != null && thiUniqueId.equals(uniqueId))) {
- TezTaskAttemptID ta = TezTaskAttemptID.fromString(thi.taskAttemptId);
- if (attempts.contains(ta)) {
- thi.lastHeartbeat.set(System.currentTimeMillis());
- updateCount++;
- } else {
- error += (thi.taskAttemptId + ", ");
- }
- }
- }
+ private void registerClient() {
+ SharedUmbilicalServer umbilicalServer = SharedUmbilicalServer.getInstance(conf);
+ LlapTaskUmbilicalExternalClient prevVal =
+ umbilicalServer.umbilicalProtocol.registeredClients.putIfAbsent(requestInfo.taskAttemptId, this);
+ if (prevVal != null) {
+ LOG.warn("Unexpected - fragment " + requestInfo.taskAttemptId + " is already registered!");
}
+ umbilicalServer.llapTaskUmbilicalServer.addTokenForJob(tokenIdentifier, sessionToken);
+ }
- for (String key : registeredTasks.keySet()) {
- TaskHeartbeatInfo thi = registeredTasks.get(key);
- if (thi != null) {
- String thiUniqueId = thi.uniqueNodeId;
- if (thi.hostname.equals(hostname) && thi.port == port
- && (thiUniqueId != null && thiUniqueId.equals(uniqueId))) {
- TezTaskAttemptID ta = TezTaskAttemptID.fromString(thi.taskAttemptId);
- if (attempts.contains(ta)) {
- thi.lastHeartbeat.set(System.currentTimeMillis());
- updateCount++;
- } else {
- error += (thi.taskAttemptId + ", ");
- }
- }
- }
- }
- if (!error.isEmpty()) {
- LOG.info("The tasks we expected to be on the node are not there: " + error);
+ private void unregisterClient() {
+ if (!closed && requestInfo != null) {
+ communicator.stop();
+ SharedUmbilicalServer umbilicalServer = SharedUmbilicalServer.getInstance(conf);
+ umbilicalServer.umbilicalProtocol.unregisterClient(requestInfo.taskAttemptId);
+ umbilicalServer.llapTaskUmbilicalServer.removeTokenForJob(tokenIdentifier);
+ closed = true;
}
+ }
- if (updateCount == 0) {
- LOG.info("No tasks found for heartbeat from hostname " + hostname + ", port " + port);
- }
+ long getLastHeartbeat() {
+ return this.requestInfo.lastHeartbeat.get();
+ }
+
+ void setLastHeartbeat(long lastHeartbeat) {
+ this.requestInfo.lastHeartbeat.set(lastHeartbeat);
}
- private class HeartbeatCheckTask implements Runnable {
+ // Periodic task to time out submitted tasks that have not been updated with umbilical heartbeat.
+ private static class HeartbeatCheckTask implements Runnable {
+ LlapTaskUmbilicalExternalImpl umbilicalImpl;
+
+ public HeartbeatCheckTask(LlapTaskUmbilicalExternalImpl umbilicalImpl) {
+ this.umbilicalImpl = umbilicalImpl;
+ }
+
public void run() {
long currentTime = System.currentTimeMillis();
- List<String> timedOutTasks = new ArrayList<String>();
-
- // Check both pending and registered tasks for timeouts
- for (String key : pendingEvents.keySet()) {
- PendingEventData pendingEventData = pendingEvents.get(key);
- if (pendingEventData != null) {
- if (currentTime - pendingEventData.heartbeatInfo.lastHeartbeat.get() >= connectionTimeout) {
- timedOutTasks.add(key);
- }
- }
- }
- for (String timedOutTask : timedOutTasks) {
- LOG.info("Pending taskAttemptId " + timedOutTask + " timed out");
- responder.heartbeatTimeout(timedOutTask);
- pendingEvents.remove(timedOutTask);
- }
+ List<LlapTaskUmbilicalExternalClient> timedOutTasks = new ArrayList<LlapTaskUmbilicalExternalClient>();
- timedOutTasks.clear();
- for (String key : registeredTasks.keySet()) {
- TaskHeartbeatInfo heartbeatInfo = registeredTasks.get(key);
- if (heartbeatInfo != null) {
- if (currentTime - heartbeatInfo.lastHeartbeat.get() >= connectionTimeout) {
- timedOutTasks.add(key);
- }
+ for (Map.Entry<String, LlapTaskUmbilicalExternalClient> entry : umbilicalImpl.registeredClients.entrySet()) {
+ LlapTaskUmbilicalExternalClient client = entry.getValue();
+ if (currentTime - client.getLastHeartbeat() >= client.connectionTimeout) {
+ timedOutTasks.add(client);
}
}
- for (String timedOutTask : timedOutTasks) {
- LOG.info("Running taskAttemptId " + timedOutTask + " timed out");
- responder.heartbeatTimeout(timedOutTask);
- registeredTasks.remove(timedOutTask);
+
+ for (LlapTaskUmbilicalExternalClient timedOutTask : timedOutTasks) {
+ String taskAttemptId = timedOutTask.requestInfo.taskAttemptId;
+ LOG.info("Running taskAttemptId " + taskAttemptId + " timed out");
+ timedOutTask.unregisterClient();
+ timedOutTask.responder.heartbeatTimeout(taskAttemptId);
}
}
}
@@ -369,10 +355,19 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
void heartbeatTimeout(String fragmentId);
}
+ private static class LlapTaskUmbilicalExternalImpl implements LlapTaskUmbilicalProtocol {
+ final ConcurrentMap<String, LlapTaskUmbilicalExternalClient> registeredClients = new ConcurrentHashMap<>();
+ private final ScheduledThreadPoolExecutor timer;
- // Ideally, the server should be shared across all client sessions running on the same node.
- private class LlapTaskUmbilicalExternalImpl implements LlapTaskUmbilicalProtocol {
+ public LlapTaskUmbilicalExternalImpl(Configuration conf) {
+ long taskInterval = HiveConf.getTimeVar(conf,
+ HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ // Setup timer task to check for hearbeat timeouts
+ this.timer = new ScheduledThreadPoolExecutor(1);
+ timer.scheduleAtFixedRate(new HeartbeatCheckTask(this),
+ taskInterval, taskInterval, TimeUnit.MILLISECONDS);
+ }
@Override
public boolean canCommit(TezTaskAttemptID taskid) throws IOException {
@@ -399,33 +394,26 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
// Assuming TaskAttemptId and FragmentIdentifierString are the same. Verify this.
TezTaskAttemptID taskAttemptId = request.getCurrentTaskAttemptID();
String taskAttemptIdString = taskAttemptId.toString();
-
- if (closed) {
- LOG.info("Client has already been closed, but received heartbeat from " + taskAttemptIdString);
- // Set shouldDie response so the LLAP daemon closes this umbilical connection.
- response.setShouldDie();
- return response;
- }
-
updateHeartbeatInfo(taskAttemptIdString);
List<TezEvent> tezEvents = null;
- PendingEventData pendingEventData = pendingEvents.remove(taskAttemptIdString);
- if (pendingEventData == null) {
- tezEvents = Collections.emptyList();
+ LlapTaskUmbilicalExternalClient client = registeredClients.get(taskAttemptIdString);
+ if (client == null) {
+ // Heartbeat is from a task that we are not currently tracking.
+ LOG.info("Unexpected heartbeat from " + taskAttemptIdString);
+ response.setShouldDie(); // Do any of the other fields need to be set?
+ return response;
+ }
- // If this heartbeat was not from a pending event and it's not in our list of registered tasks,
- if (!registeredTasks.containsKey(taskAttemptIdString)) {
- LOG.info("Unexpected heartbeat from " + taskAttemptIdString);
- response.setShouldDie(); // Do any of the other fields need to be set?
- return response;
- }
+ if (client.requestInfo.state == RequestState.PENDING) {
+ client.requestInfo.state = RequestState.RUNNING;
+ tezEvents = client.tezEvents;
} else {
- tezEvents = pendingEventData.tezEvents;
- // Tasks removed from the pending list should then be added to the registered list.
- registeredTasks.put(taskAttemptIdString, pendingEventData.heartbeatInfo);
+ tezEvents = Collections.emptyList();
}
+ boolean shouldUnregisterClient = false;
+
response.setLastRequestId(request.getRequestId());
// Irrelevant from eventIds. This can be tracked in the AM itself, instead of polluting the task.
// Also since we have all the MRInput events here - they'll all be sent in together.
@@ -443,11 +431,11 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
switch (eventType) {
case TASK_ATTEMPT_COMPLETED_EVENT:
LOG.debug("Task completed event for " + taskAttemptIdString);
- registeredTasks.remove(taskAttemptIdString);
+ shouldUnregisterClient = true;
break;
case TASK_ATTEMPT_FAILED_EVENT:
LOG.debug("Task failed event for " + taskAttemptIdString);
- registeredTasks.remove(taskAttemptIdString);
+ shouldUnregisterClient = true;
break;
case TASK_STATUS_UPDATE_EVENT:
// If we want to handle counters
@@ -459,10 +447,14 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
}
}
+ if (shouldUnregisterClient) {
+ client.unregisterClient();
+ }
+
// Pass the request on to the responder
try {
- if (responder != null) {
- responder.heartbeat(request);
+ if (client.responder != null) {
+ client.responder.heartbeat(request);
}
} catch (Exception err) {
LOG.error("Error during responder execution", err);
@@ -474,6 +466,9 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
@Override
public void nodeHeartbeat(
Text hostname, Text uniqueId, int port, TezAttemptArray aw) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Node heartbeat from " + hostname + ":" + port + ", " + uniqueId);
+ }
updateHeartbeatInfo(hostname.toString(), uniqueId.toString(), port, aw);
// No need to propagate to this to the responder
}
@@ -482,14 +477,18 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
public void taskKilled(TezTaskAttemptID taskAttemptId) throws IOException {
String taskAttemptIdString = taskAttemptId.toString();
LOG.error("Task killed - " + taskAttemptIdString);
- registeredTasks.remove(taskAttemptIdString);
-
- try {
- if (responder != null) {
- responder.taskKilled(taskAttemptId);
+ LlapTaskUmbilicalExternalClient client = registeredClients.get(taskAttemptIdString);
+ if (client != null) {
+ try {
+ client.unregisterClient();
+ if (client.responder != null) {
+ client.responder.taskKilled(taskAttemptId);
+ }
+ } catch (Exception err) {
+ LOG.error("Error during responder execution", err);
}
- } catch (Exception err) {
- LOG.error("Error during responder execution", err);
+ } else {
+ LOG.info("Received task killed notification for task which is not currently being tracked: " + taskAttemptId);
}
}
@@ -504,38 +503,60 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
return ProtocolSignature.getProtocolSignature(this, protocol,
clientVersion, clientMethodsHash);
}
- }
- private static void scheduleClientForCleanup(LlapTaskUmbilicalExternalClient client) {
- // Add a bit of delay in case the daemon has not closed the umbilical connection yet.
- clientCleanupExecuter.schedule(new ClientCleanupTask(client), cleanupDelay, TimeUnit.MILLISECONDS);
- }
+ private void unregisterClient(String taskAttemptId) {
+ registeredClients.remove(taskAttemptId);
+ }
- static final ScheduledThreadPoolExecutor clientCleanupExecuter = new ScheduledThreadPoolExecutor(1);
- static final int cleanupDelay = 2000;
+ private void updateHeartbeatInfo(String taskAttemptId) {
+ int updateCount = 0;
- static class ClientCleanupTask implements Runnable {
- final LlapTaskUmbilicalExternalClient client;
+ LlapTaskUmbilicalExternalClient registeredClient = registeredClients.get(taskAttemptId);
+ if (registeredClient != null) {
+ registeredClient.setLastHeartbeat(System.currentTimeMillis());
+ updateCount++;
+ }
- public ClientCleanupTask(LlapTaskUmbilicalExternalClient client) {
- this.client = client;
+ if (updateCount == 0) {
+ LOG.warn("No tasks found for heartbeat from taskAttemptId " + taskAttemptId);
+ }
}
- @Override
- public void run() {
- if (client.llapTaskUmbilicalServer.getNumOpenConnections() == 0) {
- // No more outstanding connections, ok to close.
- try {
- LOG.debug("Closing client");
- client.doShutdown();
- } catch (Exception err) {
- LOG.error("Error cleaning up client", err);
+ private void updateHeartbeatInfo(
+ String hostname, String uniqueId, int port, TezAttemptArray tasks) {
+ int updateCount = 0;
+ HashSet<TezTaskAttemptID> attempts = new HashSet<>();
+ for (Writable w : tasks.get()) {
+ attempts.add((TezTaskAttemptID)w);
+ }
+
+ String error = "";
+ for (Map.Entry<String, LlapTaskUmbilicalExternalClient> entry : registeredClients.entrySet()) {
+ LlapTaskUmbilicalExternalClient registeredClient = entry.getValue();
+ if (doesClientMatchHeartbeat(registeredClient, hostname, uniqueId, port)) {
+ TezTaskAttemptID ta = TezTaskAttemptID.fromString(registeredClient.requestInfo.taskAttemptId);
+ if (attempts.contains(ta)) {
+ registeredClient.setLastHeartbeat(System.currentTimeMillis());
+ updateCount++;
+ } else {
+ error += (registeredClient.requestInfo.taskAttemptId + ", ");
+ }
}
- } else {
- // Reschedule this task for later.
- LOG.debug("Client still has umbilical connection - rescheduling cleanup.");
- scheduleClientForCleanup(client);
}
+ if (!error.isEmpty()) {
+ LOG.info("The tasks we expected to be on the node are not there: " + error);
+ }
+
+ if (updateCount == 0) {
+ LOG.info("No tasks found for heartbeat from hostname " + hostname + ", port " + port);
+ }
+ }
+
+    /** Returns true when host, port and unique node id all equal those in client.requestInfo. */
+    private static boolean doesClientMatchHeartbeat(LlapTaskUmbilicalExternalClient client,
+        String hostname, String uniqueId, int port) {
+      boolean endpointMatches = hostname.equals(client.requestInfo.hostname) && port == client.requestInfo.port;
+      return endpointMatches && uniqueId.equals(client.requestInfo.uniqueNodeId);
+    }
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/c713eeec/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java b/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
index 403381d..89cb6fb 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
@@ -18,10 +18,15 @@ package org.apache.hadoop.hive.llap.tezplugins.helpers;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto;
import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
@@ -32,6 +37,7 @@ import org.apache.hadoop.security.authorize.Service;
import org.apache.hadoop.security.token.Token;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.runtime.api.impl.TezEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,12 +48,11 @@ public class LlapTaskUmbilicalServer {
protected volatile Server server;
private final InetSocketAddress address;
private final AtomicBoolean started = new AtomicBoolean(true);
+ private JobTokenSecretManager jobTokenSecretManager;
+ private Map<String, int[]> tokenRefMap = new HashMap<String, int[]>();
- public LlapTaskUmbilicalServer(Configuration conf, LlapTaskUmbilicalProtocol umbilical, int numHandlers, String tokenIdentifier, Token<JobTokenIdentifier> token) throws
- IOException {
- JobTokenSecretManager jobTokenSecretManager =
- new JobTokenSecretManager();
- jobTokenSecretManager.addTokenForJob(tokenIdentifier, token);
+ public LlapTaskUmbilicalServer(Configuration conf, LlapTaskUmbilicalProtocol umbilical, int numHandlers) throws IOException {
+ jobTokenSecretManager = new JobTokenSecretManager();
server = new RPC.Builder(conf)
.setProtocol(LlapTaskUmbilicalProtocol.class)
@@ -65,7 +70,7 @@ public class LlapTaskUmbilicalServer {
this.address = NetUtils.getConnectAddress(server);
LOG.info(
"Started TaskUmbilicalServer: " + umbilical.getClass().getName() + " at address: " + address +
- " with numHandlers=" + numHandlers);
+ " with numHandlers=" + numHandlers);
}
public InetSocketAddress getAddress() {
@@ -76,6 +81,33 @@ public class LlapTaskUmbilicalServer {
return server.getNumOpenConnections();
}
+  /** Records one more outstanding request for tokenIdentifier. */
+  public synchronized void addTokenForJob(String tokenIdentifier, Token<JobTokenIdentifier> token) {
+    int[] counter = tokenRefMap.get(tokenIdentifier);
+    if (counter == null) {
+      // First request for this identifier: create the counter, then register the token once.
+      counter = new int[1];
+      tokenRefMap.put(tokenIdentifier, counter);
+      jobTokenSecretManager.addTokenForJob(tokenIdentifier, token);
+    }
+    counter[0] += 1;
+  }
+
+  /** Drops one outstanding request for tokenIdentifier; the token is removed once none remain. */
+  public synchronized void removeTokenForJob(String tokenIdentifier) {
+    int[] counter = tokenRefMap.get(tokenIdentifier);
+    if (counter == null) {
+      LOG.warn("No refCount found for tokenIdentifier " + tokenIdentifier);
+      return;
+    }
+    counter[0] -= 1;
+    if (counter[0] > 0) {
+      return; // Other outstanding requests still rely on this token.
+    }
+    tokenRefMap.remove(tokenIdentifier);
+    jobTokenSecretManager.removeTokenForJob(tokenIdentifier);
+  }
+
public void shutdownServer() {
if (started.get()) { // Primarily to avoid multiple shutdowns.
started.set(false);
http://git-wip-us.apache.org/repos/asf/hive/blob/c713eeec/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
index eb93241..201f5fa 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
@@ -147,8 +147,6 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
LlapTaskUmbilicalExternalClient llapClient =
new LlapTaskUmbilicalExternalClient(job, submitWorkInfo.getTokenIdentifier(),
submitWorkInfo.getToken(), umbilicalResponder, llapToken);
- llapClient.init(job);
- llapClient.start();
int attemptNum = 0;
// Use task attempt number from conf if provided
http://git-wip-us.apache.org/repos/asf/hive/blob/c713eeec/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
index d4ec44e..5003f42 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.FieldDesc;
@@ -90,6 +91,8 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.InputInitializerDescriptor;
@@ -368,6 +371,9 @@ public class GenericUDTFGetSplits extends GenericUDTF {
queryUser = UserGroupInformation.getCurrentUser().getUserName();
}
+ // Generate umbilical token (applies to all splits)
+ Token<JobTokenIdentifier> umbilicalToken = JobTokenCreator.createJobToken(applicationId);
+
LOG.info("Number of splits: " + (eventList.size() - 1));
SignedMessage signedSvs = null;
for (int i = 0; i < eventList.size() - 1; i++) {
@@ -388,7 +394,7 @@ public class GenericUDTFGetSplits extends GenericUDTF {
SubmitWorkInfo submitWorkInfo = new SubmitWorkInfo(applicationId,
System.currentTimeMillis(), taskSpec.getVertexParallelism(), signedSvs.message,
- signedSvs.signature);
+ signedSvs.signature, umbilicalToken);
byte[] submitWorkBytes = SubmitWorkInfo.toBytes(submitWorkInfo);
// 3. Generate input event.
@@ -406,6 +412,18 @@ public class GenericUDTFGetSplits extends GenericUDTF {
}
}
+  /** Builds the umbilical job token that is shared by every split generated for one application. */
+  private static class JobTokenCreator {
+    private static Token<JobTokenIdentifier> createJobToken(ApplicationId applicationId) {
+      Text jobId = new Text(applicationId.toString());
+      JobTokenIdentifier identifier = new JobTokenIdentifier(jobId);
+      Token<JobTokenIdentifier> token =
+          new Token<JobTokenIdentifier>(identifier, new JobTokenSecretManager());
+      token.setService(identifier.getJobId());
+      return token;
+    }
+  }
+
private SplitLocationInfo[] makeLocationHints(TaskLocationHint hint) {
Set<String> hosts = hint.getHosts();
if (hosts.size() != 1) {