You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by we...@apache.org on 2017/05/17 02:52:40 UTC
[02/50] [abbrv] hive git commit: HIVE-16634. LLAP Use a pool of
connections to a single AM from a daemon. (Siddharth Seth,
reviewed by Sergey Shelukhin)
HIVE-16634. LLAP Use a pool of connections to a single AM from a daemon.
(Siddharth Seth, reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/113a0991
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/113a0991
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/113a0991
Branch: refs/heads/hive-14535
Commit: 113a099125fa0dae25bbb8417582edeeeed5aac3
Parents: 8a7b5b5
Author: Siddharth Seth <ss...@HW10890.local>
Authored: Fri May 12 14:48:46 2017 -0700
Committer: Siddharth Seth <ss...@HW10890.local>
Committed: Fri May 12 14:48:46 2017 -0700
----------------------------------------------------------------------
.../hadoop/hive/llap/daemon/impl/QueryInfo.java | 44 +++++++++++---------
.../hive/llap/daemon/impl/QueryTracker.java | 4 +-
.../llap/daemon/impl/TaskRunnerCallable.java | 1 +
.../daemon/impl/TaskExecutorTestHelpers.java | 5 ++-
4 files changed, 31 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/113a0991/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
index 088f07c..ce2f457 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
@@ -24,9 +24,10 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;
import com.google.common.base.Preconditions;
@@ -57,6 +58,8 @@ public class QueryInfo {
private final FileSystem localFs;
private String[] localDirs;
private final LlapNodeId amNodeId;
+ private final String appTokenIdentifier;
+ private final Token<JobTokenIdentifier> appToken;
// Map of states for different vertices.
private final Set<QueryFragmentInfo> knownFragments =
@@ -66,14 +69,15 @@ public class QueryInfo {
private final FinishableStateTracker finishableStateTracker = new FinishableStateTracker();
private final String tokenUserName, appId;
- private final AtomicReference<UserGroupInformation> umbilicalUgi;
public QueryInfo(QueryIdentifier queryIdentifier, String appIdString, String dagIdString,
String dagName, String hiveQueryIdString,
int dagIdentifier, String user,
ConcurrentMap<String, SourceStateProto> sourceStateMap,
String[] localDirsBase, FileSystem localFs, String tokenUserName,
- String tokenAppId, final LlapNodeId amNodeId) {
+ String tokenAppId, final LlapNodeId amNodeId,
+ String tokenIdentifier,
+ Token<JobTokenIdentifier> appToken) {
this.queryIdentifier = queryIdentifier;
this.appIdString = appIdString;
this.dagIdString = dagIdString;
@@ -86,8 +90,12 @@ public class QueryInfo {
this.localFs = localFs;
this.tokenUserName = tokenUserName;
this.appId = tokenAppId;
- this.umbilicalUgi = new AtomicReference<>();
this.amNodeId = amNodeId;
+ this.appTokenIdentifier = tokenIdentifier;
+ this.appToken = appToken;
+ final InetSocketAddress address =
+ NetUtils.createSocketAddrForHost(amNodeId.getHostname(), amNodeId.getPort());
+ SecurityUtil.setTokenService(appToken, address);
}
public QueryIdentifier getQueryIdentifier() {
@@ -314,23 +322,21 @@ public class QueryInfo {
return appId;
}
- public void setupUmbilicalUgi(String umbilicalUser, Token<JobTokenIdentifier> appToken, String amHost, int amPort) {
- synchronized (umbilicalUgi) {
- if (umbilicalUgi.get() == null) {
- UserGroupInformation taskOwner =
- UserGroupInformation.createRemoteUser(umbilicalUser);
- final InetSocketAddress address =
- NetUtils.createSocketAddrForHost(amHost, amPort);
- SecurityUtil.setTokenService(appToken, address);
- taskOwner.addToken(appToken);
- umbilicalUgi.set(taskOwner);
- }
- }
- }
+
+ private final BlockingQueue<UserGroupInformation> ugiPool = new LinkedBlockingQueue<>();
public UserGroupInformation getUmbilicalUgi() {
- synchronized (umbilicalUgi) {
- return umbilicalUgi.get();
+
+ UserGroupInformation ugi;
+ ugi = ugiPool.poll();
+ if (ugi == null) {
+ ugi = UserGroupInformation.createRemoteUser(appTokenIdentifier);
+ ugi.addToken(appToken);
}
+ return ugi;
+ }
+
+ public void returnUmbilicalUgi(UserGroupInformation ugi) {
+ ugiPool.offer(ugi);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/113a0991/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index 7e646c5..daeb555 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -169,13 +169,11 @@ public class QueryTracker extends AbstractService {
new QueryInfo(queryIdentifier, appIdString, dagIdString, dagName, hiveQueryIdString,
dagIdentifier, user,
getSourceCompletionMap(queryIdentifier), localDirsBase, localFs,
- tokenInfo.userName, tokenInfo.appId, amNodeId);
+ tokenInfo.userName, tokenInfo.appId, amNodeId, vertex.getTokenIdentifier(), appToken);
QueryInfo old = queryInfoMap.putIfAbsent(queryIdentifier, queryInfo);
if (old != null) {
queryInfo = old;
} else {
- // Ensure the UGI is setup once.
- queryInfo.setupUmbilicalUgi(vertex.getTokenIdentifier(), appToken, amNodeId.getHostname(), amNodeId.getPort());
isExistingQueryInfo = false;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/113a0991/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 1669815..7d7fd23 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -274,6 +274,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
return result;
} finally {
FileSystem.closeAllForUGI(fsTaskUgi);
+ fragmentInfo.getQueryInfo().returnUmbilicalUgi(taskOwner);
LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" +
runtimeWatch.stop().elapsedMillis());
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/hive/blob/113a0991/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
index 6287ae8..27c426c 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
@@ -23,6 +23,7 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.LlapNodeId;
import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
import org.apache.hadoop.hive.llap.daemon.SchedulerFragmentCompletingListener;
@@ -34,6 +35,7 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexOrB
import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.hadoop.shim.DefaultHadoopShim;
@@ -89,11 +91,12 @@ public class TaskExecutorTestHelpers {
public static QueryInfo createQueryInfo() {
QueryIdentifier queryIdentifier = new QueryIdentifier("fake_app_id_string", 1);
+ LlapNodeId nodeId = LlapNodeId.getInstance("localhost", 0);
QueryInfo queryInfo =
new QueryInfo(queryIdentifier, "fake_app_id_string", "fake_dag_id_string", "fake_dag_name",
"fakeHiveQueryId", 1, "fakeUser",
new ConcurrentHashMap<String, LlapDaemonProtocolProtos.SourceStateProto>(),
- new String[0], null, "fakeUser", null, null);
+ new String[0], null, "fakeUser", null, nodeId, null, null);
return queryInfo;
}