You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2020/05/17 03:11:12 UTC

[hive] branch master updated: HIVE-23446 : LLAP: Reduce IPC connection misses to AM for short queries (Rajesh Balamohan via Ashutosh Chauhan)

This is an automated email from the ASF dual-hosted git repository.

hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new eeffb0e  HIVE-23446 : LLAP: Reduce IPC connection misses to AM for short queries (Rajesh Balamohan via Ashutosh Chauhan)
eeffb0e is described below

commit eeffb0e4e7feab7cea0dba9e7a2b63808b2023f7
Author: Rajesh Balamohan <rb...@apache.org>
AuthorDate: Sat May 16 20:05:27 2020 -0700

    HIVE-23446 : LLAP: Reduce IPC connection misses to AM for short queries (Rajesh Balamohan via Ashutosh Chauhan)
    
    Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
---
 .../hive/llap/daemon/impl/ContainerRunnerImpl.java | 82 +++++++++++++++++++++-
 .../hadoop/hive/llap/daemon/impl/QueryInfo.java    | 25 +++----
 .../hadoop/hive/llap/daemon/impl/QueryTracker.java |  5 +-
 .../llap/daemon/impl/TaskExecutorTestHelpers.java  |  2 +-
 4 files changed, 92 insertions(+), 22 deletions(-)

diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 9c73747..a4de3d9 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -15,7 +15,6 @@
 package org.apache.hadoop.hive.llap.daemon.impl;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
@@ -23,10 +22,19 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.UgiFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -103,6 +111,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
   private static final Logger LOG = LoggerFactory.getLogger(ContainerRunnerImpl.class);
   public static final String THREAD_NAME_FORMAT_PREFIX = "ContainerExecutor ";
 
+  private final UgiPool ugiPool; // shared UGI pool; passed along to every QueryInfo (via QueryTracker)
   private final AMReporter amReporter;
   private final QueryTracker queryTracker;
   private final Scheduler<TaskRunnerCallable> executorService;
@@ -131,6 +140,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
     super("ContainerRunnerImpl");
     Preconditions.checkState(numExecutors > 0,
         "Invalid number of executors: " + numExecutors + ". Must be > 0");
+    this.ugiPool = new UgiPool(numExecutors);
     this.localAddress = localAddress;
     this.localShufflePort = localShufflePort;
     this.amReporter = amReporter;
@@ -270,7 +280,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
           queryIdentifier, qIdProto.getApplicationIdString(), dagId,
           vertex.getDagName(), vertex.getHiveQueryId(), dagIdentifier,
           vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber(),
-          vertex.getUser(), vertex, jobToken, fragmentIdString, tokenInfo, amNodeId);
+          vertex.getUser(), vertex, jobToken, fragmentIdString, tokenInfo, amNodeId, ugiPool);
 
       // May need to setup localDir for re-localization, which is usually setup as Environment.PWD.
       // Used for re-localization, to add the user specified configuration (conf_pb_binary_stream)
@@ -593,4 +603,72 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
     return executorService.getNumActiveForReporting();
   }
 
+  /**
+   * Pool of umbilical UGIs shared by all queries of this runner and keyed by appTokenIdentifier
+   * (one key per AM), so UGIs are reused across queries from the same AM instead of being
+   * created per query (HIVE-23446). A borrowed UGI belongs to its caller alone until it is given
+   * back via {@link #returnUmbilicalUgi}; several UGIs may exist per AM (HIVE-16634). At most
+   * numExecutors idle UGIs are kept per AM; an AM's entry expires 3 hours after last access.
+   */
+  static class UgiPool {
+    // Idle UGIs per appTokenIdentifier (AM). Entries expire after 3 hours of last access.
+    private final Cache<String, BlockingQueue<UserGroupInformation>> ugiPool =
+        CacheBuilder
+            .newBuilder().removalListener(new RemovalListener<String, BlockingQueue<UserGroupInformation>>() {
+          @Override
+          public void onRemoval(
+              RemovalNotification<String, BlockingQueue<UserGroupInformation>> notification) {
+            LOG.debug("Removing ugi pool for {} ({}). Number of AMs in pool: {}",
+                notification.getKey(), notification.getCause(), ugiPool.size());
+          }
+        }).expireAfterAccess(60 * 3, TimeUnit.MINUTES).build();
+
+    // Capacity of each per-AM queue; a UGI returned to a full queue is dropped.
+    private final int numExecutors;
+
+    public UgiPool(int numExecutors) {
+      this.numExecutors = numExecutors;
+    }
+
+    /**
+     * Borrows a UGI for the given AM, creating one that carries appToken if none is idle.
+     * The UGI is not handed to any other caller until it is passed to returnUmbilicalUgi.
+     *
+     * @param appTokenIdentifier AM identifier; also the remote user name of a newly created UGI
+     * @param appToken job token added to a newly created UGI
+     * @return a UGI owned by the caller until it is returned
+     * @throws ExecutionException declared by Cache#get; the queue loader used here does not throw
+     */
+    public UserGroupInformation getUmbilicalUgi(String appTokenIdentifier,
+        Token<JobTokenIdentifier> appToken) throws ExecutionException {
+      Callable<BlockingQueue<UserGroupInformation>> newQueue = () -> new LinkedBlockingQueue<>(numExecutors);
+      BlockingQueue<UserGroupInformation> queue = ugiPool.get(appTokenIdentifier, newQueue);
+      // Reuse an idle UGI if there is one. More than one UGI per AM is allowed. Ref: HIVE-16634
+      UserGroupInformation ugi = queue.poll();
+      if (ugi == null) {
+        ugi = UserGroupInformation.createRemoteUser(appTokenIdentifier);
+        ugi.addToken(appToken);
+        // Do not offer the new UGI to the queue here: it is in use until returnUmbilicalUgi.
+        // Offering it now would let a concurrent caller borrow the same UGI, and it would be
+        // queued a second time once it is returned.
+        LOG.info("Created new ugi for {}. Number of AMs in pool: {}", appTokenIdentifier, ugiPool.size());
+      }
+      return ugi;
+    }
+
+    /**
+     * Gives a UGI obtained from getUmbilicalUgi back to the idle queue of its AM.
+     *
+     * @param appTokenIdentifier AM identifier the UGI was borrowed for
+     * @param ugi the UGI to hand back; dropped if the AM's entry expired or its queue is full
+     */
+    public void returnUmbilicalUgi(String appTokenIdentifier, UserGroupInformation ugi) {
+      BlockingQueue<UserGroupInformation> ugiQueue = ugiPool.getIfPresent(appTokenIdentifier);
+      // Entry could have been removed due to expiry; in that case the UGI is not pooled.
+      if (ugiQueue != null) {
+        ugiQueue.offer(ugi);
+      }
+    }
+  }
+
 }
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
index 00fed15..c464c2f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
@@ -24,10 +24,9 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.locks.ReentrantLock;
 
 import com.google.common.base.Preconditions;
