You are viewing a plain text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@hive.apache.org by rb...@apache.org on 2017/02/23 22:30:28 UTC
hive git commit: HIVE-16020: LLAP : Reduce IPC connection misses
(Rajesh Balamohan, Siddharth Seth, reviewed by Sergey Shelukhin)
Repository: hive
Updated Branches:
refs/heads/master 657236ee0 -> b8d7192f5
HIVE-16020: LLAP : Reduce IPC connection misses (Rajesh Balamohan, Siddharth Seth, reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b8d7192f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b8d7192f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b8d7192f
Branch: refs/heads/master
Commit: b8d7192f5f28dbc832d4de3e4afc763523a4bf12
Parents: 657236e
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Fri Feb 24 04:00:20 2017 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Fri Feb 24 04:00:20 2017 +0530
----------------------------------------------------------------------
.../hive/llap/daemon/impl/AMReporter.java | 5 ++--
.../llap/daemon/impl/ContainerRunnerImpl.java | 16 ++++++----
.../hive/llap/daemon/impl/LlapDaemon.java | 8 +++--
.../hadoop/hive/llap/daemon/impl/QueryInfo.java | 29 ++++++++++++++++++
.../hive/llap/daemon/impl/QueryTracker.java | 4 ++-
.../llap/daemon/impl/TaskRunnerCallable.java | 31 +++++++++++---------
.../daemon/impl/TaskExecutorTestHelpers.java | 4 ++-
7 files changed, 72 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/b8d7192f/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
index 93237e6..a30f8b9 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
@@ -119,7 +119,8 @@ public class AMReporter extends AbstractService {
private final DaemonId daemonId;
public AMReporter(int numExecutors, int maxThreads, AtomicReference<InetSocketAddress>
- localAddress, QueryFailedHandler queryFailedHandler, Configuration conf, DaemonId daemonId) {
+ localAddress, QueryFailedHandler queryFailedHandler, Configuration conf, DaemonId daemonId,
+ SocketFactory socketFactory) {
super(AMReporter.class.getName());
this.localAddress = localAddress;
this.queryFailedHandler = queryFailedHandler;
@@ -151,7 +152,7 @@ public class AMReporter extends AbstractService {
.retryUpToMaximumTimeWithFixedSleep(retryTimeout, retrySleep,
TimeUnit.MILLISECONDS);
- this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
+ this.socketFactory = socketFactory;
LOG.info("Setting up AMReporter with " +
"heartbeatInterval(ms)=" + heartbeatInterval +
http://git-wip-us.apache.org/repos/asf/hive/blob/b8d7192f/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 6908138..cc4eff0 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -83,6 +83,8 @@ import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
+import javax.net.SocketFactory;
+
public class ContainerRunnerImpl extends CompositeService implements ContainerRunner, FragmentCompletionHandler, QueryFailedHandler {
// TODO Setup a set of threads to process incoming requests.
@@ -107,12 +109,14 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
private final String clusterId;
private final DaemonId daemonId;
private final UgiFactory fsUgiFactory;
+ private final SocketFactory socketFactory;
public ContainerRunnerImpl(Configuration conf, int numExecutors, int waitQueueSize,
boolean enablePreemption, String[] localDirsBase, AtomicReference<Integer> localShufflePort,
AtomicReference<InetSocketAddress> localAddress,
long totalMemoryAvailableBytes, LlapDaemonExecutorMetrics metrics,
- AMReporter amReporter, ClassLoader classLoader, DaemonId daemonId, UgiFactory fsUgiFactory) {
+ AMReporter amReporter, ClassLoader classLoader, DaemonId daemonId, UgiFactory fsUgiFactory,
+ SocketFactory socketFactory) {
super("ContainerRunnerImpl");
Preconditions.checkState(numExecutors > 0,
"Invalid number of executors: " + numExecutors + ". Must be > 0");
@@ -122,6 +126,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
this.signer = UserGroupInformation.isSecurityEnabled()
? new LlapSignerImpl(conf, daemonId.getClusterString()) : null;
this.fsUgiFactory = fsUgiFactory;
+ this.socketFactory = socketFactory;
this.clusterId = daemonId.getClusterString();
this.daemonId = daemonId;
@@ -239,7 +244,8 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
queryIdentifier, qIdProto.getApplicationIdString(), dagId,
vertex.getDagName(), vertex.getHiveQueryId(), dagIdentifier,
vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber(),
- vertex.getUser(), vertex, jobToken, fragmentIdString, tokenInfo);
+ vertex.getUser(), vertex, jobToken, fragmentIdString, tokenInfo, request.getAmHost(),
+ request.getAmPort());
String[] localDirs = fragmentInfo.getLocalDirs();
Preconditions.checkNotNull(localDirs);
@@ -250,12 +256,12 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
// Used for re-localization, to add the user specified configuration (conf_pb_binary_stream)
Configuration callableConf = new Configuration(getConfig());
- UserGroupInformation taskUgi = fsUgiFactory == null ? null : fsUgiFactory.createUgi();
+ UserGroupInformation fsTaskUgi = fsUgiFactory == null ? null : fsUgiFactory.createUgi();
TaskRunnerCallable callable = new TaskRunnerCallable(request, fragmentInfo, callableConf,
new ExecutionContextImpl(localAddress.get().getHostName()), env,
credentials, memoryPerExecutor, amReporter, confParams, metrics, killedTaskHandler,
- this, tezHadoopShim, attemptId, vertex, initialEvent, taskUgi,
- completionListener);
+ this, tezHadoopShim, attemptId, vertex, initialEvent, fsTaskUgi,
+ completionListener, socketFactory);
submissionState = executorService.schedule(callable);
if (LOG.isInfoEnabled()) {
http://git-wip-us.apache.org/repos/asf/hive/blob/b8d7192f/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index fc9f530..eb05f4c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.management.ObjectName;
+import javax.net.SocketFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.JvmPauseMonitor;
@@ -64,6 +65,7 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge.UdfWhitelistChecker;
import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.ExitUtil;
@@ -105,6 +107,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
private final long maxJvmMemory;
private final String[] localDirs;
private final DaemonId daemonId;
+ private final SocketFactory socketFactory;
// TODO Not the best way to share the address
private final AtomicReference<InetSocketAddress> srvAddress = new AtomicReference<>(),
@@ -255,8 +258,9 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
" sessionId: " + sessionId);
int maxAmReporterThreads = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_AM_REPORTER_MAX_THREADS);
+ this.socketFactory = NetUtils.getDefaultSocketFactory(daemonConf);
this.amReporter = new AMReporter(numExecutors, maxAmReporterThreads, srvAddress,
- new QueryFailedHandlerProxy(), daemonConf, daemonId);
+ new QueryFailedHandlerProxy(), daemonConf, daemonId, socketFactory);
SecretManager sm = null;
if (UserGroupInformation.isSecurityEnabled()) {
@@ -274,7 +278,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
}
this.containerRunner = new ContainerRunnerImpl(daemonConf, numExecutors, waitQueueSize,
enablePreemption, localDirs, this.shufflePort, srvAddress, executorMemoryPerInstance, metrics,
- amReporter, executorClassLoader, daemonId, fsUgiFactory);
+ amReporter, executorClassLoader, daemonId, fsUgiFactory, socketFactory);
addIfService(containerRunner);
// Not adding the registry as a service, since we need to control when it is initialized - conf used to pickup properties.
http://git-wip-us.apache.org/repos/asf/hive/blob/b8d7192f/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
index 1080d3e..eaa3e7e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
@@ -16,6 +16,7 @@ package org.apache.hadoop.hive.llap.daemon.impl;
import java.io.File;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -25,6 +26,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import com.google.common.base.Preconditions;
@@ -36,6 +38,11 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.tez.common.security.JobTokenIdentifier;
public class QueryInfo {
private final QueryIdentifier queryIdentifier;
@@ -57,6 +64,7 @@ public class QueryInfo {
private final FinishableStateTracker finishableStateTracker = new FinishableStateTracker();
private final String tokenUserName, appId;
+ private final AtomicReference<UserGroupInformation> umbilicalUgi;
public QueryInfo(QueryIdentifier queryIdentifier, String appIdString, String dagIdString,
String dagName, String hiveQueryIdString,
@@ -76,6 +84,7 @@ public class QueryInfo {
this.localFs = localFs;
this.tokenUserName = tokenUserName;
this.appId = tokenAppId;
+ this.umbilicalUgi = new AtomicReference<>();
}
public QueryIdentifier getQueryIdentifier() {
@@ -297,4 +306,24 @@ public class QueryInfo {
public String getTokenAppId() {
return appId;
}
+
+ public void setupUmbilicalUgi(String umbilicalUser, Token<JobTokenIdentifier> appToken, String amHost, int amPort) { // Builds the per-query umbilical UGI on the first call only; later calls are no-ops.
+ synchronized (umbilicalUgi) {
+ if (umbilicalUgi.get() == null) { // first caller wins: amHost/amPort/appToken from later calls are ignored
+ UserGroupInformation taskOwner =
+ UserGroupInformation.createRemoteUser(umbilicalUser);
+ final InetSocketAddress address =
+ NetUtils.createSocketAddrForHost(amHost, amPort);
+ SecurityUtil.setTokenService(appToken, address); // NOTE: sets the service on the caller's appToken instance itself
+ taskOwner.addToken(appToken);
+ umbilicalUgi.set(taskOwner);
+ }
+ }
+ }
+
+ public UserGroupInformation getUmbilicalUgi() { // Returns null until setupUmbilicalUgi() has run for this query.
+ synchronized (umbilicalUgi) {
+ return umbilicalUgi.get();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b8d7192f/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index 9eaddd2..5cf3a38 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -139,7 +139,7 @@ public class QueryTracker extends AbstractService {
QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appIdString, String dagIdString,
String dagName, String hiveQueryIdString, int dagIdentifier, String vertexName, int fragmentNumber, int attemptNumber,
String user, SignableVertexSpec vertex, Token<JobTokenIdentifier> appToken,
- String fragmentIdString, LlapTokenInfo tokenInfo) throws IOException {
+ String fragmentIdString, LlapTokenInfo tokenInfo, String amHost, int amPort) throws IOException {
ReadWriteLock dagLock = getDagLock(queryIdentifier);
// Note: This is a readLock to prevent a race with queryComplete. Operations
@@ -174,6 +174,8 @@ public class QueryTracker extends AbstractService {
if (old != null) {
queryInfo = old;
} else {
+ // Ensure the UGI is setup once.
+ queryInfo.setupUmbilicalUgi(vertex.getTokenIdentifier(), appToken, amHost, amPort);
isExistingQueryInfo = false;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b8d7192f/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 4b677aa..8fce546 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -41,7 +41,6 @@ import org.apache.hadoop.hive.ql.io.IOContextMap;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.log4j.MDC;
@@ -65,6 +64,7 @@ import org.apache.tez.runtime.task.TezTaskRunner2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.net.SocketFactory;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
@@ -116,7 +116,8 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
private final SignableVertexSpec vertex;
private final TezEvent initialEvent;
private final SchedulerFragmentCompletingListener completionListener;
- private UserGroupInformation taskUgi;
+ private UserGroupInformation fsTaskUgi;
+ private final SocketFactory socketFactory;
@VisibleForTesting
public TaskRunnerCallable(SubmitWorkRequestProto request, QueryFragmentInfo fragmentInfo,
@@ -125,7 +126,8 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
LlapDaemonExecutorMetrics metrics, KilledTaskHandler killedTaskHandler,
FragmentCompletionHandler fragmentCompleteHandler, HadoopShim tezHadoopShim,
TezTaskAttemptID attemptId, SignableVertexSpec vertex, TezEvent initialEvent,
- UserGroupInformation taskUgi, SchedulerFragmentCompletingListener completionListener) {
+ UserGroupInformation fsTaskUgi, SchedulerFragmentCompletingListener completionListener,
+ SocketFactory socketFactory) {
this.request = request;
this.fragmentInfo = fragmentInfo;
this.conf = conf;
@@ -153,8 +155,9 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
this.fragmentCompletionHanler = fragmentCompleteHandler;
this.tezHadoopShim = tezHadoopShim;
this.initialEvent = initialEvent;
- this.taskUgi = taskUgi;
+ this.fsTaskUgi = fsTaskUgi;
this.completionListener = completionListener;
+ this.socketFactory = socketFactory;
}
public long getStartTime() {
@@ -196,27 +199,27 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
// TODO Consolidate this code with TezChild.
runtimeWatch.start();
- if (taskUgi == null) {
- taskUgi = UserGroupInformation.createRemoteUser(vertex.getUser());
+ if (fsTaskUgi == null) {
+ fsTaskUgi = UserGroupInformation.createRemoteUser(vertex.getUser());
}
- taskUgi.addCredentials(credentials);
+ fsTaskUgi.addCredentials(credentials);
Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<>();
serviceConsumerMetadata.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
TezCommonUtils.convertJobTokenToBytes(jobToken));
Multimap<String, String> startedInputsMap = createStartedInputMap(vertex);
- UserGroupInformation taskOwner =
- UserGroupInformation.createRemoteUser(vertex.getTokenIdentifier());
+ final UserGroupInformation taskOwner = fragmentInfo.getQueryInfo().getUmbilicalUgi();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("taskOwner hashCode:" + taskOwner.hashCode());
+ }
final InetSocketAddress address =
NetUtils.createSocketAddrForHost(request.getAmHost(), request.getAmPort());
- SecurityUtil.setTokenService(jobToken, address);
- taskOwner.addToken(jobToken);
umbilical = taskOwner.doAs(new PrivilegedExceptionAction<LlapTaskUmbilicalProtocol>() {
@Override
public LlapTaskUmbilicalProtocol run() throws Exception {
return RPC.getProxy(LlapTaskUmbilicalProtocol.class,
- LlapTaskUmbilicalProtocol.versionID, address, conf);
+ LlapTaskUmbilicalProtocol.versionID, address, taskOwner, conf, socketFactory);
}
});
@@ -238,7 +241,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
try {
synchronized (this) {
if (shouldRunTask) {
- taskRunner = new TezTaskRunner2(conf, taskUgi, fragmentInfo.getLocalDirs(),
+ taskRunner = new TezTaskRunner2(conf, fsTaskUgi, fragmentInfo.getLocalDirs(),
taskSpec,
vertex.getQueryIdentifier().getAppAttemptNumber(),
serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor,
@@ -260,7 +263,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
isCompleted.set(true);
return result;
} finally {
- FileSystem.closeAllForUGI(taskUgi);
+ FileSystem.closeAllForUGI(fsTaskUgi);
LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" +
runtimeWatch.stop().elapsedMillis());
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/hive/blob/b8d7192f/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
index 5dc1be5..ae3328a 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
@@ -44,6 +44,8 @@ import org.apache.tez.runtime.task.TaskRunner2Result;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.net.SocketFactory;
+
public class TaskExecutorTestHelpers {
private static final Logger LOG = LoggerFactory.getLogger(TestTaskExecutorService.class);
@@ -184,7 +186,7 @@ public class TaskExecutorTestHelpers {
mock(KilledTaskHandler.class), mock(
FragmentCompletionHandler.class), new DefaultHadoopShim(), null,
requestProto.getWorkSpec().getVertex(), initialEvent, null, mock(
- SchedulerFragmentCompletingListener.class));
+ SchedulerFragmentCompletingListener.class), mock(SocketFactory.class));
this.workTime = workTime;
this.canFinish = canFinish;
}