You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2017/02/28 03:32:32 UTC

hive git commit: HIVE-15958: LLAP: IPC connections are not being reused for umbilical protocol (Prasanth Jayachandran reviewed by Siddharth Seth)

Repository: hive
Updated Branches:
  refs/heads/master ec4036bb6 -> 2869eca25


HIVE-15958: LLAP: IPC connections are not being reused for umbilical protocol (Prasanth Jayachandran reviewed by Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2869eca2
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2869eca2
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2869eca2

Branch: refs/heads/master
Commit: 2869eca251ae7bfe6c0ef1c7cf2fc2647ed98f53
Parents: ec4036b
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Mon Feb 27 19:32:23 2017 -0800
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Mon Feb 27 19:32:23 2017 -0800

----------------------------------------------------------------------
 .../hive/llap/daemon/impl/AMReporter.java       | 77 ++++++++++++--------
 .../llap/daemon/impl/ContainerRunnerImpl.java   | 21 ++++--
 .../hadoop/hive/llap/daemon/impl/QueryInfo.java | 17 +++--
 .../hive/llap/daemon/impl/QueryTracker.java     | 20 ++---
 .../daemon/impl/TaskExecutorTestHelpers.java    |  2 +-
 5 files changed, 82 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/2869eca2/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
index a30f8b9..65f7232 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
@@ -16,7 +16,6 @@ package org.apache.hadoop.hive.llap.daemon.impl;
 
 import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol.TezAttemptArray;
 
-import org.apache.hadoop.io.ArrayWritable;
 
 import java.util.ArrayList;
 
@@ -26,7 +25,6 @@ import java.util.HashSet;
 
 import java.util.Set;
 
-import java.util.concurrent.ConcurrentHashMap;
 
 import javax.net.SocketFactory;
 
@@ -44,7 +42,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.util.concurrent.FutureCallback;
@@ -127,9 +124,9 @@ public class AMReporter extends AbstractService {
     this.conf = conf;
     this.daemonId = daemonId;
     if (maxThreads < numExecutors) {
-      maxThreads = numExecutors;
       LOG.warn("maxThreads={} is less than numExecutors={}. Setting maxThreads=numExecutors",
-          maxThreads, numExecutors);
+        maxThreads, numExecutors);
+      maxThreads = numExecutors;
     }
     ExecutorService rawExecutor =
         new ThreadPoolExecutor(numExecutors, maxThreads,
@@ -241,12 +238,15 @@ public class AMReporter extends AbstractService {
 
   public void taskKilled(String amLocation, int port, String user, Token<JobTokenIdentifier> jobToken,
                          final QueryIdentifier queryIdentifier, final TezTaskAttemptID taskAttemptId) {
-    // Not re-using the connection for the AM heartbeat - which may or may not be open by this point.
-    // knownAppMasters is used for sending heartbeats for queued tasks. Killed messages use a new connection.
     LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port);
-    AMNodeInfo amNodeInfo =
-        new AMNodeInfo(amNodeId, user, jobToken, queryIdentifier, retryPolicy, retryTimeout, socketFactory,
-            conf);
+    AMNodeInfo amNodeInfo;
+    synchronized (knownAppMasters) {
+      amNodeInfo = knownAppMasters.get(amNodeId);
+      if (amNodeInfo == null) {
+        amNodeInfo = new AMNodeInfo(amNodeId, user, jobToken, queryIdentifier, retryPolicy, retryTimeout, socketFactory,
+          conf);
+      }
+    }
 
     // Even if the service hasn't started up. It's OK to make this invocation since this will
     // only happen after the AtomicReference address has been populated. Not adding an additional check.
@@ -266,6 +266,20 @@ public class AMReporter extends AbstractService {
     });
   }
 
+  public void queryComplete(LlapNodeId llapNodeId) {
+    if (llapNodeId != null) {
+      synchronized (knownAppMasters) {
+        AMNodeInfo amNodeInfo = knownAppMasters.remove(llapNodeId);
+        // TODO: The umbilical is not stopped explicitly here because task-kill requests may still be scheduled
+        // during queryComplete and would use it. HIVE-16021 should fix this; until then, leave the umbilical open
+        // and let it be closed after the max idle timeout (10s default).
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Query complete received. Removed {}.", amNodeInfo);
+        }
+      }
+    }
+  }
+
   private class QueueLookupCallable extends CallableWithNdc<Void> {
 
     @Override
@@ -273,7 +287,7 @@ public class AMReporter extends AbstractService {
       while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) {
         try {
           final AMNodeInfo amNodeInfo = pendingHeartbeatQueeu.take();
-          if (amNodeInfo.getTaskCount() == 0 || amNodeInfo.hasAmFailed()) {
+          if (amNodeInfo.hasAmFailed()) {
             synchronized (knownAppMasters) {
               if (LOG.isDebugEnabled()) {
                 LOG.debug(
@@ -283,28 +297,29 @@ public class AMReporter extends AbstractService {
               }
               knownAppMasters.remove(amNodeInfo.amNodeId);
             }
-            amNodeInfo.stopUmbilical();
           } else {
-            // Add back to the queue for the next heartbeat, and schedule the actual heartbeat
-            long next = System.currentTimeMillis() + heartbeatInterval;
-            amNodeInfo.setNextHeartbeatTime(next);
-            pendingHeartbeatQueeu.add(amNodeInfo);
-            ListenableFuture<Void> future = executor.submit(new AMHeartbeatCallable(amNodeInfo));
-            Futures.addCallback(future, new FutureCallback<Void>() {
-              @Override
-              public void onSuccess(Void result) {
-                // Nothing to do.
-              }
-
-              @Override
-              public void onFailure(Throwable t) {
-                QueryIdentifier currentQueryIdentifier = amNodeInfo.getCurrentQueryIdentifier();
-                amNodeInfo.setAmFailed(true);
-                LOG.warn("Heartbeat failed to AM {}. Killing all other tasks for the query={}",
+            if (amNodeInfo.getTaskCount() > 0) {
+              // Add back to the queue for the next heartbeat, and schedule the actual heartbeat
+              long next = System.currentTimeMillis() + heartbeatInterval;
+              amNodeInfo.setNextHeartbeatTime(next);
+              pendingHeartbeatQueeu.add(amNodeInfo);
+              ListenableFuture<Void> future = executor.submit(new AMHeartbeatCallable(amNodeInfo));
+              Futures.addCallback(future, new FutureCallback<Void>() {
+                @Override
+                public void onSuccess(Void result) {
+                  // Nothing to do.
+                }
+
+                @Override
+                public void onFailure(Throwable t) {
+                  QueryIdentifier currentQueryIdentifier = amNodeInfo.getCurrentQueryIdentifier();
+                  amNodeInfo.setAmFailed(true);
+                  LOG.warn("Heartbeat failed to AM {}. Killing all other tasks for the query={}",
                     amNodeInfo.amNodeId, currentQueryIdentifier, t);
-                queryFailedHandler.queryFailed(currentQueryIdentifier);
-              }
-            });
+                  queryFailedHandler.queryFailed(currentQueryIdentifier);
+                }
+              });
+            }
           }
         } catch (InterruptedException e) {
           if (isShutdown.get()) {

http://git-wip-us.apache.org/repos/asf/hive/blob/2869eca2/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index fac9730..af8f5b0 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.common.UgiFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.DaemonId;
+import org.apache.hadoop.hive.llap.LlapNodeId;
 import org.apache.hadoop.hive.llap.NotTezEventHelper;
 import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
 import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
@@ -242,12 +243,12 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
 
       Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
 
+      LlapNodeId amNodeId = LlapNodeId.getInstance(request.getAmHost(), request.getAmPort());
       QueryFragmentInfo fragmentInfo = queryTracker.registerFragment(
           queryIdentifier, qIdProto.getApplicationIdString(), dagId,
           vertex.getDagName(), vertex.getHiveQueryId(), dagIdentifier,
           vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber(),
-          vertex.getUser(), vertex, jobToken, fragmentIdString, tokenInfo, request.getAmHost(),
-          request.getAmPort());
+          vertex.getUser(), vertex, jobToken, fragmentIdString, tokenInfo, amNodeId);
 
       String[] localDirs = fragmentInfo.getLocalDirs();
       Preconditions.checkNotNull(localDirs);
@@ -390,14 +391,18 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
         new QueryIdentifier(request.getQueryIdentifier().getApplicationIdString(),
             request.getQueryIdentifier().getDagIndex());
     LOG.info("Processing queryComplete notification for {}", queryIdentifier);
-    List<QueryFragmentInfo> knownFragments = queryTracker.queryComplete(
-        queryIdentifier, request.getDeleteDelay(), false);
-    LOG.info("DBG: Pending fragment count for completed query {} = {}", queryIdentifier,
+    QueryInfo queryInfo = queryTracker.queryComplete(queryIdentifier, request.getDeleteDelay(), false);
+    if (queryInfo != null) {
+      List<QueryFragmentInfo> knownFragments = queryInfo.getRegisteredFragments();
+      LOG.info("DBG: Pending fragment count for completed query {} = {}", queryIdentifier,
         knownFragments.size());
-    for (QueryFragmentInfo fragmentInfo : knownFragments) {
-      LOG.info("Issuing killFragment for completed query {} {}", queryIdentifier,
+      for (QueryFragmentInfo fragmentInfo : knownFragments) {
+        LOG.info("Issuing killFragment for completed query {} {}", queryIdentifier,
           fragmentInfo.getFragmentIdentifierString());
-      executorService.killFragment(fragmentInfo.getFragmentIdentifierString());
+        executorService.killFragment(fragmentInfo.getFragmentIdentifierString());
+      }
+      LlapNodeId amNodeId = queryInfo.getAmNodeId();
+      amReporter.queryComplete(amNodeId);
     }
     return QueryCompleteResponseProto.getDefaultInstance();
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/2869eca2/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
index eaa3e7e..088f07c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
@@ -35,6 +35,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.llap.LlapNodeId;
 import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto;
@@ -55,6 +56,7 @@ public class QueryInfo {
   private final String[] localDirsBase;
   private final FileSystem localFs;
   private String[] localDirs;
+  private final LlapNodeId amNodeId;
   // Map of states for different vertices.
 
   private final Set<QueryFragmentInfo> knownFragments =
@@ -67,11 +69,11 @@ public class QueryInfo {
   private final AtomicReference<UserGroupInformation> umbilicalUgi;
 
   public QueryInfo(QueryIdentifier queryIdentifier, String appIdString, String dagIdString,
-                   String dagName, String hiveQueryIdString,
-                   int dagIdentifier, String user,
-                   ConcurrentMap<String, SourceStateProto> sourceStateMap,
-                   String[] localDirsBase, FileSystem localFs, String tokenUserName,
-                   String tokenAppId) {
+    String dagName, String hiveQueryIdString,
+    int dagIdentifier, String user,
+    ConcurrentMap<String, SourceStateProto> sourceStateMap,
+    String[] localDirsBase, FileSystem localFs, String tokenUserName,
+    String tokenAppId, final LlapNodeId amNodeId) {
     this.queryIdentifier = queryIdentifier;
     this.appIdString = appIdString;
     this.dagIdString = dagIdString;
@@ -85,6 +87,7 @@ public class QueryInfo {
     this.tokenUserName = tokenUserName;
     this.appId = tokenAppId;
     this.umbilicalUgi = new AtomicReference<>();
+    this.amNodeId = amNodeId;
   }
 
   public QueryIdentifier getQueryIdentifier() {
@@ -115,6 +118,10 @@ public class QueryInfo {
     return sourceStateMap;
   }
 
+  public LlapNodeId getAmNodeId() {
+    return amNodeId;
+  }
+
   public QueryFragmentInfo registerFragment(String vertexName, int fragmentNumber,
       int attemptNumber, SignableVertexSpec vertexSpec, String fragmentIdString) {
     QueryFragmentInfo fragmentInfo = new QueryFragmentInfo(

http://git-wip-us.apache.org/repos/asf/hive/blob/2869eca2/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index 5cf3a38..7e646c5 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -21,6 +21,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.llap.LlapNodeId;
 import org.apache.hadoop.hive.llap.log.Log4jQueryCompleteMarker;
 import org.apache.hadoop.hive.llap.log.LogHelpers;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -69,8 +70,6 @@ public class QueryTracker extends AbstractService {
 
   private final ConcurrentHashMap<QueryIdentifier, QueryInfo> queryInfoMap = new ConcurrentHashMap<>();
 
-
-
   private final String[] localDirsBase;
   private final FileSystem localFs;
   private final String clusterId;
@@ -137,9 +136,10 @@ public class QueryTracker extends AbstractService {
    * Register a new fragment for a specific query
    */
   QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appIdString, String dagIdString,
-      String dagName, String hiveQueryIdString, int dagIdentifier, String vertexName, int fragmentNumber, int attemptNumber,
-      String user, SignableVertexSpec vertex, Token<JobTokenIdentifier> appToken,
-      String fragmentIdString, LlapTokenInfo tokenInfo, String amHost, int amPort) throws IOException {
+    String dagName, String hiveQueryIdString, int dagIdentifier, String vertexName, int fragmentNumber,
+    int attemptNumber,
+    String user, SignableVertexSpec vertex, Token<JobTokenIdentifier> appToken,
+    String fragmentIdString, LlapTokenInfo tokenInfo, final LlapNodeId amNodeId) throws IOException {
 
     ReadWriteLock dagLock = getDagLock(queryIdentifier);
     // Note: This is a readLock to prevent a race with queryComplete. Operations
@@ -169,13 +169,13 @@ public class QueryTracker extends AbstractService {
             new QueryInfo(queryIdentifier, appIdString, dagIdString, dagName, hiveQueryIdString,
                 dagIdentifier, user,
                 getSourceCompletionMap(queryIdentifier), localDirsBase, localFs,
-                tokenInfo.userName, tokenInfo.appId);
+                tokenInfo.userName, tokenInfo.appId, amNodeId);
         QueryInfo old = queryInfoMap.putIfAbsent(queryIdentifier, queryInfo);
         if (old != null) {
           queryInfo = old;
         } else {
           // Ensure the UGI is setup once.
-          queryInfo.setupUmbilicalUgi(vertex.getTokenIdentifier(), appToken, amHost, amPort);
+          queryInfo.setupUmbilicalUgi(vertex.getTokenIdentifier(), appToken, amNodeId.getHostname(), amNodeId.getPort());
           isExistingQueryInfo = false;
         }
       }
@@ -238,7 +238,7 @@ public class QueryTracker extends AbstractService {
    * @param queryIdentifier
    * @param deleteDelay
    */
-  List<QueryFragmentInfo> queryComplete(QueryIdentifier queryIdentifier, long deleteDelay,
+  QueryInfo queryComplete(QueryIdentifier queryIdentifier, long deleteDelay,
       boolean isInternal) throws IOException {
     if (deleteDelay == -1) {
       deleteDelay = defaultDeleteDelaySeconds;
@@ -255,7 +255,7 @@ public class QueryTracker extends AbstractService {
       if (queryInfo == null) {
         // Should not happen.
         LOG.warn("Ignoring query complete for unknown dag: {}", queryIdentifier);
-        return Collections.emptyList();
+        return null;
       }
       String[] localDirs = queryInfo.getLocalDirsNoCreate();
       if (localDirs != null) {
@@ -292,7 +292,7 @@ public class QueryTracker extends AbstractService {
       if (savedQueryId != null) {
         ObjectCacheFactory.removeLlapQueryCache(savedQueryId);
       }
-      return queryInfo.getRegisteredFragments();
+      return queryInfo;
     } finally {
       dagLock.writeLock().unlock();
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/2869eca2/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
index 71feb33..259e383 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
@@ -93,7 +93,7 @@ public class TaskExecutorTestHelpers {
         new QueryInfo(queryIdentifier, "fake_app_id_string", "fake_dag_id_string", "fake_dag_name",
             "fakeHiveQueryId", 1, "fakeUser",
             new ConcurrentHashMap<String, LlapDaemonProtocolProtos.SourceStateProto>(),
-            new String[0], null, "fakeUser", null);
+            new String[0], null, "fakeUser", null, null);
     return queryInfo;
   }