You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jd...@apache.org on 2016/05/06 17:25:02 UTC

[12/39] hive git commit: HIVE-13368: LlapTaskUmbilicalExternalClient should handle submission rejection/failures/timeouts from LLAP daemon

HIVE-13368: LlapTaskUmbilicalExternalClient should handle submission rejection/failures/timeouts from LLAP daemon


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/134f2f74
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/134f2f74
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/134f2f74

Branch: refs/heads/master
Commit: 134f2f749e1981738a660843e911db173c86bcfa
Parents: 2514065
Author: Jason Dere <jd...@hortonworks.com>
Authored: Mon Mar 28 11:16:33 2016 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Mon Mar 28 11:16:33 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hive/llap/LlapInputFormat.java       | 137 ++++++++++-
 .../ext/LlapTaskUmbilicalExternalClient.java    | 237 ++++++++++++++++++-
 .../hadoop/hive/llap/LlapRecordReader.java      | 123 +++++++++-
 3 files changed, 480 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/134f2f74/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
index 847c74f..7f11e1d 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
@@ -23,13 +23,18 @@ import java.net.Socket;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
+
+import org.apache.commons.collections4.ListUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.LlapRecordReader.ReaderEvent;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
 import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient;
+import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient.LlapTaskUmbilicalExternalResponder;
 import org.apache.hadoop.hive.llap.registry.ServiceInstance;
 import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
@@ -52,11 +57,19 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.TokenCache;
+import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.impl.EventType;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
+import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
+
+
 public class LlapInputFormat<V extends WritableComparable> implements InputFormat<NullWritable, V> {
 
   private static final Logger LOG = LoggerFactory.getLogger(LlapInputFormat.class);
@@ -84,9 +97,11 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
     LOG.info("Found service instance for host " + host + " with rpc port " + llapSubmitPort
         + " and outputformat port " + serviceInstance.getOutputFormatPort());
 
+    LlapRecordReaderTaskUmbilicalExternalResponder umbilicalResponder =
+        new LlapRecordReaderTaskUmbilicalExternalResponder();
     LlapTaskUmbilicalExternalClient llapClient =
       new LlapTaskUmbilicalExternalClient(job, submitWorkInfo.getTokenIdentifier(),
-          submitWorkInfo.getToken());
+          submitWorkInfo.getToken(), umbilicalResponder);
     llapClient.init(job);
     llapClient.start();
 
@@ -117,7 +132,9 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
 
     LOG.info("Registered id: " + id);
 
-    return new LlapRecordReader(socket.getInputStream(), llapSplit.getSchema(), Text.class);
+    LlapRecordReader recordReader = new LlapRecordReader(socket.getInputStream(), llapSplit.getSchema(), Text.class);
+    umbilicalResponder.setRecordReader(recordReader);
+    return recordReader;
   }
 
   @Override
@@ -254,4 +271,120 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
     containerCredentials.writeTokenStorageToStream(containerTokens_dob);
     return ByteBuffer.wrap(containerTokens_dob.getData(), 0, containerTokens_dob.getLength());
   }
