You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by we...@apache.org on 2017/05/17 02:52:40 UTC

[02/50] [abbrv] hive git commit: HIVE-16634. LLAP Use a pool of connections to a single AM from a daemon. (Siddharth Seth, reviewed by Sergey Shelukhin)

HIVE-16634. LLAP Use a pool of connections to a single AM from a daemon.
(Siddharth Seth, reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/113a0991
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/113a0991
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/113a0991

Branch: refs/heads/hive-14535
Commit: 113a099125fa0dae25bbb8417582edeeeed5aac3
Parents: 8a7b5b5
Author: Siddharth Seth <ss...@HW10890.local>
Authored: Fri May 12 14:48:46 2017 -0700
Committer: Siddharth Seth <ss...@HW10890.local>
Committed: Fri May 12 14:48:46 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hive/llap/daemon/impl/QueryInfo.java | 44 +++++++++++---------
 .../hive/llap/daemon/impl/QueryTracker.java     |  4 +-
 .../llap/daemon/impl/TaskRunnerCallable.java    |  1 +
 .../daemon/impl/TaskExecutorTestHelpers.java    |  5 ++-
 4 files changed, 31 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/113a0991/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
index 088f07c..ce2f457 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
@@ -24,9 +24,10 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.locks.ReentrantLock;
 
 import com.google.common.base.Preconditions;
@@ -57,6 +58,8 @@ public class QueryInfo {
   private final FileSystem localFs;
   private String[] localDirs;
   private final LlapNodeId amNodeId;
+  private final String appTokenIdentifier;
+  private final Token<JobTokenIdentifier> appToken;
   // Map of states for different vertices.
 
   private final Set<QueryFragmentInfo> knownFragments =
@@ -66,14 +69,15 @@ public class QueryInfo {
 
   private final FinishableStateTracker finishableStateTracker = new FinishableStateTracker();
   private final String tokenUserName, appId;
-  private final AtomicReference<UserGroupInformation> umbilicalUgi;
 
   public QueryInfo(QueryIdentifier queryIdentifier, String appIdString, String dagIdString,
     String dagName, String hiveQueryIdString,
     int dagIdentifier, String user,
     ConcurrentMap<String, SourceStateProto> sourceStateMap,
     String[] localDirsBase, FileSystem localFs, String tokenUserName,
-    String tokenAppId, final LlapNodeId amNodeId) {
+    String tokenAppId, final LlapNodeId amNodeId,
+    String tokenIdentifier,
+    Token<JobTokenIdentifier> appToken) {
     this.queryIdentifier = queryIdentifier;
     this.appIdString = appIdString;
     this.dagIdString = dagIdString;
@@ -86,8 +90,12 @@ public class QueryInfo {
     this.localFs = localFs;
     this.tokenUserName = tokenUserName;
     this.appId = tokenAppId;
-    this.umbilicalUgi = new AtomicReference<>();
     this.amNodeId = amNodeId;
+    this.appTokenIdentifier = tokenIdentifier;
+    this.appToken = appToken;
+    final InetSocketAddress address =
+        NetUtils.createSocketAddrForHost(amNodeId.getHostname(), amNodeId.getPort());
+    SecurityUtil.setTokenService(appToken, address);
   }
 
   public QueryIdentifier getQueryIdentifier() {
@@ -314,23 +322,21 @@ public class QueryInfo {
     return appId;
   }
 
-  public void setupUmbilicalUgi(String umbilicalUser, Token<JobTokenIdentifier> appToken, String amHost, int amPort) {
-    synchronized (umbilicalUgi) {
-      if (umbilicalUgi.get() == null) {
-        UserGroupInformation taskOwner =
-            UserGroupInformation.createRemoteUser(umbilicalUser);
-        final InetSocketAddress address =
-            NetUtils.createSocketAddrForHost(amHost, amPort);
-        SecurityUtil.setTokenService(appToken, address);
-        taskOwner.addToken(appToken);
-        umbilicalUgi.set(taskOwner);
-      }
-    }
-  }
+
+  private final BlockingQueue<UserGroupInformation> ugiPool = new LinkedBlockingQueue<>();
 
   public UserGroupInformation getUmbilicalUgi() {
-    synchronized (umbilicalUgi) {
-      return umbilicalUgi.get();
+
+    UserGroupInformation ugi;
+    ugi = ugiPool.poll();
+    if (ugi == null) {
+      ugi = UserGroupInformation.createRemoteUser(appTokenIdentifier);
+      ugi.addToken(appToken);
     }
+    return ugi;
+  }
+
+  public void returnUmbilicalUgi(UserGroupInformation ugi) {
+    ugiPool.offer(ugi);
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/113a0991/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index 7e646c5..daeb555 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -169,13 +169,11 @@ public class QueryTracker extends AbstractService {
             new QueryInfo(queryIdentifier, appIdString, dagIdString, dagName, hiveQueryIdString,
                 dagIdentifier, user,
                 getSourceCompletionMap(queryIdentifier), localDirsBase, localFs,
-                tokenInfo.userName, tokenInfo.appId, amNodeId);
+                tokenInfo.userName, tokenInfo.appId, amNodeId, vertex.getTokenIdentifier(), appToken);
         QueryInfo old = queryInfoMap.putIfAbsent(queryIdentifier, queryInfo);
         if (old != null) {
           queryInfo = old;
         } else {
-          // Ensure the UGI is setup once.
-          queryInfo.setupUmbilicalUgi(vertex.getTokenIdentifier(), appToken, amNodeId.getHostname(), amNodeId.getPort());
           isExistingQueryInfo = false;
         }
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/113a0991/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 1669815..7d7fd23 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -274,6 +274,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
           return result;
         } finally {
           FileSystem.closeAllForUGI(fsTaskUgi);
+          fragmentInfo.getQueryInfo().returnUmbilicalUgi(taskOwner);
           LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" +
                   runtimeWatch.stop().elapsedMillis());
           if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/hive/blob/113a0991/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
index 6287ae8..27c426c 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
@@ -23,6 +23,7 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.LlapNodeId;
 import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
 import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
 import org.apache.hadoop.hive.llap.daemon.SchedulerFragmentCompletingListener;
@@ -34,6 +35,7 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexOrB
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.hadoop.shim.DefaultHadoopShim;
@@ -89,11 +91,12 @@ public class TaskExecutorTestHelpers {
 
   public static QueryInfo createQueryInfo() {
     QueryIdentifier queryIdentifier = new QueryIdentifier("fake_app_id_string", 1);
+    LlapNodeId nodeId = LlapNodeId.getInstance("localhost", 0);
     QueryInfo queryInfo =
         new QueryInfo(queryIdentifier, "fake_app_id_string", "fake_dag_id_string", "fake_dag_name",
             "fakeHiveQueryId", 1, "fakeUser",
             new ConcurrentHashMap<String, LlapDaemonProtocolProtos.SourceStateProto>(),
-            new String[0], null, "fakeUser", null, null);
+            new String[0], null, "fakeUser", null, nodeId, null, null);
     return queryInfo;
   }