You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by we...@apache.org on 2017/05/17 02:52:43 UTC
[05/50] [abbrv] hive git commit: HIVE-16652: LlapInputFormat: Seeing "output error" WARN message (Jason Dere, reviewed by Siddharth Seth)
HIVE-16652: LlapInputFormat: Seeing "output error" WARN message (Jason Dere, reviewed by Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4291c467
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4291c467
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4291c467
Branch: refs/heads/hive-14535
Commit: 4291c467aac81bcef140f1b8b8cdaba6edaf2f96
Parents: fea6df6
Author: Jason Dere <jd...@hortonworks.com>
Authored: Fri May 12 16:52:31 2017 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Fri May 12 16:52:31 2017 -0700
----------------------------------------------------------------------
.../ext/LlapTaskUmbilicalExternalClient.java | 89 +++++++++++++++++++-
.../helpers/LlapTaskUmbilicalServer.java | 4 +
2 files changed, 90 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/4291c467/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
index 7d0d6d2..c7de417 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
@@ -30,6 +30,7 @@ import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -47,6 +48,8 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableV
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexOrBinary;
import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
@@ -85,15 +88,18 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
private LlapTaskUmbilicalExternalResponder responder = null;
private final ScheduledThreadPoolExecutor timer;
private final long connectionTimeout;
+ // Set once by serviceStop(); after that, incoming heartbeats are answered with shouldDie.
+ private volatile boolean closed = false;
private static class TaskHeartbeatInfo {
+ final QueryIdentifierProto queryIdentifierProto;
final String taskAttemptId;
final String hostname;
String uniqueNodeId;
final int port;
final AtomicLong lastHeartbeat = new AtomicLong();
- public TaskHeartbeatInfo(String taskAttemptId, String hostname, int port) {
+ public TaskHeartbeatInfo(QueryIdentifierProto queryIdentifierProto, String taskAttemptId, String hostname, int port) {
+ this.queryIdentifierProto = queryIdentifierProto;
this.taskAttemptId = taskAttemptId;
this.hostname = hostname;
this.port = port;
@@ -137,7 +143,45 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
}
+ /**
+ * Closes the client: marks it closed, asks the LLAP daemons to terminate every fragment still
+ * registered with this client (best effort, failures are only logged), and schedules
+ * doShutdown() to run once no umbilical connections remain open.
+ *
+ * @throws IllegalStateException if the client has already been closed
+ */
@Override
- public void serviceStop() {
+ public void serviceStop() throws Exception {
+ if (closed) {
+ throw new IllegalStateException("Client has already been closed");
+ }
+ closed = true;
+
+ try {
+ // Ask the owning daemon to terminate every fragment still registered with this client.
+ for (Map.Entry<String, TaskHeartbeatInfo> taskEntry : registeredTasks.entrySet()) {
+ terminateRequest(taskEntry.getValue());
+ }
+ registeredTasks.clear();
+ } finally {
+ // Always schedule the shutdown of the umbilical server/timer, even if sending a terminate
+ // request threw; otherwise those resources would be leaked.
+ scheduleClientForCleanup(this);
+ }
+ }
+
+ private void terminateRequest(TaskHeartbeatInfo thi) {
+ TerminateFragmentRequestProto.Builder builder = TerminateFragmentRequestProto.newBuilder();
+ builder.setQueryIdentifier(thi.queryIdentifierProto);
+ builder.setFragmentIdentifierString(thi.taskAttemptId);
+
+ final String taskAttemptId = thi.taskAttemptId;
+ communicator.sendTerminateFragment(builder.build(), thi.hostname, thi.port,
+ new LlapProtocolClientProxy.ExecuteRequestCallback<TerminateFragmentResponseProto>() {
+
+ @Override
+ public void setResponse(TerminateFragmentResponseProto response) {
+ LOG.debug("Received terminate response for " + taskAttemptId);
+ }
+
+ @Override
+ public void indicateError(Throwable t) {
+ String msg = "Failed to terminate " + taskAttemptId;
+ LOG.error(msg, t);
+ // Don't propagate the error - termination was done as part of closing the client.
+ }
+ });
+ }
+
+ private void doShutdown() throws IOException {
llapTaskUmbilicalServer.shutdownServer();
timer.shutdown();
if (this.communicator != null) {
@@ -170,7 +214,7 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
vertex.getVertexIndex(), request.getFragmentNumber(), request.getAttemptNumber());
final String fragmentId = attemptId.toString();
- final TaskHeartbeatInfo thi = new TaskHeartbeatInfo(fragmentId, llapHost, llapPort);
+ final TaskHeartbeatInfo thi = new TaskHeartbeatInfo(queryIdentifierProto, fragmentId, llapHost, llapPort);
pendingEvents.putIfAbsent(
fragmentId, new PendingEventData(thi, Lists.<TezEvent>newArrayList()));
@@ -357,6 +401,13 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
TezTaskAttemptID taskAttemptId = request.getCurrentTaskAttemptID();
String taskAttemptIdString = taskAttemptId.toString();
+ if (closed) {
+ LOG.info("Client has already been closed, but received heartbeat from " + taskAttemptIdString);
+ // Set shouldDie response so the LLAP daemon closes this umbilical connection.
+ response.setShouldDie();
+ return response;
+ }
+
updateHeartbeatInfo(taskAttemptIdString);
List<TezEvent> tezEvents = null;
@@ -456,4 +507,36 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
}
}
+ private static void scheduleClientForCleanup(LlapTaskUmbilicalExternalClient client) {
+ // Add a bit of delay in case the daemon has not closed the umbilical connection yet.
+ clientCleanupExecuter.schedule(new ClientCleanupTask(client), cleanupDelay, TimeUnit.MILLISECONDS);
+ }
+
+ static final ScheduledThreadPoolExecutor clientCleanupExecuter = new ScheduledThreadPoolExecutor(1);
+ static final int cleanupDelay = 2000;
+
+ static class ClientCleanupTask implements Runnable {
+ final LlapTaskUmbilicalExternalClient client;
+
+ public ClientCleanupTask(LlapTaskUmbilicalExternalClient client) {
+ this.client = client;
+ }
+
+ @Override
+ public void run() {
+ if (client.llapTaskUmbilicalServer.getNumOpenConnections() == 0) {
+ // No more outstanding connections, ok to close.
+ try {
+ LOG.debug("Closing client");
+ client.doShutdown();
+ } catch (Exception err) {
+ LOG.error("Error cleaning up client", err);
+ }
+ } else {
+ // Reschedule this task for later.
+ LOG.debug("Client still has umbilical connection - rescheduling cleanup.");
+ scheduleClientForCleanup(client);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/4291c467/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java b/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
index 470ee6d..403381d 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
@@ -72,6 +72,10 @@ public class LlapTaskUmbilicalServer {
return this.address;
}
+ /**
+ * Returns the open-connection count reported by the underlying server.
+ */
+ public int getNumOpenConnections() {
+ final int openConnections = this.server.getNumOpenConnections();
+ return openConnections;
+ }
+
public void shutdownServer() {
if (started.get()) { // Primarily to avoid multiple shutdowns.
started.set(false);