+
+  private static class LlapRecordReaderTaskUmbilicalExternalResponder implements LlapTaskUmbilicalExternalResponder {
+    protected LlapRecordReader recordReader = null;
+    protected LinkedBlockingQueue<ReaderEvent> queuedEvents = new LinkedBlockingQueue<ReaderEvent>();
+
+    public LlapRecordReaderTaskUmbilicalExternalResponder() {
+    }
+
+    @Override
+    public void submissionFailed(String fragmentId, Throwable throwable) {
+      try {
+        sendOrQueueEvent(LlapRecordReader.ReaderEvent.errorEvent(
+            "Received submission failed event for fragment ID " + fragmentId));
+      } catch (Exception err) {
+        LOG.error("Error during heartbeat responder:", err);
+      }
+    }
+
+    @Override
+    public void heartbeat(TezHeartbeatRequest request) {
+      TezTaskAttemptID taskAttemptId = request.getCurrentTaskAttemptID();
+      List<TezEvent> inEvents = request.getEvents();
+      for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) {
+        EventType eventType = tezEvent.getEventType();
+        try {
+          switch (eventType) {
+            case TASK_ATTEMPT_COMPLETED_EVENT:
+              sendOrQueueEvent(LlapRecordReader.ReaderEvent.doneEvent());
+              break;
+            case TASK_ATTEMPT_FAILED_EVENT:
+              TaskAttemptFailedEvent taskFailedEvent = (TaskAttemptFailedEvent) tezEvent.getEvent();
+              sendOrQueueEvent(LlapRecordReader.ReaderEvent.errorEvent(taskFailedEvent.getDiagnostics()));
+              break;
+            case TASK_STATUS_UPDATE_EVENT:
+              // If we want to handle counters
+              break;
+            default:
+              LOG.warn("Unhandled event type " + eventType);
+              break;
+          }
+        } catch (Exception err) {
+          LOG.error("Error during heartbeat responder:", err);
+        }
+      }
+    }
+
+    @Override
+    public void taskKilled(TezTaskAttemptID taskAttemptId) {
+      try {
+        sendOrQueueEvent(LlapRecordReader.ReaderEvent.errorEvent(
+            "Received task killed event for task ID " + taskAttemptId));
+      } catch (Exception err) {
+        LOG.error("Error during heartbeat responder:", err);
+      }
+    }
+
+    @Override
+    public void heartbeatTimeout(String taskAttemptId) {
+      try {
+        sendOrQueueEvent(LlapRecordReader.ReaderEvent.errorEvent(
+            "Timed out waiting for heartbeat for task ID " + taskAttemptId));
+      } catch (Exception err) {
+        LOG.error("Error during heartbeat responder:", err);
+      }
+    }
+
+    public synchronized LlapRecordReader getRecordReader() {
+      return recordReader;
+    }
+
+    public synchronized void setRecordReader(LlapRecordReader recordReader) {
+      this.recordReader = recordReader;
+
+      if (recordReader == null) {
+        return;
+      }
+
+      // If any events were queued by the responder, give them to the record reader now.
+      while (!queuedEvents.isEmpty()) {
+        LlapRecordReader.ReaderEvent readerEvent = queuedEvents.poll();
+        LOG.debug("Sending queued event to record reader: " + readerEvent.getEventType());
+        recordReader.handleEvent(readerEvent);
+      }
+    }
+
+    /**
+     * Send the ReaderEvents to the record reader, if it is registered to this responder.
+     * If there is no registered record reader, add them to a list of pending reader events
+     * since we don't want to drop these events.
+     * @param readerEvent
+     */
+    protected synchronized void sendOrQueueEvent(LlapRecordReader.ReaderEvent readerEvent) {
+      LlapRecordReader recordReader = getRecordReader();
+      if (recordReader != null) {
+        recordReader.handleEvent(readerEvent);
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("No registered record reader, queueing event " + readerEvent.getEventType()
+              + " with message " + readerEvent.getMessage());
+        }
+
+        try {
+          queuedEvents.put(readerEvent);
+        } catch (Exception err) {
+          throw new RuntimeException("Unexpected exception while queueing reader event", err);
+        }
+      }
+    }
+
+    /**
+     * Clear the list of queued reader events if we are not interested in sending any pending events to any registering record reader.
+     */
+    public void clearQueuedEvents() {
+      queuedEvents.clear();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/134f2f74/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java b/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
index 16cfd01..7d06637 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
@@ -2,12 +2,17 @@ package org.apache.hadoop.hive.llap.ext;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Preconditions;
+import org.apache.commons.collections4.ListUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
@@ -19,16 +24,20 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.impl.EventType;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
 import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 public class LlapTaskUmbilicalExternalClient extends AbstractService {
 
   private static final Logger LOG = LoggerFactory.getLogger(LlapTaskUmbilicalExternalClient.class);
@@ -41,20 +50,51 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService {
   protected final String tokenIdentifier;
   protected final Token<JobTokenIdentifier> sessionToken;
 
+  private final ConcurrentMap<String, PendingEventData> pendingEvents = new ConcurrentHashMap<>();
+  private final ConcurrentMap<String, TaskHeartbeatInfo> registeredTasks= new ConcurrentHashMap<String, TaskHeartbeatInfo>();
+  private LlapTaskUmbilicalExternalResponder responder = null;
+  private final ScheduledThreadPoolExecutor timer;
+  private final long connectionTimeout;
+
+  private static class TaskHeartbeatInfo {
+    final String taskAttemptId;
+    final String hostname;
+    final int port;
+    final AtomicLong lastHeartbeat = new AtomicLong();
+
+    public TaskHeartbeatInfo(String taskAttemptId, String hostname, int port) {
+      this.taskAttemptId = taskAttemptId;
+      this.hostname = hostname;
+      this.port = port;
+      this.lastHeartbeat.set(System.currentTimeMillis());
+    }
+  }
 
-  private final ConcurrentMap<String, List<TezEvent>> pendingEvents = new ConcurrentHashMap<>();
+  private static class PendingEventData {
+    final TaskHeartbeatInfo heartbeatInfo;
+    final List<TezEvent> tezEvents;
 
+    public PendingEventData(TaskHeartbeatInfo heartbeatInfo, List<TezEvent> tezEvents) {
+      this.heartbeatInfo = heartbeatInfo;
+      this.tezEvents = tezEvents;
+    }
+  }
 
   // TODO KKK Work out the details of the tokenIdentifier, and the session token.
   // It may just be possible to create one here - since Shuffle is not involved, and this is only used
   // for communication from LLAP-Daemons to the server. It will need to be sent in as part
   // of the job submission request.
-  public LlapTaskUmbilicalExternalClient(Configuration conf, String tokenIdentifier, Token<JobTokenIdentifier> sessionToken) {
+  public LlapTaskUmbilicalExternalClient(Configuration conf, String tokenIdentifier,
+      Token<JobTokenIdentifier> sessionToken, LlapTaskUmbilicalExternalResponder responder) {
     super(LlapTaskUmbilicalExternalClient.class.getName());
     this.conf = conf;
     this.umbilical = new LlapTaskUmbilicalExternalImpl();
     this.tokenIdentifier = tokenIdentifier;
     this.sessionToken = sessionToken;
+    this.responder = responder;
+    this.timer = new ScheduledThreadPoolExecutor(1);
+    this.connectionTimeout = HiveConf.getTimeVar(conf,
+        HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
     // TODO. No support for the LLAP token yet. Add support for configurable threads, however 1 should always be enough.
     this.communicator = new LlapProtocolClientProxy(1, conf, null);
     this.communicator.init(conf);
@@ -71,6 +111,7 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService {
   @Override
   public void serviceStop() {
     llapTaskUmbilicalServer.shutdownServer();
+    timer.shutdown();
     if (this.communicator != null) {
       this.communicator.stop();
     }
@@ -89,7 +130,15 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService {
     Preconditions.checkArgument(submitWorkRequestProto.getUsingTezAm() == false);
 
     // Register the pending events to be sent for this spec.
-    pendingEvents.putIfAbsent(submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString(), tezEvents);
+    String fragmentId = submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString();
+    PendingEventData pendingEventData = new PendingEventData(
+        new TaskHeartbeatInfo(fragmentId, llapHost, llapPort),
+        tezEvents);
+    pendingEvents.putIfAbsent(fragmentId, pendingEventData);
+
+    // Set up timer task to check for heartbeat timeouts
+    timer.scheduleAtFixedRate(new HeartbeatCheckTask(),
+        connectionTimeout, connectionTimeout, TimeUnit.MILLISECONDS);
 
     // Send out the actual SubmitWorkRequest
     communicator.sendSubmitWork(submitWorkRequestProto, llapHost, llapPort,
@@ -99,7 +148,12 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService {
           public void setResponse(LlapDaemonProtocolProtos.SubmitWorkResponseProto response) {
             if (response.hasSubmissionState()) {
               if (response.getSubmissionState().equals(LlapDaemonProtocolProtos.SubmissionStateProto.REJECTED)) {
-                LOG.info("Fragment: " + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString() + " rejected. Server Busy.");
+                String msg = "Fragment: " + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString() + " rejected. Server Busy.";
+                LOG.info(msg);
+                if (responder != null) {
+                  Throwable err = new RuntimeException(msg);
+                  responder.submissionFailed(submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString(), err);
+                }
                 return;
               }
             }
@@ -107,7 +161,10 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService {
 
           @Override
           public void indicateError(Throwable t) {
-            LOG.error("Failed to submit: " + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString(), t);
+            String msg = "Failed to submit: " + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString();
+            LOG.error(msg, t);
+            Throwable err = new RuntimeException(msg, t);
+            responder.submissionFailed(submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString(), err);
           }
         });
 
@@ -130,9 +187,101 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService {
 
   }
 
+  private void updateHeartbeatInfo(String taskAttemptId) {
+    int updateCount = 0;
+
+    PendingEventData pendingEventData = pendingEvents.get(taskAttemptId);
+    if (pendingEventData != null) {
+      pendingEventData.heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis());
+      updateCount++;
+    }
+
+    TaskHeartbeatInfo heartbeatInfo = registeredTasks.get(taskAttemptId);
+    if (heartbeatInfo != null) {
+      heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis());
+      updateCount++;
+    }
+
+    if (updateCount == 0) {
+      LOG.warn("No tasks found for heartbeat from taskAttemptId " + taskAttemptId);
+    }
+  }
+
+  private void updateHeartbeatInfo(String hostname, int port) {
+    int updateCount = 0;
+
+    for (String key : pendingEvents.keySet()) {
+      PendingEventData pendingEventData = pendingEvents.get(key);
+      if (pendingEventData != null) {
+        if (pendingEventData.heartbeatInfo.hostname.equals(hostname)
+            && pendingEventData.heartbeatInfo.port == port) {
+          pendingEventData.heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis());
+          updateCount++;
+        }
+      }
+    }
+
+    for (String key : registeredTasks.keySet()) {
+      TaskHeartbeatInfo heartbeatInfo = registeredTasks.get(key);
+      if (heartbeatInfo != null) {
+        if (heartbeatInfo.hostname.equals(hostname)
+            && heartbeatInfo.port == port) {
+          heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis());
+          updateCount++;
+        }
+      }
+    }
+
+    if (updateCount == 0) {
+      LOG.info("No tasks found for heartbeat from hostname " + hostname + ", port " + port);
+    }
+  }
 
+  private class HeartbeatCheckTask implements Runnable {
+    public void run() {
+      long currentTime = System.currentTimeMillis();
+      List<String> timedOutTasks = new ArrayList<String>();
+
+      // Check both pending and registered tasks for timeouts
+      for (String key : pendingEvents.keySet()) {
+        PendingEventData pendingEventData = pendingEvents.get(key);
+        if (pendingEventData != null) {
+          if (currentTime - pendingEventData.heartbeatInfo.lastHeartbeat.get() >= connectionTimeout) {
+            timedOutTasks.add(key);
+          }
+        }
+      }
+      for (String timedOutTask : timedOutTasks) {
+        LOG.info("Pending taskAttemptId " + timedOutTask + " timed out");
+        responder.heartbeatTimeout(timedOutTask);
+        pendingEvents.remove(timedOutTask);
+        // TODO: Do we need to tell the LLAP daemon we are no longer interested in this task?
+      }
 
+      timedOutTasks.clear();
+      for (String key : registeredTasks.keySet()) {
+        TaskHeartbeatInfo heartbeatInfo = registeredTasks.get(key);
+        if (heartbeatInfo != null) {
+          if (currentTime - heartbeatInfo.lastHeartbeat.get() >= connectionTimeout) {
+            timedOutTasks.add(key);
+          }
+        }
+      }
+      for (String timedOutTask : timedOutTasks) {
+        LOG.info("Running taskAttemptId " + timedOutTask + " timed out");
+        responder.heartbeatTimeout(timedOutTask);
+        registeredTasks.remove(timedOutTask);
+        // TODO: Do we need to tell the LLAP daemon we are no longer interested in this task?
+      }
+    }
+  }
 
+  public interface LlapTaskUmbilicalExternalResponder {
+    void submissionFailed(String fragmentId, Throwable throwable);
+    void heartbeat(TezHeartbeatRequest request);
+    void taskKilled(TezTaskAttemptID taskAttemptId);
+    void heartbeatTimeout(String fragmentId);
+  }
 
 
 
@@ -153,16 +302,35 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService {
 
       // This also provides completion information, and a possible notification when task actually starts running (first heartbeat)
 
-      // Incoming events can be ignored until the point when shuffle needs to be handled, instead of just scans.
-
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Received heartbeat from container, request=" + request);
+      }
 
+      // Incoming events can be ignored until the point when shuffle needs to be handled, instead of just scans.
       TezHeartbeatResponse response = new TezHeartbeatResponse();
+
+      response.setLastRequestId(request.getRequestId());
       // Assuming TaskAttemptId and FragmentIdentifierString are the same. Verify this.
       TezTaskAttemptID taskAttemptId = request.getCurrentTaskAttemptID();
+      String taskAttemptIdString = taskAttemptId.toString();
+
+      updateHeartbeatInfo(taskAttemptIdString);
 
-      List<TezEvent> tezEvents = pendingEvents.remove(taskAttemptId.toString());
-      if (tezEvents == null) {
+      List<TezEvent> tezEvents = null;
+      PendingEventData pendingEventData = pendingEvents.remove(taskAttemptIdString);
+      if (pendingEventData == null) {
         tezEvents = Collections.emptyList();
+
+        // If this heartbeat was not from a pending event and it's not in our list of registered tasks, tell the sender to die.
+        if (!registeredTasks.containsKey(taskAttemptIdString)) {
+          LOG.info("Unexpected heartbeat from " + taskAttemptIdString);
+          response.setShouldDie(); // Do any of the other fields need to be set?
+          return response;
+        }
+      } else {
+        tezEvents = pendingEventData.tezEvents;
+        // Tasks removed from the pending list should then be added to the registered list.
+        registeredTasks.put(taskAttemptIdString, pendingEventData.heartbeatInfo);
       }
 
       response.setLastRequestId(request.getRequestId());
@@ -172,20 +340,63 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService {
       response.setNextPreRoutedEventId(0); //Irrelevant. See comment above.
       response.setEvents(tezEvents);
 
-      // TODO KKK: Should ideally handle things like Task success notifications.
-      // Can this somehow be hooked into the LlapTaskCommunicator to make testing easy
+      List<TezEvent> inEvents = request.getEvents();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Heartbeat from " + taskAttemptIdString +
+            " events: " + (inEvents != null ? inEvents.size() : -1));
+      }
+      for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) {
+        EventType eventType = tezEvent.getEventType();
+        switch (eventType) {
+          case TASK_ATTEMPT_COMPLETED_EVENT:
+            LOG.debug("Task completed event for " + taskAttemptIdString);
+            registeredTasks.remove(taskAttemptIdString);
+            break;
+          case TASK_ATTEMPT_FAILED_EVENT:
+            LOG.debug("Task failed event for " + taskAttemptIdString);
+            registeredTasks.remove(taskAttemptIdString);
+            break;
+          case TASK_STATUS_UPDATE_EVENT:
+            // If we want to handle counters
+            LOG.debug("Task update event for " + taskAttemptIdString);
+            break;
+          default:
+            LOG.warn("Unhandled event type " + eventType);
+            break;
+        }
+      }
+
+      // Pass the request on to the responder
+      try {
+        if (responder != null) {
+          responder.heartbeat(request);
+        }
+      } catch (Exception err) {
+        LOG.error("Error during responder execution", err);
+      }
 
       return response;
     }
 
     @Override
     public void nodeHeartbeat(Text hostname, int port) throws IOException {
-      // TODO Eventually implement - to handle keep-alive messages from pending work.
+      updateHeartbeatInfo(hostname.toString(), port);
+      // No need to propagate this to the responder
     }
 
     @Override
     public void taskKilled(TezTaskAttemptID taskAttemptId) throws IOException {
-      // TODO Eventually implement - to handle preemptions within LLAP daemons.
+      String taskAttemptIdString = taskAttemptId.toString();
+      LOG.error("Task killed - " + taskAttemptIdString);
+      registeredTasks.remove(taskAttemptIdString);
+
+      try {
+        if (responder != null) {
+          responder.taskKilled(taskAttemptId);
+        }
+      } catch (Exception err) {
+        LOG.error("Error during responder execution", err);
+      }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/134f2f74/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordReader.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordReader.java
index ce3d39a..30ed9cf 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordReader.java
@@ -18,9 +18,11 @@
 
 package org.apache.hadoop.hive.llap;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.DataInputStream;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.io.RCFile.Reader;
@@ -33,16 +35,25 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.hive.metastore.api.Schema;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class LlapRecordReader<V extends WritableComparable> implements RecordReader<NullWritable, V> {
+  private static final Logger LOG = LoggerFactory.getLogger(LlapRecordReader.class);
 
   DataInputStream din;
   Schema schema;
   Class<V> clazz;
 
+
+  protected Thread readerThread = null;
+  protected LinkedBlockingQueue<ReaderEvent> readerEvents = new LinkedBlockingQueue<ReaderEvent>();
+
   public LlapRecordReader(InputStream in, Schema schema, Class<V> clazz) {
     din = new DataInputStream(in);
     this.schema = schema;
     this.clazz = clazz;
+    this.readerThread = Thread.currentThread();
   }
 
   public Schema getSchema() {
@@ -75,12 +86,120 @@ public class LlapRecordReader<V extends WritableComparable> implements RecordRea
   }
 
   @Override
-  public boolean next(NullWritable key, V value) {
+  public boolean next(NullWritable key, V value) throws IOException {
     try {
+      // Need a way to know what thread to interrupt, since this is a blocking thread.
+      setReaderThread(Thread.currentThread());
+
       value.readFields(din);
       return true;
-    } catch (IOException io) {
+    } catch (EOFException eof) {
+      // End of input. There should be a reader event available, or coming soon, so okay to be blocking call.
+      ReaderEvent event = getReaderEvent();
+      switch (event.getEventType()) {
+        case DONE:
+          break;
+        default:
+          throw new IOException("Expected reader event with done status, but got "
+              + event.getEventType() + " with message " + event.getMessage());
+      }
       return false;
+    } catch (IOException io) {
+      if (Thread.interrupted()) {
+        // Either we were interrupted by one of:
+        // 1. handleEvent(), in which case there is a reader event waiting for us in the queue
+        // 2. Some other unrelated cause which interrupted us, in which case there may not be a reader event coming.
+        // Either way we should not try to block trying to read the reader events queue.
+        if (readerEvents.isEmpty()) {
+          // Case 2.
+          throw io;
+        } else {
+          // Case 1. Fail the reader, sending back the error we received from the reader event.
+          ReaderEvent event = getReaderEvent();
+          switch (event.getEventType()) {
+            case ERROR:
+              throw new IOException("Received reader event error: " + event.getMessage());
+            default:
+              throw new IOException("Got reader event type " + event.getEventType() + ", expected error event");
+          }
+        }
+      } else {
+        // If we weren't interrupted, just propagate the error
+        throw io;
+      }
+    }
+  }
+
+  /**
+   * Define success/error events which are passed to the reader from a different thread.
+   * The reader will check for these events on end of input and interruption of the reader thread.
+   */
+  public static class ReaderEvent {
+    public enum EventType {
+      DONE,
+      ERROR
+    }
+
+    protected final EventType eventType;
+    protected final String message;
+
+    protected ReaderEvent(EventType type, String message) {
+      this.eventType = type;
+      this.message = message;
+    }
+
+    public static ReaderEvent doneEvent() {
+      return new ReaderEvent(EventType.DONE, "");
+    }
+
+    public static ReaderEvent errorEvent(String message) {
+      return new ReaderEvent(EventType.ERROR, message);
     }
+
+    public EventType getEventType() {
+      return eventType;
+    }
+
+    public String getMessage() {
+      return message;
+    }
+  }
+
+  /** Queues {@code event} for the reader; an ERROR event additionally interrupts the reader thread. */
+  public void handleEvent(ReaderEvent event) {
+    switch (event.getEventType()) {
+      case DONE:
+        // Reader will check for the event queue upon the end of the input stream - no need to interrupt.
+        readerEvents.add(event);
+        break;
+      case ERROR:
+        readerEvents.add(event);
+        if (readerThread == null) {
+          throw new RuntimeException("Reader thread is unexpectedly null, during ReaderEvent error " + event.getMessage());
+        }
+        // Reader is using a blocking socket .. interrupt it.
+        LOG.debug("Interrupting reader thread due to reader event with error {}", event.getMessage());
+        getReaderThread().interrupt();
+        break; // ERROR is fully handled here; without this it fell through to default and always threw
+      default:
+        throw new RuntimeException("Unhandled ReaderEvent type " + event.getEventType() + " with message " + event.getMessage());
+    }
+  }
+
+  protected ReaderEvent getReaderEvent() {
+    try {
+      return readerEvents.take(); // blocks until a reader event has been queued
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt(); // restore the interrupt status before converting to unchecked
+      throw new RuntimeException("Interrupted while getting readerEvents, not expected", ie);
+    }
+  }
+
+  protected synchronized void setReaderThread(Thread readerThread) {
+    this.readerThread = readerThread;
+  }
+
+  protected synchronized Thread getReaderThread() {
+    return readerThread;
   }
 }