You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by we...@apache.org on 2017/05/17 02:52:43 UTC

[05/50] [abbrv] hive git commit: HIVE-16652: LlapInputFormat: Seeing "output error" WARN message (Jason Dere, reviewed by Siddharth Seth)

HIVE-16652: LlapInputFormat: Seeing "output error" WARN message (Jason Dere, reviewed by Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4291c467
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4291c467
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4291c467

Branch: refs/heads/hive-14535
Commit: 4291c467aac81bcef140f1b8b8cdaba6edaf2f96
Parents: fea6df6
Author: Jason Dere <jd...@hortonworks.com>
Authored: Fri May 12 16:52:31 2017 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Fri May 12 16:52:31 2017 -0700

----------------------------------------------------------------------
 .../ext/LlapTaskUmbilicalExternalClient.java    | 89 +++++++++++++++++++-
 .../helpers/LlapTaskUmbilicalServer.java        |  4 +
 2 files changed, 90 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/4291c467/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
index 7d0d6d2..c7de417 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
@@ -30,6 +30,7 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -47,6 +48,8 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableV
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexOrBinary;
 import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
 import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
@@ -85,15 +88,18 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
   private LlapTaskUmbilicalExternalResponder responder = null;
   private final ScheduledThreadPoolExecutor timer;
   private final long connectionTimeout;
+  private volatile boolean closed = false;
 
   private static class TaskHeartbeatInfo {
+    final QueryIdentifierProto queryIdentifierProto;
     final String taskAttemptId;
     final String hostname;
     String uniqueNodeId;
     final int port;
     final AtomicLong lastHeartbeat = new AtomicLong();
 
-    public TaskHeartbeatInfo(String taskAttemptId, String hostname, int port) {
+    public TaskHeartbeatInfo(QueryIdentifierProto queryIdentifierProto, String taskAttemptId, String hostname, int port) {
+      this.queryIdentifierProto = queryIdentifierProto;
       this.taskAttemptId = taskAttemptId;
       this.hostname = hostname;
       this.port = port;
@@ -137,7 +143,45 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
   }
 
   @Override
-  public void serviceStop() {
+  public void serviceStop() throws Exception {
+    if (closed) {
+      throw new IllegalStateException("Client has already been closed");
+    }
+    closed = true;
+
+    // Check if the request is registered - if so we can cancel the request
+    for (Map.Entry<String, TaskHeartbeatInfo> taskEntry : registeredTasks.entrySet()) {
+      terminateRequest(taskEntry.getValue());
+    }
+    registeredTasks.clear();
+
+    scheduleClientForCleanup(this);
+  }
+
+  private void terminateRequest(TaskHeartbeatInfo thi) {
+    TerminateFragmentRequestProto.Builder builder = TerminateFragmentRequestProto.newBuilder();
+    builder.setQueryIdentifier(thi.queryIdentifierProto);
+    builder.setFragmentIdentifierString(thi.taskAttemptId);
+
+    final String taskAttemptId = thi.taskAttemptId;
+    communicator.sendTerminateFragment(builder.build(), thi.hostname, thi.port,
+        new LlapProtocolClientProxy.ExecuteRequestCallback<TerminateFragmentResponseProto>() {
+
+      @Override
+      public void setResponse(TerminateFragmentResponseProto response) {
+        LOG.debug("Received terminate response for " + taskAttemptId);
+      }
+
+      @Override
+      public void indicateError(Throwable t) {
+        String msg = "Failed to terminate " + taskAttemptId;
+        LOG.error(msg, t);
+        // Don't propagate the error - termination was done as part of closing the client.
+      }
+    });
+  }
+
+  private void doShutdown() throws IOException {
     llapTaskUmbilicalServer.shutdownServer();
     timer.shutdown();
     if (this.communicator != null) {
@@ -170,7 +214,7 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
         vertex.getVertexIndex(), request.getFragmentNumber(), request.getAttemptNumber());
     final String fragmentId = attemptId.toString();
 
-    final TaskHeartbeatInfo thi = new TaskHeartbeatInfo(fragmentId, llapHost, llapPort);
+    final TaskHeartbeatInfo thi = new TaskHeartbeatInfo(queryIdentifierProto, fragmentId, llapHost, llapPort);
     pendingEvents.putIfAbsent(
         fragmentId, new PendingEventData(thi, Lists.<TezEvent>newArrayList()));
 
@@ -357,6 +401,13 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
       TezTaskAttemptID taskAttemptId = request.getCurrentTaskAttemptID();
       String taskAttemptIdString = taskAttemptId.toString();
 
+      if (closed) {
+        LOG.info("Client has already been closed, but received heartbeat from " + taskAttemptIdString);
+        // Set shouldDie response so the LLAP daemon closes this umbilical connection.
+        response.setShouldDie();
+        return response;
+      }
+
       updateHeartbeatInfo(taskAttemptIdString);
 
       List<TezEvent> tezEvents = null;
@@ -456,4 +507,36 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
     }
   }
 
+  private static void scheduleClientForCleanup(LlapTaskUmbilicalExternalClient client) {
+    // Add a bit of delay in case the daemon has not closed the umbilical connection yet.
+    clientCleanupExecuter.schedule(new ClientCleanupTask(client), cleanupDelay, TimeUnit.MILLISECONDS);
+  }
+
+  static final ScheduledThreadPoolExecutor clientCleanupExecuter = new ScheduledThreadPoolExecutor(1);
+  static final int cleanupDelay = 2000;
+
+  static class ClientCleanupTask implements Runnable {
+    final LlapTaskUmbilicalExternalClient client;
+
+    public ClientCleanupTask(LlapTaskUmbilicalExternalClient client) {
+      this.client = client;
+    }
+
+    @Override
+    public void run() {
+      if (client.llapTaskUmbilicalServer.getNumOpenConnections() == 0) {
+        // No more outstanding connections, ok to close.
+        try {
+          LOG.debug("Closing client");
+          client.doShutdown();
+        } catch (Exception err) {
+          LOG.error("Error cleaning up client", err);
+        }
+      } else {
+        // Reschedule this task for later.
+        LOG.debug("Client still has umbilical connection - rescheduling cleanup.");
+        scheduleClientForCleanup(client);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/4291c467/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java b/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
index 470ee6d..403381d 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
@@ -72,6 +72,10 @@ public class LlapTaskUmbilicalServer {
     return this.address;
   }
 
+  public int getNumOpenConnections() {
+    return server.getNumOpenConnections();
+  }
+
   public void shutdownServer() {
     if (started.get()) { // Primarily to avoid multiple shutdowns.
       started.set(false);