You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jd...@apache.org on 2016/03/28 20:16:38 UTC
hive git commit: HIVE-13368: LlapTaskUmbilicalExternalClient should
handle submission rejection/failures/timeouts from LLAP daemon
Repository: hive
Updated Branches:
refs/heads/llap 25140659c -> 134f2f749
HIVE-13368: LlapTaskUmbilicalExternalClient should handle submission rejection/failures/timeouts from LLAP daemon
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/134f2f74
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/134f2f74
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/134f2f74
Branch: refs/heads/llap
Commit: 134f2f749e1981738a660843e911db173c86bcfa
Parents: 2514065
Author: Jason Dere <jd...@hortonworks.com>
Authored: Mon Mar 28 11:16:33 2016 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Mon Mar 28 11:16:33 2016 -0700
----------------------------------------------------------------------
.../hadoop/hive/llap/LlapInputFormat.java | 137 ++++++++++-
.../ext/LlapTaskUmbilicalExternalClient.java | 237 ++++++++++++++++++-
.../hadoop/hive/llap/LlapRecordReader.java | 123 +++++++++-
3 files changed, 480 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/134f2f74/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
index 847c74f..7f11e1d 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
@@ -23,13 +23,18 @@ import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
+
+import org.apache.commons.collections4.ListUtils;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.LlapRecordReader.ReaderEvent;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient;
+import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient.LlapTaskUmbilicalExternalResponder;
import org.apache.hadoop.hive.llap.registry.ServiceInstance;
import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
@@ -52,11 +57,19 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.TokenCache;
+import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.impl.EventType;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
+import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
+
+
public class LlapInputFormat<V extends WritableComparable> implements InputFormat<NullWritable, V> {
private static final Logger LOG = LoggerFactory.getLogger(LlapInputFormat.class);
@@ -84,9 +97,11 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
LOG.info("Found service instance for host " + host + " with rpc port " + llapSubmitPort
+ " and outputformat port " + serviceInstance.getOutputFormatPort());
+ LlapRecordReaderTaskUmbilicalExternalResponder umbilicalResponder =
+ new LlapRecordReaderTaskUmbilicalExternalResponder();
LlapTaskUmbilicalExternalClient llapClient =
new LlapTaskUmbilicalExternalClient(job, submitWorkInfo.getTokenIdentifier(),
- submitWorkInfo.getToken());
+ submitWorkInfo.getToken(), umbilicalResponder);
llapClient.init(job);
llapClient.start();
@@ -117,7 +132,9 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
LOG.info("Registered id: " + id);
- return new LlapRecordReader(socket.getInputStream(), llapSplit.getSchema(), Text.class);
+ LlapRecordReader recordReader = new LlapRecordReader(socket.getInputStream(), llapSplit.getSchema(), Text.class);
+ umbilicalResponder.setRecordReader(recordReader);
+ return recordReader;
}
@Override
@@ -254,4 +271,120 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
containerCredentials.writeTokenStorageToStream(containerTokens_dob);
return ByteBuffer.wrap(containerTokens_dob.getData(), 0, containerTokens_dob.getLength());
}
+
+  /**
+   * Forwards {@link LlapTaskUmbilicalExternalClient} callbacks to a {@link LlapRecordReader} as
+   * reader events. The client may deliver callbacks before the record reader has been created, so
+   * events received before {@link #setRecordReader} registers a reader are queued and handed over
+   * on registration.
+   */
+  private static class LlapRecordReaderTaskUmbilicalExternalResponder implements LlapTaskUmbilicalExternalResponder {
+    protected LlapRecordReader recordReader = null;
+    // Unbounded FIFO: add() never blocks. Drained into the reader by setRecordReader().
+    protected LinkedBlockingQueue<ReaderEvent> queuedEvents = new LinkedBlockingQueue<ReaderEvent>();
+
+    public LlapRecordReaderTaskUmbilicalExternalResponder() {
+    }
+
+    @Override
+    public void submissionFailed(String fragmentId, Throwable throwable) {
+      trySendOrQueueEvent(LlapRecordReader.ReaderEvent.errorEvent(
+          "Received submission failed event for fragment ID " + fragmentId));
+    }
+
+    /**
+     * Maps the task-completion and task-failure events carried by a heartbeat to DONE and ERROR
+     * reader events. Each event is handled in its own try so one bad event cannot block the rest.
+     */
+    @Override
+    public void heartbeat(TezHeartbeatRequest request) {
+      List<TezEvent> inEvents = request.getEvents();
+      for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) {
+        EventType eventType = tezEvent.getEventType();
+        try {
+          switch (eventType) {
+            case TASK_ATTEMPT_COMPLETED_EVENT:
+              sendOrQueueEvent(LlapRecordReader.ReaderEvent.doneEvent());
+              break;
+            case TASK_ATTEMPT_FAILED_EVENT:
+              TaskAttemptFailedEvent taskFailedEvent = (TaskAttemptFailedEvent) tezEvent.getEvent();
+              sendOrQueueEvent(LlapRecordReader.ReaderEvent.errorEvent(taskFailedEvent.getDiagnostics()));
+              break;
+            case TASK_STATUS_UPDATE_EVENT:
+              // If we want to handle counters
+              break;
+            default:
+              LOG.warn("Unhandled event type " + eventType);
+              break;
+          }
+        } catch (Exception err) {
+          LOG.error("Error during heartbeat responder:", err);
+        }
+      }
+    }
+
+    @Override
+    public void taskKilled(TezTaskAttemptID taskAttemptId) {
+      trySendOrQueueEvent(LlapRecordReader.ReaderEvent.errorEvent(
+          "Received task killed event for task ID " + taskAttemptId));
+    }
+
+    @Override
+    public void heartbeatTimeout(String taskAttemptId) {
+      trySendOrQueueEvent(LlapRecordReader.ReaderEvent.errorEvent(
+          "Timed out waiting for heartbeat for task ID " + taskAttemptId));
+    }
+
+    public synchronized LlapRecordReader getRecordReader() {
+      return recordReader;
+    }
+
+    /**
+     * Registers the record reader and hands it, in arrival order, every event queued while no
+     * reader was registered. Passing null unregisters the reader so later events queue again.
+     */
+    public synchronized void setRecordReader(LlapRecordReader recordReader) {
+      this.recordReader = recordReader;
+      if (recordReader == null) {
+        return;
+      }
+      // Drain with poll() until it returns null rather than isEmpty() followed by poll():
+      // clearQueuedEvents() is not synchronized and could empty the queue between the two calls.
+      LlapRecordReader.ReaderEvent readerEvent;
+      while ((readerEvent = queuedEvents.poll()) != null) {
+        LOG.debug("Sending queued event to record reader: " + readerEvent.getEventType());
+        recordReader.handleEvent(readerEvent);
+      }
+    }
+
+    /**
+     * Send the ReaderEvent to the record reader, if one is registered with this responder.
+     * Otherwise queue it for the next reader that registers, since these events must not be dropped.
+     * @param readerEvent the event to deliver
+     */
+    protected synchronized void sendOrQueueEvent(LlapRecordReader.ReaderEvent readerEvent) {
+      LlapRecordReader recordReader = getRecordReader();
+      if (recordReader != null) {
+        recordReader.handleEvent(readerEvent);
+        return;
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("No registered record reader, queueing event " + readerEvent.getEventType()
+            + " with message " + readerEvent.getMessage());
+      }
+      // add() rather than put(): the queue is unbounded, and put() would throw
+      // InterruptedException if the calling thread happened to be interrupted.
+      queuedEvents.add(readerEvent);
+    }
+
+    /** Like {@link #sendOrQueueEvent}, but logs delivery failures instead of propagating them. */
+    private void trySendOrQueueEvent(LlapRecordReader.ReaderEvent readerEvent) {
+      try {
+        sendOrQueueEvent(readerEvent);
+      } catch (Exception err) {
+        LOG.error("Error sending " + readerEvent.getEventType() + " event to record reader:", err);
+      }
+    }
+
+    /**
+     * Clear the queued reader events, for when pending events should not be delivered to any
+     * record reader that registers later.
+     */
+    public void clearQueuedEvents() {
+      queuedEvents.clear();
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/134f2f74/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java b/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
index 16cfd01..7d06637 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
@@ -2,12 +2,17 @@ package org.apache.hadoop.hive.llap.ext;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import com.google.common.base.Preconditions;
+import org.apache.commons.collections4.ListUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
@@ -19,16 +24,20 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.impl.EventType;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
public class LlapTaskUmbilicalExternalClient extends AbstractService {
private static final Logger LOG = LoggerFactory.getLogger(LlapTaskUmbilicalExternalClient.class);
@@ -41,20 +50,51 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService {
protected final String tokenIdentifier;
protected final Token<JobTokenIdentifier> sessionToken;
+ private final ConcurrentMap<String, PendingEventData> pendingEvents = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, TaskHeartbeatInfo> registeredTasks= new ConcurrentHashMap<String, TaskHeartbeatInfo>();
+ private LlapTaskUmbilicalExternalResponder responder = null;
+ private final ScheduledThreadPoolExecutor timer;
+ private final long connectionTimeout;
+
+  /** Where a submitted fragment runs and when it last showed signs of life. */
+  private static class TaskHeartbeatInfo {
+    final String taskAttemptId;
+    final String hostname;
+    final int port;
+    // Wall-clock millis of the most recent heartbeat, initialised to the creation time.
+    final AtomicLong lastHeartbeat;
+
+    public TaskHeartbeatInfo(String taskAttemptId, String hostname, int port) {
+      this.taskAttemptId = taskAttemptId;
+      this.hostname = hostname;
+      this.port = port;
+      this.lastHeartbeat = new AtomicLong(System.currentTimeMillis());
+    }
+  }
- private final ConcurrentMap<String, List<TezEvent>> pendingEvents = new ConcurrentHashMap<>();
+  /** A fragment's liveness record plus the events handed to it on its first heartbeat. */
+  private static class PendingEventData {
+    final TaskHeartbeatInfo heartbeatInfo;
+    final List<TezEvent> tezEvents;
+
+    public PendingEventData(TaskHeartbeatInfo info, List<TezEvent> events) {
+      heartbeatInfo = info;
+      tezEvents = events;
+    }
+  }
// TODO KKK Work out the details of the tokenIdentifier, and the session token.
// It may just be possible to create one here - since Shuffle is not involved, and this is only used
// for communication from LLAP-Daemons to the server. It will need to be sent in as part
// of the job submission request.
- public LlapTaskUmbilicalExternalClient(Configuration conf, String tokenIdentifier, Token<JobTokenIdentifier> sessionToken) {
+ public LlapTaskUmbilicalExternalClient(Configuration conf, String tokenIdentifier,
+ Token<JobTokenIdentifier> sessionToken, LlapTaskUmbilicalExternalResponder responder) {
super(LlapTaskUmbilicalExternalClient.class.getName());
this.conf = conf;
this.umbilical = new LlapTaskUmbilicalExternalImpl();
this.tokenIdentifier = tokenIdentifier;
this.sessionToken = sessionToken;
+ this.responder = responder;
+ this.timer = new ScheduledThreadPoolExecutor(1);
+ this.connectionTimeout = HiveConf.getTimeVar(conf,
+ HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
// TODO. No support for the LLAP token yet. Add support for configurable threads, however 1 should always be enough.
this.communicator = new LlapProtocolClientProxy(1, conf, null);
this.communicator.init(conf);
@@ -71,6 +111,7 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService {
@Override
public void serviceStop() {
llapTaskUmbilicalServer.shutdownServer();
+ timer.shutdown();
if (this.communicator != null) {
this.communicator.stop();
}
@@ -89,7 +130,15 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService {
Preconditions.checkArgument(submitWorkRequestProto.getUsingTezAm() == false);
// Register the pending events to be sent for this spec.
- pendingEvents.putIfAbsent(submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString(), tezEvents);
+ String fragmentId = submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString();
+ PendingEventData pendingEventData = new PendingEventData(
+ new TaskHeartbeatInfo(fragmentId, llapHost, llapPort),
+ tezEvents);
+ pendingEvents.putIfAbsent(fragmentId, pendingEventData);
+
+ // Setup timer task to check for hearbeat timeouts
+ timer.scheduleAtFixedRate(new HeartbeatCheckTask(),
+ connectionTimeout, connectionTimeout, TimeUnit.MILLISECONDS);
// Send out the actual SubmitWorkRequest
communicator.sendSubmitWork(submitWorkRequestProto, llapHost, llapPort,
@@ -99,7 +148,12 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService {
public void setResponse(LlapDaemonProtocolProtos.SubmitWorkResponseProto response) {
if (response.hasSubmissionState()) {
if (response.getSubmissionState().equals(LlapDaemonProtocolProtos.SubmissionStateProto.REJECTED)) {
- LOG.info("Fragment: " + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString() + " rejected. Server Busy.");
+ String msg = "Fragment: " + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString() + " rejected. Server Busy.";
+ LOG.info(msg);
+ if (responder != null) {
+ Throwable err = new RuntimeException(msg);
+ responder.submissionFailed(submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString(), err);
+ }
return;
}
}
@@ -107,7 +161,10 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService {
@Override
public void indicateError(Throwable t) {
- LOG.error("Failed to submit: " + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString(), t);
+        String submitFragmentId = submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString();
+        String msg = "Failed to submit: " + submitFragmentId;
+        LOG.error(msg, t);
+        // The responder may be null (setResponse() checks for that too); avoid an NPE on this error path.
+        if (responder != null) {
+          responder.submissionFailed(submitFragmentId, new RuntimeException(msg, t));
+        }
}
});
@@ -130,9 +187,101 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService {
}
+  /**
+   * Stamps the current time on the heartbeat record(s) of {@code taskAttemptId}, looking in both
+   * the pending and the registered maps; warns if the attempt is tracked in neither.
+   */
+  private void updateHeartbeatInfo(String taskAttemptId) {
+    boolean found = false;
+
+    PendingEventData pending = pendingEvents.get(taskAttemptId);
+    if (pending != null) {
+      pending.heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis());
+      found = true;
+    }
+
+    TaskHeartbeatInfo registered = registeredTasks.get(taskAttemptId);
+    if (registered != null) {
+      registered.lastHeartbeat.set(System.currentTimeMillis());
+      found = true;
+    }
+
+    if (!found) {
+      LOG.warn("No tasks found for heartbeat from taskAttemptId " + taskAttemptId);
+    }
+  }
+
+  /**
+   * Handles a daemon-level keep-alive: stamps the current time on every pending or registered
+   * task submitted to {@code hostname}:{@code port}, and logs if there are none.
+   */
+  private void updateHeartbeatInfo(String hostname, int port) {
+    int updateCount = 0;
+    for (PendingEventData pending : pendingEvents.values()) {
+      if (refreshIfHostedOn(pending.heartbeatInfo, hostname, port)) {
+        updateCount++;
+      }
+    }
+    for (TaskHeartbeatInfo info : registeredTasks.values()) {
+      if (refreshIfHostedOn(info, hostname, port)) {
+        updateCount++;
+      }
+    }
+    if (updateCount == 0) {
+      LOG.info("No tasks found for heartbeat from hostname " + hostname + ", port " + port);
+    }
+  }
+
+  /** Sets {@code info}'s last heartbeat to now if it runs on the given daemon; returns whether it did. */
+  private static boolean refreshIfHostedOn(TaskHeartbeatInfo info, String hostname, int port) {
+    if (!info.hostname.equals(hostname) || info.port != port) {
+      return false;
+    }
+    info.lastHeartbeat.set(System.currentTimeMillis());
+    return true;
+  }
+
+  /**
+   * Periodic liveness check: any pending or registered task whose last heartbeat is at least
+   * {@code connectionTimeout} ms old is dropped from tracking and reported to the responder.
+   *
+   * NOTE(review): submitWork() schedules a new fixed-rate instance of this task for every
+   * submission, and each instance scans every tracked task; confirm whether a single shared
+   * schedule was intended.
+   */
+  private class HeartbeatCheckTask implements Runnable {
+    @Override
+    public void run() {
+      long currentTime = System.currentTimeMillis();
+
+      // Check both pending and registered tasks for timeouts. Removing from a ConcurrentHashMap
+      // while iterating it is safe, and the two-argument remove() only succeeds if the entry is
+      // still present, so a task that completed, was killed, or moved from pending to registered
+      // in the meantime is not reported as timed out.
+      for (String key : pendingEvents.keySet()) {
+        PendingEventData pendingEventData = pendingEvents.get(key);
+        if (pendingEventData != null
+            && currentTime - pendingEventData.heartbeatInfo.lastHeartbeat.get() >= connectionTimeout
+            && pendingEvents.remove(key, pendingEventData)) {
+          LOG.info("Pending taskAttemptId " + key + " timed out");
+          notifyTimeout(key);
+        }
+      }
+      for (String key : registeredTasks.keySet()) {
+        TaskHeartbeatInfo heartbeatInfo = registeredTasks.get(key);
+        if (heartbeatInfo != null
+            && currentTime - heartbeatInfo.lastHeartbeat.get() >= connectionTimeout
+            && registeredTasks.remove(key, heartbeatInfo)) {
+          LOG.info("Running taskAttemptId " + key + " timed out");
+          notifyTimeout(key);
+        }
+      }
+    }
+
+    /**
+     * Reports a timed-out task to the responder, if there is one. Exceptions are logged rather than
+     * thrown: an exception escaping run() would suppress all later executions of this periodic task.
+     */
+    private void notifyTimeout(String taskAttemptId) {
+      // TODO: Do we need to tell the LLAP daemon we are no longer interested in this task?
+      try {
+        if (responder != null) {
+          responder.heartbeatTimeout(taskAttemptId);
+        }
+      } catch (Exception err) {
+        LOG.error("Error during responder execution", err);
+      }
+    }
+  }
+
+  /** Callbacks through which the client reports task progress and failures to its owner. */
+  public interface LlapTaskUmbilicalExternalResponder {
+    /** The daemon rejected the submission, or the submit RPC reported an error. */
+    void submissionFailed(String fragmentId, Throwable throwable);
+
+    /** A heartbeat arrived for a tracked task; the full request is passed through. */
+    void heartbeat(TezHeartbeatRequest request);
+
+    /** The daemon reported the attempt as killed. */
+    void taskKilled(TezTaskAttemptID taskAttemptId);
+
+    /** No heartbeat arrived for the task within the configured liveness timeout. */
+    void heartbeatTimeout(String taskAttemptId);
+  }
@@ -153,16 +302,35 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService {
// This also provides completion information, and a possible notification when task actually starts running (first heartbeat)
- // Incoming events can be ignored until the point when shuffle needs to be handled, instead of just scans.
-
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received heartbeat from container, request=" + request);
+ }
+ // Incoming events can be ignored until the point when shuffle needs to be handled, instead of just scans.
TezHeartbeatResponse response = new TezHeartbeatResponse();
+
+ response.setLastRequestId(request.getRequestId());
// Assuming TaskAttemptId and FragmentIdentifierString are the same. Verify this.
TezTaskAttemptID taskAttemptId = request.getCurrentTaskAttemptID();
+ String taskAttemptIdString = taskAttemptId.toString();
+
+ updateHeartbeatInfo(taskAttemptIdString);
- List<TezEvent> tezEvents = pendingEvents.remove(taskAttemptId.toString());
- if (tezEvents == null) {
+ List<TezEvent> tezEvents = null;
+ PendingEventData pendingEventData = pendingEvents.remove(taskAttemptIdString);
+ if (pendingEventData == null) {
tezEvents = Collections.emptyList();
+
+ // If this heartbeat was not from a pending event and it's not in our list of registered tasks,
+ if (!registeredTasks.containsKey(taskAttemptIdString)) {
+ LOG.info("Unexpected heartbeat from " + taskAttemptIdString);
+ response.setShouldDie(); // Do any of the other fields need to be set?
+ return response;
+ }
+ } else {
+ tezEvents = pendingEventData.tezEvents;
+ // Tasks removed from the pending list should then be added to the registered list.
+ registeredTasks.put(taskAttemptIdString, pendingEventData.heartbeatInfo);
}
response.setLastRequestId(request.getRequestId());
@@ -172,20 +340,63 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService {
response.setNextPreRoutedEventId(0); //Irrelevant. See comment above.
response.setEvents(tezEvents);
- // TODO KKK: Should ideally handle things like Task success notifications.
- // Can this somehow be hooked into the LlapTaskCommunicator to make testing easy
+ List<TezEvent> inEvents = request.getEvents();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Heartbeat from " + taskAttemptIdString +
+ " events: " + (inEvents != null ? inEvents.size() : -1));
+ }
+ for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) {
+ EventType eventType = tezEvent.getEventType();
+ switch (eventType) {
+ case TASK_ATTEMPT_COMPLETED_EVENT:
+ LOG.debug("Task completed event for " + taskAttemptIdString);
+ registeredTasks.remove(taskAttemptIdString);
+ break;
+ case TASK_ATTEMPT_FAILED_EVENT:
+ LOG.debug("Task failed event for " + taskAttemptIdString);
+ registeredTasks.remove(taskAttemptIdString);
+ break;
+ case TASK_STATUS_UPDATE_EVENT:
+ // If we want to handle counters
+ LOG.debug("Task update event for " + taskAttemptIdString);
+ break;
+ default:
+ LOG.warn("Unhandled event type " + eventType);
+ break;
+ }
+ }
+
+ // Pass the request on to the responder
+ try {
+ if (responder != null) {
+ responder.heartbeat(request);
+ }
+ } catch (Exception err) {
+ LOG.error("Error during responder execution", err);
+ }
return response;
}
@Override
public void nodeHeartbeat(Text hostname, int port) throws IOException {
- // TODO Eventually implement - to handle keep-alive messages from pending work.
+ // Daemon-level keep-alive: refresh the heartbeat time of every tracked task on this host/port.
+ updateHeartbeatInfo(hostname.toString(), port);
+ // No need to propagate this to the responder.
}
@Override
public void taskKilled(TezTaskAttemptID taskAttemptId) throws IOException {
- // TODO Eventually implement - to handle preemptions within LLAP daemons.
+    // The daemon killed this attempt (e.g. preemption): stop tracking it, then tell the responder.
+    String attemptKey = taskAttemptId.toString();
+    LOG.error("Task killed - " + attemptKey);
+    registeredTasks.remove(attemptKey);
+    if (responder == null) {
+      return;
+    }
+    try {
+      responder.taskKilled(taskAttemptId);
+    } catch (Exception err) {
+      LOG.error("Error during responder execution", err);
+    }
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/134f2f74/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordReader.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordReader.java
index ce3d39a..30ed9cf 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordReader.java
@@ -18,9 +18,11 @@
package org.apache.hadoop.hive.llap;
+import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.DataInputStream;
+import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.RCFile.Reader;
@@ -33,16 +35,25 @@ import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.hive.metastore.api.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public class LlapRecordReader<V extends WritableComparable> implements RecordReader<NullWritable, V> {
+ private static final Logger LOG = LoggerFactory.getLogger(LlapRecordReader.class);
DataInputStream din;
Schema schema;
Class<V> clazz;
+
+ protected Thread readerThread = null;
+ protected LinkedBlockingQueue<ReaderEvent> readerEvents = new LinkedBlockingQueue<ReaderEvent>();
+
+ /**
+ * @param in stream of serialized rows, each read into a {@code V} by next()
+ * @param schema the row schema
+ * @param clazz the row value class
+ */
public LlapRecordReader(InputStream in, Schema schema, Class<V> clazz) {
din = new DataInputStream(in);
this.schema = schema;
this.clazz = clazz;
+ // Default to the constructing thread; next() re-records the thread that actually reads.
+ this.readerThread = Thread.currentThread();
}
public Schema getSchema() {
@@ -75,12 +86,120 @@ public class LlapRecordReader<V extends WritableComparable> implements RecordRea
}
+ /**
+ * Reads the next row from the input stream into {@code value}.
+ *
+ * On end of input this blocks until a reader event is queued; only a DONE event produces a clean
+ * {@code false}. An IOException raised after the thread was interrupted is reported using the
+ * queued reader event, if one is present.
+ *
+ * @return true if a row was read; false at end of input confirmed by a DONE event
+ * @throws IOException on read failure, or when a reader event of an unexpected type arrives
+ */
@Override
- public boolean next(NullWritable key, V value) {
+ public boolean next(NullWritable key, V value) throws IOException {
try {
+ // Record the calling thread so handleEvent() can interrupt it while it is blocked in readFields().
+ setReaderThread(Thread.currentThread());
+ // NOTE(review): Thread.interrupt() only aborts blocking I/O on an InterruptibleChannel; confirm
+ // din is channel-backed, or an ERROR event may not unblock a read stuck on a plain socket stream.
value.readFields(din);
return true;
- } catch (IOException io) {
+ } catch (EOFException eof) {
+ // End of input: a DONE (or ERROR) reader event is queued now or will be shortly, so blocking
+ // on the event queue here is acceptable.
+ ReaderEvent event = getReaderEvent();
+ switch (event.getEventType()) {
+ case DONE:
+ break;
+ default:
+ throw new IOException("Expected reader event with done status, but got "
+ + event.getEventType() + " with message " + event.getMessage());
+ }
return false;
+ } catch (IOException io) {
+ // Thread.interrupted() also clears the flag, so a later blocking take() is not aborted by it.
+ if (Thread.interrupted()) {
+ // Either we were interrupted by one of:
+ // 1. handleEvent(), in which case there is a reader event waiting for us in the queue
+ // 2. Some other unrelated cause which interrupted us, in which case there may not be a reader event coming.
+ // Either way we should not try to block trying to read the reader events queue.
+ if (readerEvents.isEmpty()) {
+ // Case 2.
+ throw io;
+ } else {
+ // Case 1. Fail the reader, sending back the error we received from the reader event.
+ ReaderEvent event = getReaderEvent();
+ switch (event.getEventType()) {
+ case ERROR:
+ throw new IOException("Received reader event error: " + event.getMessage());
+ default:
+ throw new IOException("Got reader event type " + event.getEventType() + ", expected error event");
+ }
+ }
+ } else {
+ // If we weren't interrupted, just propagate the error
+ throw io;
+ }
+ }
}
+
+  /**
+   * A success (DONE) or failure (ERROR) notification delivered to the reader from another thread.
+   * The reader consults these events at end of input and after its thread is interrupted.
+   */
+  public static class ReaderEvent {
+    public enum EventType { DONE, ERROR }
+
+    protected final EventType eventType;
+    protected final String message;
+
+    protected ReaderEvent(EventType eventType, String message) {
+      this.message = message;
+      this.eventType = eventType;
+    }
+
+    /** Returns a DONE event whose message is the empty string. */
+    public static ReaderEvent doneEvent() {
+      return new ReaderEvent(EventType.DONE, "");
+    }
+
+    /** Returns an ERROR event carrying the given diagnostic message. */
+    public static ReaderEvent errorEvent(String message) {
+      return new ReaderEvent(EventType.ERROR, message);
}
+
+    /** Returns whether this is a DONE or an ERROR event. */
+    public EventType getEventType() {
+      return eventType;
+    }
+
+    /** Returns the event's message (empty for events built by {@link #doneEvent()}). */
+    public String getMessage() {
+      return message;
+    }
+  }
+
+  /**
+   * Hands the reader an event produced on another thread.
+   *
+   * DONE events are only queued: next() checks the queue once the input stream ends. ERROR events
+   * are queued and the reader thread is then interrupted so that a blocked read can report them.
+   *
+   * @param event the event to deliver
+   * @throws RuntimeException if no reader thread is recorded for an ERROR event, or if the event
+   *     type is not handled
+   */
+  public void handleEvent(ReaderEvent event) {
+    switch (event.getEventType()) {
+      case DONE:
+        // Reader will check for the event queue upon the end of the input stream - no need to interrupt.
+        readerEvents.add(event);
+        break;
+      case ERROR:
+        readerEvents.add(event);
+        // Read the thread once, through the synchronized accessor, so the null check and the
+        // interrupt below see the same value.
+        Thread thread = getReaderThread();
+        if (thread == null) {
+          throw new RuntimeException("Reader thread is unexpectedly null, during ReaderEvent error " + event.getMessage());
+        }
+        // Reader is using a blocking socket .. interrupt it.
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Interrupting reader thread due to reader event with error " + event.getMessage());
+        }
+        thread.interrupt();
+        // An ERROR event is fully handled here; falling through to default would throw.
+        break;
+      default:
+        throw new RuntimeException("Unhandled ReaderEvent type " + event.getEventType() + " with message " + event.getMessage());
+    }
+  }
+
+  /**
+   * Takes the next reader event, blocking until one is available.
+   *
+   * handleEvent() enqueues an ERROR event before interrupting the reader thread, so that interrupt
+   * can land while this thread is already waiting here (e.g. after end of input). In that case the
+   * already-queued event is returned instead of failing.
+   *
+   * @return the next queued reader event
+   * @throws RuntimeException if interrupted while no event is queued
+   */
+  protected ReaderEvent getReaderEvent() {
+    try {
+      return readerEvents.take();
+    } catch (InterruptedException ie) {
+      ReaderEvent queued = readerEvents.poll();
+      if (queued != null) {
+        return queued;
+      }
+      // Preserve the interrupt status for callers further up the stack.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Interrupted while getting readerEvents, not expected", ie);
+    }
+  }
+
+  /** Records which thread reads from this reader, i.e. the thread handleEvent() interrupts. */
+  protected synchronized void setReaderThread(Thread thread) {
+    readerThread = thread;
+  }
+
+  /** Returns the thread most recently recorded as the reader (initially the constructing thread). */
+  protected synchronized Thread getReaderThread() {
+    return this.readerThread;
}
}