You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jd...@apache.org on 2017/07/15 01:12:05 UTC

hive git commit: HIVE-16926: LlapTaskUmbilicalExternalClient should not start new umbilical server for every fragment request (Jason Dere, reviewed by Siddharth Seth)

Repository: hive
Updated Branches:
  refs/heads/master c27764c0d -> c713eeec0


HIVE-16926: LlapTaskUmbilicalExternalClient should not start new umbilical server for every fragment request (Jason Dere, reviewed by Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c713eeec
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c713eeec
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c713eeec

Branch: refs/heads/master
Commit: c713eeec0d5f66aae8180fed8ebada8ce7b5d487
Parents: c27764c
Author: Jason Dere <jd...@hortonworks.com>
Authored: Fri Jul 14 18:11:06 2017 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Fri Jul 14 18:11:06 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hive/llap/LlapBaseRecordReader.java  |  89 ++--
 .../apache/hadoop/hive/llap/SubmitWorkInfo.java |  16 +-
 .../ext/LlapTaskUmbilicalExternalClient.java    | 487 ++++++++++---------
 .../helpers/LlapTaskUmbilicalServer.java        |  44 +-
 .../hadoop/hive/llap/LlapBaseInputFormat.java   |   2 -
 .../ql/udf/generic/GenericUDTFGetSplits.java    |  20 +-
 6 files changed, 367 insertions(+), 291 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/c713eeec/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
index 7fff147..cb38839 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
@@ -58,6 +58,7 @@ public class LlapBaseRecordReader<V extends WritableComparable> implements Recor
   protected final long timeout;
   protected final Closeable client;
   private final Closeable socket;
+  private boolean closed = false;
 
   public LlapBaseRecordReader(InputStream in, Schema schema,
       Class<V> clazz, JobConf job, Closeable client, Closeable socket) {
@@ -78,27 +79,31 @@ public class LlapBaseRecordReader<V extends WritableComparable> implements Recor
   }
 
   @Override
-  public void close() throws IOException {
-    Exception caughtException = null;
-    try {
-      din.close();
-    } catch (Exception err) {
-      LOG.error("Error closing input stream:" + err.getMessage(), err);
-      caughtException = err;
-    }
-    // Don't close the socket - the stream already does that if needed.
+  public synchronized void close() throws IOException {
+    if (!closed) {
+      closed = true;
 
-    if (client != null) {
+      Exception caughtException = null;
       try {
-        client.close();
+        din.close();
       } catch (Exception err) {
-        LOG.error("Error closing client:" + err.getMessage(), err);
-        caughtException = (caughtException == null ? err : caughtException);
+        LOG.error("Error closing input stream:" + err.getMessage(), err);
+        caughtException = err;
+      }
+      // Don't close the socket - the stream already does that if needed.
+
+      if (client != null) {
+        try {
+          client.close();
+        } catch (Exception err) {
+          LOG.error("Error closing client:" + err.getMessage(), err);
+          caughtException = (caughtException == null ? err : caughtException);
+        }
       }
-    }
 
-    if (caughtException != null) {
-      throw new IOException("Exception during close: " + caughtException.getMessage(), caughtException);
+      if (caughtException != null) {
+        throw new IOException("Exception during close: " + caughtException.getMessage(), caughtException);
+      }
     }
   }
 
@@ -156,28 +161,40 @@ public class LlapBaseRecordReader<V extends WritableComparable> implements Recor
         return false;
       }
     } catch (IOException io) {
-      if (Thread.interrupted()) {
-        // Either we were interrupted by one of:
-        // 1. handleEvent(), in which case there is a reader (error) event waiting for us in the queue
-        // 2. Some other unrelated cause which interrupted us, in which case there may not be a reader event coming.
-        // Either way we should not try to block trying to read the reader events queue.
-        if (readerEvents.isEmpty()) {
-          // Case 2.
-          throw io;
-        } else {
-          // Case 1. Fail the reader, sending back the error we received from the reader event.
-          ReaderEvent event = getReaderEvent();
-          switch (event.getEventType()) {
-            case ERROR:
-              throw new IOException("Received reader event error: " + event.getMessage(), io);
-            default:
-              throw new IOException("Got reader event type " + event.getEventType()
-                  + ", expected error event", io);
+      try {
+        if (Thread.interrupted()) {
+          // Either we were interrupted by one of:
+          // 1. handleEvent(), in which case there is a reader (error) event waiting for us in the queue
+          // 2. Some other unrelated cause which interrupted us, in which case there may not be a reader event coming.
+          // Either way we should not try to block trying to read the reader events queue.
+          if (readerEvents.isEmpty()) {
+            // Case 2.
+            throw io;
+          } else {
+            // Case 1. Fail the reader, sending back the error we received from the reader event.
+            ReaderEvent event = getReaderEvent();
+            switch (event.getEventType()) {
+              case ERROR:
+                throw new IOException("Received reader event error: " + event.getMessage(), io);
+              default:
+                throw new IOException("Got reader event type " + event.getEventType()
+                    + ", expected error event", io);
+            }
           }
+        } else {
+          // If we weren't interrupted, just propagate the error
+          throw io;
+        }
+      } finally {
+        // The external client handling umbilical responses and the connection to read the incoming
+        // data are not coupled. Calling close() here to make sure an error in one will cause the
+        // other to be closed as well.
+        try {
+          close();
+        } catch (Exception err) {
+          // Don't propagate errors from close() since this will lose the original error above.
+          LOG.error("Closing RecordReader due to error and hit another error during close()", err);
         }
-      } else {
-        // If we weren't interrupted, just propagate the error
-        throw io;
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/c713eeec/llap-client/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java b/llap-client/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java
index 95b0ffc..3ae37dc 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java
@@ -41,9 +41,10 @@ public class SubmitWorkInfo implements Writable {
   private int vertexParallelism;
 
   public SubmitWorkInfo(ApplicationId fakeAppId, long creationTime,
-      int vertexParallelism, byte[] vertexSpec, byte[] vertexSpecSignature) {
+      int vertexParallelism, byte[] vertexSpec, byte[] vertexSpecSignature,
+      Token<JobTokenIdentifier> token) {
     this.fakeAppId = fakeAppId;
-    this.token = createJobToken();
+    this.token = token;
     this.creationTime = creationTime;
     this.vertexSpec = vertexSpec;
     this.vertexSpecSignature = vertexSpecSignature;
@@ -126,17 +127,6 @@ public class SubmitWorkInfo implements Writable {
     return submitWorkInfo;
   }
 
-
-  private Token<JobTokenIdentifier> createJobToken() {
-    String tokenIdentifier = fakeAppId.toString();
-    JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(
-        tokenIdentifier));
-    Token<JobTokenIdentifier> sessionToken = new Token<JobTokenIdentifier>(identifier,
-        new JobTokenSecretManager());
-    sessionToken.setService(identifier.getJobId());
-    return sessionToken;
-  }
-
   public byte[] getVertexBinary() {
     return vertexSpec;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/c713eeec/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
index 406bdda..4304b52 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.hive.llap.ext;
 import org.apache.hadoop.io.Writable;
 
 import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol.TezAttemptArray;
 
@@ -59,7 +61,6 @@ import org.apache.hadoop.hive.llap.tezplugins.helpers.LlapTaskUmbilicalServer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.service.AbstractService;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -71,26 +72,62 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-public class LlapTaskUmbilicalExternalClient extends AbstractService implements Closeable {
+public class LlapTaskUmbilicalExternalClient implements Closeable {
 
   private static final Logger LOG = LoggerFactory.getLogger(LlapTaskUmbilicalExternalClient.class);
 
+  private static ScheduledThreadPoolExecutor retryExecutor = new ScheduledThreadPoolExecutor(1);
+
   private final LlapProtocolClientProxy communicator;
   private volatile LlapTaskUmbilicalServer llapTaskUmbilicalServer;
   private final Configuration conf;
-  private final LlapTaskUmbilicalProtocol umbilical;
 
   protected final String tokenIdentifier;
   protected final Token<JobTokenIdentifier> sessionToken;
-
-  private final ConcurrentMap<String, PendingEventData> pendingEvents = new ConcurrentHashMap<>();
-  private final ConcurrentMap<String, TaskHeartbeatInfo> registeredTasks= new ConcurrentHashMap<String, TaskHeartbeatInfo>();
   private LlapTaskUmbilicalExternalResponder responder = null;
-  private final ScheduledThreadPoolExecutor timer;
   private final long connectionTimeout;
   private volatile boolean closed = false;
+  private RequestInfo requestInfo;
+  List<TezEvent> tezEvents;
+
+  // Using a shared instance of the umbilical server.
+  private static class SharedUmbilicalServer {
+    LlapTaskUmbilicalExternalImpl umbilicalProtocol;
+    LlapTaskUmbilicalServer llapTaskUmbilicalServer;
+
+    private volatile static SharedUmbilicalServer instance;
+    private static final Object lock = new Object();
+
+    static SharedUmbilicalServer getInstance(Configuration conf) {
+      SharedUmbilicalServer value = instance;
+      if (value == null) {
+        synchronized (lock) {
+          if (instance == null) {
+            instance = new SharedUmbilicalServer(conf);
+          }
+          value = instance;
+        }
+      }
+      return value;
+    }
 
-  private static class TaskHeartbeatInfo {
+    private SharedUmbilicalServer(Configuration conf) {
+      try {
+        umbilicalProtocol = new LlapTaskUmbilicalExternalImpl(conf);
+        llapTaskUmbilicalServer = new LlapTaskUmbilicalServer(conf, umbilicalProtocol, 1);
+      } catch (Exception err) {
+        throw new ExceptionInInitializerError(err);
+      }
+    }
+  }
+
+  private enum RequestState {
+    PENDING, RUNNING
+  };
+
+  private static class RequestInfo {
+    RequestState state;
+    final SubmitWorkRequestProto request;
     final QueryIdentifierProto queryIdentifierProto;
     final String taskAttemptId;
     final String hostname;
@@ -98,7 +135,10 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
     final int port;
     final AtomicLong lastHeartbeat = new AtomicLong();
 
-    public TaskHeartbeatInfo(QueryIdentifierProto queryIdentifierProto, String taskAttemptId, String hostname, int port) {
+    public RequestInfo(SubmitWorkRequestProto request, QueryIdentifierProto queryIdentifierProto,
+        String taskAttemptId, String hostname, int port) {
+      this.state = RequestState.PENDING;
+      this.request = request;
       this.queryIdentifierProto = queryIdentifierProto;
       this.taskAttemptId = taskAttemptId;
       this.hostname = hostname;
@@ -107,26 +147,13 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
     }
   }
 
-  private static class PendingEventData {
-    final TaskHeartbeatInfo heartbeatInfo;
-    final List<TezEvent> tezEvents;
-
-    public PendingEventData(TaskHeartbeatInfo heartbeatInfo, List<TezEvent> tezEvents) {
-      this.heartbeatInfo = heartbeatInfo;
-      this.tezEvents = tezEvents;
-    }
-  }
-
   public LlapTaskUmbilicalExternalClient(Configuration conf, String tokenIdentifier,
       Token<JobTokenIdentifier> sessionToken, LlapTaskUmbilicalExternalResponder responder,
       Token<LlapTokenIdentifier> llapToken) {
-    super(LlapTaskUmbilicalExternalClient.class.getName());
     this.conf = conf;
-    this.umbilical = new LlapTaskUmbilicalExternalImpl();
     this.tokenIdentifier = tokenIdentifier;
     this.sessionToken = sessionToken;
     this.responder = responder;
-    this.timer = new ScheduledThreadPoolExecutor(1);
     this.connectionTimeout = 3 * HiveConf.getTimeVar(conf,
         HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
     // Add support for configurable threads, however 1 should always be enough.
@@ -134,37 +161,18 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
     this.communicator.init(conf);
   }
 
-  @Override
-  public void serviceStart() throws IOException {
-    // If we use a single server for multiple external clients, then consider using more than one handler.
-    int numHandlers = 1;
-    llapTaskUmbilicalServer = new LlapTaskUmbilicalServer(conf, umbilical, numHandlers, tokenIdentifier, sessionToken);
-    communicator.start();
-  }
-
-  @Override
-  public void serviceStop() throws Exception {
-    if (closed) {
-      throw new IllegalStateException("Client has already been closed");
-    }
-    closed = true;
-
-    // Check if the request is registered - if so we can cancel the request
-    for (Map.Entry<String, TaskHeartbeatInfo> taskEntry : registeredTasks.entrySet()) {
-      terminateRequest(taskEntry.getValue());
+  private void terminateRequest() {
+    if (closed || requestInfo == null) {
+      LOG.warn("No current request to terminate");
+      return;
     }
-    registeredTasks.clear();
 
-    scheduleClientForCleanup(this);
-  }
-
-  private void terminateRequest(TaskHeartbeatInfo thi) {
     TerminateFragmentRequestProto.Builder builder = TerminateFragmentRequestProto.newBuilder();
-    builder.setQueryIdentifier(thi.queryIdentifierProto);
-    builder.setFragmentIdentifierString(thi.taskAttemptId);
+    builder.setQueryIdentifier(requestInfo.queryIdentifierProto);
+    builder.setFragmentIdentifierString(requestInfo.taskAttemptId);
 
-    final String taskAttemptId = thi.taskAttemptId;
-    communicator.sendTerminateFragment(builder.build(), thi.hostname, thi.port,
+    final String taskAttemptId = requestInfo.taskAttemptId;
+    communicator.sendTerminateFragment(builder.build(), requestInfo.hostname, requestInfo.port,
         new LlapProtocolClientProxy.ExecuteRequestCallback<TerminateFragmentResponseProto>() {
 
       @Override
@@ -181,16 +189,8 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
     });
   }
 
-  private void doShutdown() throws IOException {
-    llapTaskUmbilicalServer.shutdownServer();
-    timer.shutdown();
-    if (this.communicator != null) {
-      this.communicator.stop();
-    }
-  }
-
   public InetSocketAddress getAddress() {
-    return llapTaskUmbilicalServer.getAddress();
+    return SharedUmbilicalServer.getInstance(conf).llapTaskUmbilicalServer.getAddress();
   }
 
 
@@ -213,151 +213,137 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
         vertex.getVertexIndex(), request.getFragmentNumber(), request.getAttemptNumber());
     final String fragmentId = attemptId.toString();
 
-    final TaskHeartbeatInfo thi = new TaskHeartbeatInfo(queryIdentifierProto, fragmentId, llapHost, llapPort);
-    pendingEvents.putIfAbsent(
-        fragmentId, new PendingEventData(thi, Lists.<TezEvent>newArrayList()));
+    this.requestInfo = new RequestInfo(request, queryIdentifierProto, fragmentId, llapHost, llapPort);
 
-    // Setup timer task to check for hearbeat timeouts
-    timer.scheduleAtFixedRate(new HeartbeatCheckTask(),
-        connectionTimeout, connectionTimeout, TimeUnit.MILLISECONDS);
+    this.tezEvents = Lists.<TezEvent>newArrayList();
+    registerClient();
 
     // Send out the actual SubmitWorkRequest
-    communicator.sendSubmitWork(request, llapHost, llapPort,
-        new LlapProtocolClientProxy.ExecuteRequestCallback<SubmitWorkResponseProto>() {
-
-          @Override
-          public void setResponse(SubmitWorkResponseProto response) {
-            if (response.hasSubmissionState()) {
-              if (response.getSubmissionState().equals(SubmissionStateProto.REJECTED)) {
-                String msg = "Fragment: " + fragmentId + " rejected. Server Busy.";
-                LOG.info(msg);
-                if (responder != null) {
-                  Throwable err = new RuntimeException(msg);
-                  responder.submissionFailed(fragmentId, err);
-                }
-                return;
-              }
-            }
-            if (response.hasUniqueNodeId()) {
-              thi.uniqueNodeId = response.getUniqueNodeId();
-            }
-          }
+    final LlapTaskUmbilicalExternalClient client = this;
+    communicator.start();
+    submitWork();
+  }
 
-          @Override
-          public void indicateError(Throwable t) {
-            String msg = "Failed to submit: " + fragmentId;
-            LOG.error(msg, t);
-            Throwable err = new RuntimeException(msg, t);
-            responder.submissionFailed(fragmentId, err);
-          }
-        });
+  private void submitWork() {
+    if (!closed) {
+      communicator.sendSubmitWork(requestInfo.request,
+          requestInfo.hostname, requestInfo.port, new SubmitWorkCallback(this));
+    }
   }
 
-  private void updateHeartbeatInfo(String taskAttemptId) {
-    int updateCount = 0;
+  // Helper class to submit fragments to LLAP and retry rejected submissions.
+  static class SubmitWorkCallback implements LlapProtocolClientProxy.ExecuteRequestCallback<SubmitWorkResponseProto> {
+    private LlapTaskUmbilicalExternalClient client;
 
-    PendingEventData pendingEventData = pendingEvents.get(taskAttemptId);
-    if (pendingEventData != null) {
-      pendingEventData.heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis());
-      updateCount++;
+    public SubmitWorkCallback(LlapTaskUmbilicalExternalClient client) {
+      this.client = client;
     }
 
-    TaskHeartbeatInfo heartbeatInfo = registeredTasks.get(taskAttemptId);
-    if (heartbeatInfo != null) {
-      heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis());
-      updateCount++;
+    @Override
+    public void setResponse(SubmitWorkResponseProto response) {
+      if (response.hasSubmissionState()) {
+        if (response.getSubmissionState().equals(SubmissionStateProto.REJECTED)) {
+          String fragmentId = this.client.requestInfo.taskAttemptId;
+          String msg = "Fragment: " + fragmentId + " rejected. Server Busy.";
+          LOG.info(msg);
+
+          // Retry rejected requests
+          if (!client.closed) {
+            // Update lastHeartbeat so we don't timeout during the retry
+            client.setLastHeartbeat(System.currentTimeMillis());
+            long retryDelay = HiveConf.getTimeVar(client.conf,
+                HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MS,
+                TimeUnit.MILLISECONDS);
+            LOG.info("Queueing fragment for resubmission: " + fragmentId);
+            final SubmitWorkCallback submitter = this;
+            retryExecutor.schedule(
+                new Runnable() {
+                  @Override
+                  public void run() {
+                    client.submitWork();
+                  }
+                },
+                retryDelay, TimeUnit.MILLISECONDS);
+          }
+          return;
+        }
+      }
+      if (response.hasUniqueNodeId()) {
+        client.requestInfo.uniqueNodeId = response.getUniqueNodeId();
+      }
     }
 
-    if (updateCount == 0) {
-      LOG.warn("No tasks found for heartbeat from taskAttemptId " + taskAttemptId);
+    @Override
+    public void indicateError(Throwable t) {
+      String fragmentId = this.client.requestInfo.taskAttemptId;
+      String msg = "Failed to submit: " + fragmentId;
+      LOG.error(msg, t);
+      Throwable err = new RuntimeException(msg, t);
+      client.unregisterClient();
+      client.responder.submissionFailed(fragmentId, err);
     }
   }
 
-  private void updateHeartbeatInfo(
-      String hostname, String uniqueId, int port, TezAttemptArray tasks) {
-    int updateCount = 0;
-    HashSet<TezTaskAttemptID> attempts = new HashSet<>();
-    for (Writable w : tasks.get()) {
-      attempts.add((TezTaskAttemptID)w);
+  @Override
+  public void close() {
+    if (!closed) {
+      terminateRequest();
+      unregisterClient();
     }
+  }
 
-    String error = "";
-    for (String key : pendingEvents.keySet()) {
-      PendingEventData pendingEventData = pendingEvents.get(key);
-      if (pendingEventData != null) {
-        TaskHeartbeatInfo thi = pendingEventData.heartbeatInfo;
-        String thiUniqueId = thi.uniqueNodeId;
-        if (thi.hostname.equals(hostname) && thi.port == port
-            && (thiUniqueId != null && thiUniqueId.equals(uniqueId))) {
-          TezTaskAttemptID ta = TezTaskAttemptID.fromString(thi.taskAttemptId);
-          if (attempts.contains(ta)) {
-            thi.lastHeartbeat.set(System.currentTimeMillis());
-            updateCount++;
-          } else {
-            error += (thi.taskAttemptId + ", ");
-          }
-        }
-      }
+  private void registerClient() {
+    SharedUmbilicalServer umbilicalServer = SharedUmbilicalServer.getInstance(conf);
+    LlapTaskUmbilicalExternalClient prevVal =
+        umbilicalServer.umbilicalProtocol.registeredClients.putIfAbsent(requestInfo.taskAttemptId, this);
+    if (prevVal != null) {
+      LOG.warn("Unexpected - fragment " + requestInfo.taskAttemptId + " is already registered!");
     }
+    umbilicalServer.llapTaskUmbilicalServer.addTokenForJob(tokenIdentifier, sessionToken);
+  }
 
-    for (String key : registeredTasks.keySet()) {
-      TaskHeartbeatInfo thi = registeredTasks.get(key);
-      if (thi != null) {
-        String thiUniqueId = thi.uniqueNodeId;
-        if (thi.hostname.equals(hostname) && thi.port == port
-            && (thiUniqueId != null && thiUniqueId.equals(uniqueId))) {
-          TezTaskAttemptID ta = TezTaskAttemptID.fromString(thi.taskAttemptId);
-          if (attempts.contains(ta)) {
-            thi.lastHeartbeat.set(System.currentTimeMillis());
-            updateCount++;
-          } else {
-            error += (thi.taskAttemptId + ", ");
-          }
-        }
-      }
-    }
-    if (!error.isEmpty()) {
-      LOG.info("The tasks we expected to be on the node are not there: " + error);
+  private void unregisterClient() {
+    if (!closed && requestInfo != null) {
+      communicator.stop();
+      SharedUmbilicalServer umbilicalServer = SharedUmbilicalServer.getInstance(conf);
+      umbilicalServer.umbilicalProtocol.unregisterClient(requestInfo.taskAttemptId);
+      umbilicalServer.llapTaskUmbilicalServer.removeTokenForJob(tokenIdentifier);
+      closed = true;
     }
+  }
 
-    if (updateCount == 0) {
-      LOG.info("No tasks found for heartbeat from hostname " + hostname + ", port " + port);
-    }
+  long getLastHeartbeat() {
+    return this.requestInfo.lastHeartbeat.get();
+  }
+
+  void setLastHeartbeat(long lastHeartbeat) {
+    this.requestInfo.lastHeartbeat.set(lastHeartbeat);
   }
 
-  private class HeartbeatCheckTask implements Runnable {
+  // Periodic task to time out submitted tasks that have not been updated with umbilical heartbeat.
+  private static class HeartbeatCheckTask implements Runnable {
+    LlapTaskUmbilicalExternalImpl umbilicalImpl;
+
+    public HeartbeatCheckTask(LlapTaskUmbilicalExternalImpl umbilicalImpl) {
+      this.umbilicalImpl = umbilicalImpl;
+    }
+
     public void run() {
       long currentTime = System.currentTimeMillis();
-      List<String> timedOutTasks = new ArrayList<String>();
-
-      // Check both pending and registered tasks for timeouts
-      for (String key : pendingEvents.keySet()) {
-        PendingEventData pendingEventData = pendingEvents.get(key);
-        if (pendingEventData != null) {
-          if (currentTime - pendingEventData.heartbeatInfo.lastHeartbeat.get() >= connectionTimeout) {
-            timedOutTasks.add(key);
-          }
-        }
-      }
-      for (String timedOutTask : timedOutTasks) {
-        LOG.info("Pending taskAttemptId " + timedOutTask + " timed out");
-        responder.heartbeatTimeout(timedOutTask);
-        pendingEvents.remove(timedOutTask);
-      }
+      List<LlapTaskUmbilicalExternalClient> timedOutTasks = new ArrayList<LlapTaskUmbilicalExternalClient>();
 
-      timedOutTasks.clear();
-      for (String key : registeredTasks.keySet()) {
-        TaskHeartbeatInfo heartbeatInfo = registeredTasks.get(key);
-        if (heartbeatInfo != null) {
-          if (currentTime - heartbeatInfo.lastHeartbeat.get() >= connectionTimeout) {
-            timedOutTasks.add(key);
-          }
+      for (Map.Entry<String, LlapTaskUmbilicalExternalClient> entry : umbilicalImpl.registeredClients.entrySet()) {
+        LlapTaskUmbilicalExternalClient client = entry.getValue();
+        if (currentTime - client.getLastHeartbeat() >= client.connectionTimeout) {
+          timedOutTasks.add(client);
         }
       }
-      for (String timedOutTask : timedOutTasks) {
-        LOG.info("Running taskAttemptId " + timedOutTask + " timed out");
-        responder.heartbeatTimeout(timedOutTask);
-        registeredTasks.remove(timedOutTask);
+
+      for (LlapTaskUmbilicalExternalClient timedOutTask : timedOutTasks) {
+        String taskAttemptId = timedOutTask.requestInfo.taskAttemptId;
+        LOG.info("Running taskAttemptId " + taskAttemptId + " timed out");
+        timedOutTask.unregisterClient();
+        timedOutTask.responder.heartbeatTimeout(taskAttemptId);
       }
     }
   }
@@ -369,10 +355,19 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
     void heartbeatTimeout(String fragmentId);
   }
 
+  private static class LlapTaskUmbilicalExternalImpl implements LlapTaskUmbilicalProtocol {
 
+    final ConcurrentMap<String, LlapTaskUmbilicalExternalClient> registeredClients = new ConcurrentHashMap<>();
+    private final ScheduledThreadPoolExecutor timer;
 
-  // Ideally, the server should be shared across all client sessions running on the same node.
-  private class LlapTaskUmbilicalExternalImpl implements  LlapTaskUmbilicalProtocol {
+    public LlapTaskUmbilicalExternalImpl(Configuration conf) {
+      long taskInterval = HiveConf.getTimeVar(conf,
+          HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+      // Set up timer task to check for heartbeat timeouts
+      this.timer = new ScheduledThreadPoolExecutor(1);
+      timer.scheduleAtFixedRate(new HeartbeatCheckTask(this),
+          taskInterval, taskInterval, TimeUnit.MILLISECONDS);
+    }
 
     @Override
     public boolean canCommit(TezTaskAttemptID taskid) throws IOException {
@@ -399,33 +394,26 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
       // Assuming TaskAttemptId and FragmentIdentifierString are the same. Verify this.
       TezTaskAttemptID taskAttemptId = request.getCurrentTaskAttemptID();
       String taskAttemptIdString = taskAttemptId.toString();
-
-      if (closed) {
-        LOG.info("Client has already been closed, but received heartbeat from " + taskAttemptIdString);
-        // Set shouldDie response so the LLAP daemon closes this umbilical connection.
-        response.setShouldDie();
-        return response;
-      }
-
       updateHeartbeatInfo(taskAttemptIdString);
 
       List<TezEvent> tezEvents = null;
-      PendingEventData pendingEventData = pendingEvents.remove(taskAttemptIdString);
-      if (pendingEventData == null) {
-        tezEvents = Collections.emptyList();
+      LlapTaskUmbilicalExternalClient client = registeredClients.get(taskAttemptIdString);
+      if (client == null) {
+        // Heartbeat is from a task that we are not currently tracking.
+        LOG.info("Unexpected heartbeat from " + taskAttemptIdString);
+        response.setShouldDie(); // Do any of the other fields need to be set?
+        return response;
+      }
 
-        // If this heartbeat was not from a pending event and it's not in our list of registered tasks,
-        if (!registeredTasks.containsKey(taskAttemptIdString)) {
-          LOG.info("Unexpected heartbeat from " + taskAttemptIdString);
-          response.setShouldDie(); // Do any of the other fields need to be set?
-          return response;
-        }
+      if (client.requestInfo.state == RequestState.PENDING) {
+        client.requestInfo.state = RequestState.RUNNING;
+        tezEvents = client.tezEvents;
       } else {
-        tezEvents = pendingEventData.tezEvents;
-        // Tasks removed from the pending list should then be added to the registered list.
-        registeredTasks.put(taskAttemptIdString, pendingEventData.heartbeatInfo);
+        tezEvents = Collections.emptyList();
       }
 
+      boolean shouldUnregisterClient = false;
+
       response.setLastRequestId(request.getRequestId());
       // Irrelevant from eventIds. This can be tracked in the AM itself, instead of polluting the task.
       // Also since we have all the MRInput events here - they'll all be sent in together.
@@ -443,11 +431,11 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
         switch (eventType) {
           case TASK_ATTEMPT_COMPLETED_EVENT:
             LOG.debug("Task completed event for " + taskAttemptIdString);
-            registeredTasks.remove(taskAttemptIdString);
+            shouldUnregisterClient = true;
             break;
           case TASK_ATTEMPT_FAILED_EVENT:
             LOG.debug("Task failed event for " + taskAttemptIdString);
-            registeredTasks.remove(taskAttemptIdString);
+            shouldUnregisterClient = true;
             break;
           case TASK_STATUS_UPDATE_EVENT:
             // If we want to handle counters
@@ -459,10 +447,14 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
         }
       }
 
+      if (shouldUnregisterClient) {
+        client.unregisterClient();
+      }
+
       // Pass the request on to the responder
       try {
-        if (responder != null) {
-          responder.heartbeat(request);
+        if (client.responder != null) {
+          client.responder.heartbeat(request);
         }
       } catch (Exception err) {
         LOG.error("Error during responder execution", err);
@@ -474,6 +466,9 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
     @Override
     public void nodeHeartbeat(
         Text hostname, Text uniqueId, int port, TezAttemptArray aw) throws IOException {
+      if (LOG.isDebugEnabled()) { // Guard so the message below is only concatenated when debug logging is on
+        LOG.debug("Node heartbeat from " + hostname + ":" + port + ", " + uniqueId);
+      }
       updateHeartbeatInfo(hostname.toString(), uniqueId.toString(), port, aw);
       // No need to propagate to this to the responder
     }
@@ -482,14 +477,18 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
     public void taskKilled(TezTaskAttemptID taskAttemptId) throws IOException {
       String taskAttemptIdString = taskAttemptId.toString();
       LOG.error("Task killed - " + taskAttemptIdString);
-      registeredTasks.remove(taskAttemptIdString);
-
-      try {
-        if (responder != null) {
-          responder.taskKilled(taskAttemptId);
+      LlapTaskUmbilicalExternalClient client = registeredClients.get(taskAttemptIdString); // Look up the client registered under this attempt id
+      if (client != null) {
+        try {
+          client.unregisterClient(); // Unregister first, then notify that client's responder (if it has one)
+          if (client.responder != null) {
+            client.responder.taskKilled(taskAttemptId);
+          }
+        } catch (Exception err) { // Failures here are logged and not rethrown
+          LOG.error("Error during responder execution", err);
         }
-      } catch (Exception err) {
-        LOG.error("Error during responder execution", err);
+      } else { // No client is registered under this attempt id
+        LOG.info("Received task killed notification for task which is not currently being tracked: " + taskAttemptId);
       }
     }
 
@@ -504,38 +503,60 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
       return ProtocolSignature.getProtocolSignature(this, protocol,
           clientVersion, clientMethodsHash);
     }
-  }
 
-  private static void scheduleClientForCleanup(LlapTaskUmbilicalExternalClient client) {
-    // Add a bit of delay in case the daemon has not closed the umbilical connection yet.
-    clientCleanupExecuter.schedule(new ClientCleanupTask(client), cleanupDelay, TimeUnit.MILLISECONDS);
-  }
+    private void unregisterClient(String taskAttemptId) { // Removes the registeredClients entry keyed by this attempt id
+      registeredClients.remove(taskAttemptId);
+    }
 
-  static final ScheduledThreadPoolExecutor clientCleanupExecuter = new ScheduledThreadPoolExecutor(1);
-  static final int cleanupDelay = 2000;
+    private void updateHeartbeatInfo(String taskAttemptId) { // Records a heartbeat for the client registered under taskAttemptId
+      int updateCount = 0;
 
-  static class ClientCleanupTask implements Runnable {
-    final LlapTaskUmbilicalExternalClient client;
+      LlapTaskUmbilicalExternalClient registeredClient = registeredClients.get(taskAttemptId);
+      if (registeredClient != null) {
+        registeredClient.setLastHeartbeat(System.currentTimeMillis()); // Stamp with the current wall-clock time in millis
+        updateCount++;
+      }
 
-    public ClientCleanupTask(LlapTaskUmbilicalExternalClient client) {
-      this.client = client;
+      if (updateCount == 0) { // Nothing was registered under this attempt id
+        LOG.warn("No tasks found for heartbeat from taskAttemptId " + taskAttemptId);
+      }
     }
 
-    @Override
-    public void run() {
-      if (client.llapTaskUmbilicalServer.getNumOpenConnections() == 0) {
-        // No more outstanding connections, ok to close.
-        try {
-          LOG.debug("Closing client");
-          client.doShutdown();
-        } catch (Exception err) {
-          LOG.error("Error cleaning up client", err);
+    private void updateHeartbeatInfo( // Node-level heartbeat: refreshes each matching client whose attempt the node reports
+        String hostname, String uniqueId, int port, TezAttemptArray tasks) {
+      int updateCount = 0;
+      HashSet<TezTaskAttemptID> attempts = new HashSet<>();
+      for (Writable w : tasks.get()) { // Collect the attempts reported in this heartbeat into a set
+        attempts.add((TezTaskAttemptID)w);
+      }
+
+      String error = ""; // Accumulates ids of matching clients whose attempt was not in the reported set
+      for (Map.Entry<String, LlapTaskUmbilicalExternalClient> entry : registeredClients.entrySet()) {
+        LlapTaskUmbilicalExternalClient registeredClient = entry.getValue();
+        if (doesClientMatchHeartbeat(registeredClient, hostname, uniqueId, port)) { // Only clients bound to the heartbeating node
+          TezTaskAttemptID ta = TezTaskAttemptID.fromString(registeredClient.requestInfo.taskAttemptId);
+          if (attempts.contains(ta)) {
+            registeredClient.setLastHeartbeat(System.currentTimeMillis());
+            updateCount++;
+          } else {
+            error += (registeredClient.requestInfo.taskAttemptId + ", ");
+          }
         }
-      } else {
-        // Reschedule this task for later.
-        LOG.debug("Client still has umbilical connection - rescheduling cleanup.");
-        scheduleClientForCleanup(client);
       }
+      if (!error.isEmpty()) {
+        LOG.info("The tasks we expected to be on the node are not there: " + error);
+      }
+
+      if (updateCount == 0) { // No registered client was refreshed by this heartbeat
+        LOG.info("No tasks found for heartbeat from hostname " + hostname + ", port " + port);
+      }
+    }
+
+    private static boolean doesClientMatchHeartbeat(LlapTaskUmbilicalExternalClient client, // True only if all three values equal the client's requestInfo
+        String hostname, String uniqueId, int port) {
+      return (hostname.equals(client.requestInfo.hostname)
+          && port == client.requestInfo.port
+          && uniqueId.equals(client.requestInfo.uniqueNodeId));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/c713eeec/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java b/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
index 403381d..89cb6fb 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
@@ -18,10 +18,15 @@ package org.apache.hadoop.hive.llap.tezplugins.helpers;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto;
 import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
@@ -32,6 +37,7 @@ import org.apache.hadoop.security.authorize.Service;
 import org.apache.hadoop.security.token.Token;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.runtime.api.impl.TezEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,12 +48,11 @@ public class LlapTaskUmbilicalServer {
   protected volatile Server server;
   private final InetSocketAddress address;
   private final AtomicBoolean started = new AtomicBoolean(true);
+  private JobTokenSecretManager jobTokenSecretManager;
+  private Map<String, int[]> tokenRefMap = new HashMap<String, int[]>();
 
-  public LlapTaskUmbilicalServer(Configuration conf, LlapTaskUmbilicalProtocol umbilical, int numHandlers, String tokenIdentifier, Token<JobTokenIdentifier> token) throws
-      IOException {
-    JobTokenSecretManager jobTokenSecretManager =
-        new JobTokenSecretManager();
-    jobTokenSecretManager.addTokenForJob(tokenIdentifier, token);
+  public LlapTaskUmbilicalServer(Configuration conf, LlapTaskUmbilicalProtocol umbilical, int numHandlers) throws IOException {
+    jobTokenSecretManager = new JobTokenSecretManager();
 
     server = new RPC.Builder(conf)
         .setProtocol(LlapTaskUmbilicalProtocol.class)
@@ -65,7 +70,7 @@ public class LlapTaskUmbilicalServer {
     this.address = NetUtils.getConnectAddress(server);
     LOG.info(
         "Started TaskUmbilicalServer: " + umbilical.getClass().getName() + " at address: " + address +
-            " with numHandlers=" + numHandlers);
+        " with numHandlers=" + numHandlers);
   }
 
   public InetSocketAddress getAddress() {
@@ -76,6 +81,33 @@ public class LlapTaskUmbilicalServer {
     return server.getNumOpenConnections();
   }
 
+  public synchronized void addTokenForJob(String tokenIdentifier, Token<JobTokenIdentifier> token) { // Registers one more outstanding user of tokenIdentifier
+    // Maintain count of outstanding requests for tokenIdentifier.
+    int[] refCount = tokenRefMap.get(tokenIdentifier);
+    if (refCount == null) {
+      refCount = new int[] { 0 }; // One-element array acts as a mutable counter held in the map
+      tokenRefMap.put(tokenIdentifier, refCount);
+      // Should only need to insert the token the first time.
+      jobTokenSecretManager.addTokenForJob(tokenIdentifier, token);
+    }
+    refCount[0]++; // Counted on every call, including the one that inserted the token
+  }
+
+  public synchronized void removeTokenForJob(String tokenIdentifier) { // Releases one outstanding use of tokenIdentifier
+    // Maintain count of outstanding requests for tokenIdentifier.
+    // If count goes to 0, it is safe to remove the token.
+    int[] refCount = tokenRefMap.get(tokenIdentifier);
+    if (refCount == null) { // No counter recorded for this identifier: only warn, nothing is removed
+      LOG.warn("No refCount found for tokenIdentifier " + tokenIdentifier);
+    } else {
+      refCount[0]--;
+      if (refCount[0] <= 0) { // Last outstanding reference released: drop the counter and the token
+        tokenRefMap.remove(tokenIdentifier);
+        jobTokenSecretManager.removeTokenForJob(tokenIdentifier);
+      }
+    }
+  }
+
   public void shutdownServer() {
     if (started.get()) { // Primarily to avoid multiple shutdowns.
       started.set(false);

http://git-wip-us.apache.org/repos/asf/hive/blob/c713eeec/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
index eb93241..201f5fa 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
@@ -147,8 +147,6 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
     LlapTaskUmbilicalExternalClient llapClient =
       new LlapTaskUmbilicalExternalClient(job, submitWorkInfo.getTokenIdentifier(),
           submitWorkInfo.getToken(), umbilicalResponder, llapToken);
-    llapClient.init(job);
-    llapClient.start();
 
     int attemptNum = 0;
     // Use task attempt number from conf if provided

http://git-wip-us.apache.org/repos/asf/hive/blob/c713eeec/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
index d4ec44e..5003f42 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.FieldDesc;
@@ -90,6 +91,8 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.InputInitializerDescriptor;
@@ -368,6 +371,9 @@ public class GenericUDTFGetSplits extends GenericUDTF {
         queryUser = UserGroupInformation.getCurrentUser().getUserName();
       }
 
+      // Generate umbilical token (applies to all splits)
+      Token<JobTokenIdentifier> umbilicalToken = JobTokenCreator.createJobToken(applicationId);
+
       LOG.info("Number of splits: " + (eventList.size() - 1));
       SignedMessage signedSvs = null;
       for (int i = 0; i < eventList.size() - 1; i++) {
@@ -388,7 +394,7 @@ public class GenericUDTFGetSplits extends GenericUDTF {
 
         SubmitWorkInfo submitWorkInfo = new SubmitWorkInfo(applicationId,
             System.currentTimeMillis(), taskSpec.getVertexParallelism(), signedSvs.message,
-            signedSvs.signature);
+            signedSvs.signature, umbilicalToken);
         byte[] submitWorkBytes = SubmitWorkInfo.toBytes(submitWorkInfo);
 
         // 3. Generate input event.
@@ -406,6 +412,18 @@ public class GenericUDTFGetSplits extends GenericUDTF {
     }
   }
 
+  private static class JobTokenCreator { // Helper that builds a job token from an application id
+    private static Token<JobTokenIdentifier> createJobToken(ApplicationId applicationId) {
+      String tokenIdentifier = applicationId.toString(); // The application id string is used as the token identifier
+      JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(
+          tokenIdentifier));
+      Token<JobTokenIdentifier> sessionToken = new Token<JobTokenIdentifier>(identifier,
+          new JobTokenSecretManager()); // NOTE(review): a fresh secret manager per call; presumably it supplies the token password (confirm)
+      sessionToken.setService(identifier.getJobId()); // Service field is set to the identifier's job id
+      return sessionToken;
+    }
+  }
+
   private SplitLocationInfo[] makeLocationHints(TaskLocationHint hint) {
     Set<String> hosts = hint.getHosts();
     if (hosts.size() != 1) {