You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2020/05/17 03:11:12 UTC
[hive] branch master updated: HIVE-23446 : LLAP: Reduce IPC connection misses to AM for short queries (Rajesh Balamohan via Ashutosh Chauhan)
This is an automated email from the ASF dual-hosted git repository.
hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new eeffb0e HIVE-23446 : LLAP: Reduce IPC connection misses to AM for short queries (Rajesh Balamohan via Ashutosh Chauhan)
eeffb0e is described below
commit eeffb0e4e7feab7cea0dba9e7a2b63808b2023f7
Author: Rajesh Balamohan <rb...@apache.org>
AuthorDate: Sat May 16 20:05:27 2020 -0700
HIVE-23446 : LLAP: Reduce IPC connection misses to AM for short queries (Rajesh Balamohan via Ashutosh Chauhan)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
---
.../hive/llap/daemon/impl/ContainerRunnerImpl.java | 82 +++++++++++++++++++++-
.../hadoop/hive/llap/daemon/impl/QueryInfo.java | 25 +++----
.../hadoop/hive/llap/daemon/impl/QueryTracker.java | 5 +-
.../llap/daemon/impl/TaskExecutorTestHelpers.java | 2 +-
4 files changed, 92 insertions(+), 22 deletions(-)
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 9c73747..a4de3d9 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -15,7 +15,6 @@
package org.apache.hadoop.hive.llap.daemon.impl;
import java.io.IOException;
-import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Arrays;
@@ -23,10 +22,19 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.UgiFactory;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -103,6 +111,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
private static final Logger LOG = LoggerFactory.getLogger(ContainerRunnerImpl.class);
public static final String THREAD_NAME_FORMAT_PREFIX = "ContainerExecutor ";
+ private UgiPool ugiPool;
private final AMReporter amReporter;
private final QueryTracker queryTracker;
private final Scheduler<TaskRunnerCallable> executorService;
@@ -131,6 +140,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
super("ContainerRunnerImpl");
Preconditions.checkState(numExecutors > 0,
"Invalid number of executors: " + numExecutors + ". Must be > 0");
+ this.ugiPool = new UgiPool(numExecutors);
this.localAddress = localAddress;
this.localShufflePort = localShufflePort;
this.amReporter = amReporter;
@@ -270,7 +280,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
queryIdentifier, qIdProto.getApplicationIdString(), dagId,
vertex.getDagName(), vertex.getHiveQueryId(), dagIdentifier,
vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber(),
- vertex.getUser(), vertex, jobToken, fragmentIdString, tokenInfo, amNodeId);
+ vertex.getUser(), vertex, jobToken, fragmentIdString, tokenInfo, amNodeId, ugiPool);
// May need to setup localDir for re-localization, which is usually setup as Environment.PWD.
// Used for re-localization, to add the user specified configuration (conf_pb_binary_stream)
@@ -593,4 +603,72 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
return executorService.getNumActiveForReporting();
}
+  /**
+   * Daemon-wide pool of umbilical {@link UserGroupInformation}s, keyed by the AM's app token
+   * identifier. Keeping the pool here (instead of one pool per {@link QueryInfo}) lets short
+   * queries against the same AM reuse UGIs created by earlier queries.
+   *
+   * <p>Each per-AM queue holds at most {@code numExecutors} idle UGIs. Callers borrow a UGI
+   * with {@link #getUmbilicalUgi} and should hand it back with {@link #returnUmbilicalUgi}.
+   */
+  static class UgiPool {
+    // Per-AM queues of idle UGIs, keyed by appTokenIdentifier. An entry expires 3 hours after
+    // its last access; UGIs returned after expiry are simply dropped.
+    private final Cache<String, BlockingQueue<UserGroupInformation>> ugiPool =
+        CacheBuilder.newBuilder()
+            .removalListener(new RemovalListener<String, BlockingQueue<UserGroupInformation>>() {
+              @Override
+              public void onRemoval(
+                  RemovalNotification<String, BlockingQueue<UserGroupInformation>> notification) {
+                LOG.debug("Removing ugi pool for {} ({}). Pool size: {}", notification.getKey(),
+                    notification.getCause(), ugiPool.size());
+              }
+            })
+            .expireAfterAccess(60 * 3, TimeUnit.MINUTES)
+            .build();
+
+    // Capacity of each per-AM queue, i.e. the maximum number of idle UGIs retained per AM.
+    private final int numExecutors;
+
+    /**
+     * @param numExecutors maximum number of idle UGIs retained per AM (must be > 0)
+     */
+    public UgiPool(int numExecutors) {
+      this.numExecutors = numExecutors;
+    }
+
+    /**
+     * Borrows a UGI for the given AM, creating the per-AM pool on first use. It is possible to
+     * have more than one UGI per AM: if every pooled UGI is currently borrowed, a new one is
+     * created. That new UGI enters the pool only when it is handed back through
+     * {@link #returnUmbilicalUgi}.
+     *
+     * @param appTokenIdentifier AM app token identifier; pool key and UGI remote user name
+     * @param appToken job token added to any newly created UGI
+     * @return a UGI that is not handed to any other caller of this pool until it is returned
+     * @throws ExecutionException if creating the per-AM pool fails
+     */
+    public UserGroupInformation getUmbilicalUgi(String appTokenIdentifier,
+        Token<JobTokenIdentifier> appToken) throws ExecutionException {
+      BlockingQueue<UserGroupInformation> queue = ugiPool.get(appTokenIdentifier,
+          new Callable<BlockingQueue<UserGroupInformation>>() {
+            @Override
+            public BlockingQueue<UserGroupInformation> call() {
+              BlockingQueue<UserGroupInformation> newQueue =
+                  new LinkedBlockingQueue<>(numExecutors);
+              newQueue.add(createUgi(appTokenIdentifier, appToken));
+              LOG.debug("Added new ugi pool for {}. Pool size: {}", appTokenIdentifier,
+                  ugiPool.size());
+              return newQueue;
+            }
+          });
+
+      // Get UGI from the queue. Possible to maintain more than one UGI per AM. Ref: HIVE-16634
+      UserGroupInformation ugi = queue.poll();
+      if (ugi == null) {
+        // Every pooled UGI for this AM is in use. Create a fresh one but do NOT put it in the
+        // queue yet: queueing it now would let a concurrent caller poll this same in-use UGI,
+        // and returnUmbilicalUgi would later enqueue it a second time.
+        ugi = createUgi(appTokenIdentifier, appToken);
+        LOG.info("Added new ugi for {}. Idle ugis for this AM: {}", appTokenIdentifier,
+            queue.size());
+      }
+      return ugi;
+    }
+
+    /**
+     * Returns a borrowed UGI to the pool so it can be reused. The UGI is dropped if the AM's
+     * pool entry has expired, or if the queue already holds {@code numExecutors} idle UGIs.
+     *
+     * @param appTokenIdentifier AM identifier the UGI was borrowed for
+     * @param ugi the UGI to return
+     */
+    public void returnUmbilicalUgi(String appTokenIdentifier, UserGroupInformation ugi) {
+      BlockingQueue<UserGroupInformation> ugiQueue = ugiPool.getIfPresent(appTokenIdentifier);
+      // Entry could have been removed due to expiry. Check before returning back to queue
+      if (ugiQueue != null) {
+        ugiQueue.offer(ugi);
+      }
+    }
+
+    /** Creates a remote-user UGI named {@code appTokenIdentifier} that carries {@code appToken}. */
+    private static UserGroupInformation createUgi(String appTokenIdentifier,
+        Token<JobTokenIdentifier> appToken) {
+      UserGroupInformation ugi = UserGroupInformation.createRemoteUser(appTokenIdentifier);
+      ugi.addToken(appToken);
+      return ugi;
+    }
+  }
+
}
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
index 00fed15..c464c2f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
@@ -24,10 +24,9 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.ReentrantLock;
import com.google.common.base.Preconditions;
@@ -70,6 +69,7 @@ public class QueryInfo {
private final FinishableStateTracker finishableStateTracker = new FinishableStateTracker();
private final String tokenUserName, appId;
+ private final ContainerRunnerImpl.UgiPool ugiPool;
public QueryInfo(QueryIdentifier queryIdentifier, String appIdString, String dagIdString,
String dagName, String hiveQueryIdString,
@@ -79,7 +79,7 @@ public class QueryInfo {
String tokenAppId, final LlapNodeId amNodeId,
String tokenIdentifier,
Token<JobTokenIdentifier> appToken,
- boolean isExternalQuery) {
+ boolean isExternalQuery, ContainerRunnerImpl.UgiPool ugiPool) {
this.queryIdentifier = queryIdentifier;
this.appIdString = appIdString;
this.dagIdString = dagIdString;
@@ -96,6 +96,7 @@ public class QueryInfo {
this.appTokenIdentifier = tokenIdentifier;
this.appToken = appToken;
this.isExternalQuery = isExternalQuery;
+ this.ugiPool = ugiPool;
final InetSocketAddress address =
NetUtils.createSocketAddrForHost(amNodeId.getHostname(), amNodeId.getPort());
SecurityUtil.setTokenService(appToken, address);
@@ -332,21 +333,11 @@ public class QueryInfo {
return appId;
}
-
- private final BlockingQueue<UserGroupInformation> ugiPool = new LinkedBlockingQueue<>();
-
- public UserGroupInformation getUmbilicalUgi() {
-
- UserGroupInformation ugi;
- ugi = ugiPool.poll();
- if (ugi == null) {
- ugi = UserGroupInformation.createRemoteUser(appTokenIdentifier);
- ugi.addToken(appToken);
- }
- return ugi;
+ UserGroupInformation getUmbilicalUgi() throws ExecutionException {
+ return ugiPool.getUmbilicalUgi(appTokenIdentifier, appToken);
}
- public void returnUmbilicalUgi(UserGroupInformation ugi) {
- ugiPool.offer(ugi);
+ void returnUmbilicalUgi(UserGroupInformation ugi) {
+ ugiPool.returnUmbilicalUgi(appTokenIdentifier, ugi);
}
}
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index bf4eea0..40e921f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -160,7 +160,8 @@ public class QueryTracker extends AbstractService {
String dagName, String hiveQueryIdString, int dagIdentifier, String vertexName, int fragmentNumber,
int attemptNumber,
String user, SignableVertexSpec vertex, Token<JobTokenIdentifier> appToken,
- String fragmentIdString, LlapTokenInfo tokenInfo, final LlapNodeId amNodeId) throws IOException {
+ String fragmentIdString, LlapTokenInfo tokenInfo, final LlapNodeId amNodeId,
+ ContainerRunnerImpl.UgiPool ugiPool) throws IOException {
ReadWriteLock dagLock = getDagLock(queryIdentifier);
// Note: This is a readLock to prevent a race with queryComplete. Operations
@@ -191,7 +192,7 @@ public class QueryTracker extends AbstractService {
dagIdentifier, user,
getSourceCompletionMap(queryIdentifier), localDirsBase, localFs,
tokenInfo.userName, tokenInfo.appId, amNodeId, vertex.getTokenIdentifier(), appToken,
- vertex.getIsExternalSubmission());
+ vertex.getIsExternalSubmission(), ugiPool);
QueryInfo old = queryInfoMap.putIfAbsent(queryIdentifier, queryInfo);
if (old != null) {
queryInfo = old;
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
index af3f292..1e95909 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
@@ -103,7 +103,7 @@ public class TaskExecutorTestHelpers {
new QueryInfo(queryIdentifier, "fake_app_id_string", "fake_dag_id_string", "fake_dag_name",
"fakeHiveQueryId", 1, "fakeUser",
new ConcurrentHashMap<String, LlapDaemonProtocolProtos.SourceStateProto>(),
- new String[0], null, "fakeUser", null, nodeId, null, null, false);
+ new String[0], null, "fakeUser", null, nodeId, null, null, false, null);
return queryInfo;
}