@@ -70,6 +69,7 @@ public class QueryInfo {
 
   private final FinishableStateTracker finishableStateTracker = new FinishableStateTracker();
   private final String tokenUserName, appId;
+  private final ContainerRunnerImpl.UgiPool ugiPool;
 
   public QueryInfo(QueryIdentifier queryIdentifier, String appIdString, String dagIdString,
     String dagName, String hiveQueryIdString,
@@ -79,7 +79,7 @@ public class QueryInfo {
     String tokenAppId, final LlapNodeId amNodeId,
     String tokenIdentifier,
     Token<JobTokenIdentifier> appToken,
-    boolean isExternalQuery) {
+    boolean isExternalQuery, ContainerRunnerImpl.UgiPool ugiPool) {
     this.queryIdentifier = queryIdentifier;
     this.appIdString = appIdString;
     this.dagIdString = dagIdString;
@@ -96,6 +96,7 @@ public class QueryInfo {
     this.appTokenIdentifier = tokenIdentifier;
     this.appToken = appToken;
     this.isExternalQuery = isExternalQuery;
+    this.ugiPool = ugiPool;
     final InetSocketAddress address =
         NetUtils.createSocketAddrForHost(amNodeId.getHostname(), amNodeId.getPort());
     SecurityUtil.setTokenService(appToken, address);
@@ -332,21 +333,11 @@ public class QueryInfo {
     return appId;
   }
 
-
-  private final BlockingQueue<UserGroupInformation> ugiPool = new LinkedBlockingQueue<>();
-
-  public UserGroupInformation getUmbilicalUgi() {
-
-    UserGroupInformation ugi;
-    ugi = ugiPool.poll();
-    if (ugi == null) {
-      ugi = UserGroupInformation.createRemoteUser(appTokenIdentifier);
-      ugi.addToken(appToken);
-    }
-    return ugi;
+  UserGroupInformation getUmbilicalUgi() throws ExecutionException { // borrow; hand back via returnUmbilicalUgi
+    return ugiPool.getUmbilicalUgi(appTokenIdentifier, appToken); // shared pool, keyed by appTokenIdentifier
   }
 
-  public void returnUmbilicalUgi(UserGroupInformation ugi) {
-    ugiPool.offer(ugi);
+  void returnUmbilicalUgi(UserGroupInformation ugi) { // give back a UGI obtained from getUmbilicalUgi
+    ugiPool.returnUmbilicalUgi(appTokenIdentifier, ugi); // dropped if AM entry expired or queue is full
   }
 }
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index bf4eea0..40e921f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -160,7 +160,8 @@ public class QueryTracker extends AbstractService {
     String dagName, String hiveQueryIdString, int dagIdentifier, String vertexName, int fragmentNumber,
     int attemptNumber,
     String user, SignableVertexSpec vertex, Token<JobTokenIdentifier> appToken,
-    String fragmentIdString, LlapTokenInfo tokenInfo, final LlapNodeId amNodeId) throws IOException {
+      String fragmentIdString, LlapTokenInfo tokenInfo, final LlapNodeId amNodeId,
+      ContainerRunnerImpl.UgiPool ugiPool) throws IOException {
 
     ReadWriteLock dagLock = getDagLock(queryIdentifier);
     // Note: This is a readLock to prevent a race with queryComplete. Operations
@@ -191,7 +192,7 @@ public class QueryTracker extends AbstractService {
                 dagIdentifier, user,
                 getSourceCompletionMap(queryIdentifier), localDirsBase, localFs,
                 tokenInfo.userName, tokenInfo.appId, amNodeId, vertex.getTokenIdentifier(), appToken,
-                vertex.getIsExternalSubmission());
+                vertex.getIsExternalSubmission(), ugiPool);
         QueryInfo old = queryInfoMap.putIfAbsent(queryIdentifier, queryInfo);
         if (old != null) {
           queryInfo = old;
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
index af3f292..1e95909 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
@@ -103,7 +103,7 @@ public class TaskExecutorTestHelpers {
         new QueryInfo(queryIdentifier, "fake_app_id_string", "fake_dag_id_string", "fake_dag_name",
             "fakeHiveQueryId", 1, "fakeUser",
             new ConcurrentHashMap<String, LlapDaemonProtocolProtos.SourceStateProto>(),
-            new String[0], null, "fakeUser", null, nodeId, null, null, false);
+            new String[0], null, "fakeUser", null, nodeId, null, null, false, null);
     return queryInfo;
   }