You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2020/05/16 16:13:31 UTC
[hive] branch master updated: HIVE-23449 : LLAP: Reduce mkdir and config creations in submitWork hotpath (Rajesh Balamohan via Ashutosh Chauhan)
This is an automated email from the ASF dual-hosted git repository.
hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 781b7fc HIVE-23449 : LLAP: Reduce mkdir and config creations in submitWork hotpath (Rajesh Balamohan via Ashutosh Chauhan)
781b7fc is described below
commit 781b7fc3e450f5a15e1afa2096189884b772b115
Author: Rajesh Balamohan <rb...@apache.org>
AuthorDate: Sat May 16 09:12:49 2020 -0700
HIVE-23449 : LLAP: Reduce mkdir and config creations in submitWork hotpath (Rajesh Balamohan via Ashutosh Chauhan)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
---
.../hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java | 13 +++++--------
.../apache/hadoop/hive/llap/daemon/impl/QueryTracker.java | 4 ++--
.../hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java | 13 ++++++++-----
.../hadoop/hive/llap/shufflehandler/ShuffleHandler.java | 4 ++++
.../hive/llap/daemon/impl/TaskExecutorTestHelpers.java | 3 ++-
.../hive/llap/daemon/impl/TestContainerRunnerImpl.java | 6 ++++--
6 files changed, 25 insertions(+), 18 deletions(-)
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 6a13b55..9c73747 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.UgiFactory;
@@ -271,23 +272,19 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber(),
vertex.getUser(), vertex, jobToken, fragmentIdString, tokenInfo, amNodeId);
- String[] localDirs = fragmentInfo.getLocalDirs();
- Preconditions.checkNotNull(localDirs);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Dirs are: " + Arrays.toString(localDirs));
- }
// May need to setup localDir for re-localization, which is usually setup as Environment.PWD.
// Used for re-localization, to add the user specified configuration (conf_pb_binary_stream)
- Configuration callableConf = new Configuration(getConfig());
+ // Lazy create conf object, as it gets expensive in this codepath.
+ Supplier<Configuration> callableConf = () -> new Configuration(getConfig());
UserGroupInformation fsTaskUgi = fsUgiFactory == null ? null : fsUgiFactory.createUgi();
boolean isGuaranteed = request.hasIsGuaranteed() && request.getIsGuaranteed();
// enable the printing of (per daemon) LLAP task queue/run times via LLAP_TASK_TIME_SUMMARY
ConfVars tezSummary = ConfVars.TEZ_EXEC_SUMMARY;
ConfVars llapTasks = ConfVars.LLAP_TASK_TIME_SUMMARY;
- boolean addTaskTimes = callableConf.getBoolean(tezSummary.varname, tezSummary.defaultBoolVal)
- && callableConf.getBoolean(llapTasks.varname, llapTasks.defaultBoolVal);
+ boolean addTaskTimes = getConfig().getBoolean(tezSummary.varname, tezSummary.defaultBoolVal)
+ && getConfig().getBoolean(llapTasks.varname, llapTasks.defaultBoolVal);
final String llapHost;
if (UserGroupInformation.isSecurityEnabled()) {
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index eae8e08..bf4eea0 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -211,9 +211,9 @@ public class QueryTracker extends AbstractService {
LOG.debug("Registering request for {} with the ShuffleHandler", queryIdentifier);
}
if (!vertex.getIsExternalSubmission()) {
+ String[] localDirs = (ShuffleHandler.get().isDirWatcherEnabled()) ? queryInfo.getLocalDirs() : null;
ShuffleHandler.get()
- .registerDag(appIdString, dagIdentifier, appToken,
- user, queryInfo.getLocalDirs());
+ .registerDag(appIdString, dagIdentifier, appToken, user, localDirs);
}
return queryInfo.registerFragment(
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 3619252..bc26dc0 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -86,6 +86,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
/**
*
@@ -93,7 +94,7 @@ import java.util.concurrent.atomic.AtomicLong;
public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
private static final Logger LOG = LoggerFactory.getLogger(TaskRunnerCallable.class);
private final SubmitWorkRequestProto request;
- private final Configuration conf;
+ private final Supplier<Configuration> conf;
private final Map<String, String> envMap;
private final String pid = null;
private final ObjectRegistryImpl objectRegistry;
@@ -135,8 +136,9 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
@VisibleForTesting
public TaskRunnerCallable(SubmitWorkRequestProto request, QueryFragmentInfo fragmentInfo,
- Configuration conf, ExecutionContext executionContext, Map<String, String> envMap,
- Credentials credentials, long memoryAvailable, AMReporter amReporter, ConfParams confParams,
+ Supplier<Configuration> conf, ExecutionContext executionContext,
+ Map<String, String> envMap, Credentials credentials, long memoryAvailable,
+ AMReporter amReporter, ConfParams confParams,
LlapDaemonExecutorMetrics metrics, KilledTaskHandler killedTaskHandler,
FragmentCompletionHandler fragmentCompleteHandler, HadoopShim tezHadoopShim,
TezTaskAttemptID attemptId, SignableVertexSpec vertex, TezEvent initialEvent,
@@ -192,6 +194,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
setMDCFromNDC();
try {
+ final Configuration config = conf.get();
isStarted.set(true);
this.startTime = System.currentTimeMillis();
threadName = Thread.currentThread().getName();
@@ -254,7 +257,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
@Override
public LlapTaskUmbilicalProtocol run() throws Exception {
return RPC.getProxy(LlapTaskUmbilicalProtocol.class,
- LlapTaskUmbilicalProtocol.versionID, address, taskOwner, conf, socketFactory);
+ LlapTaskUmbilicalProtocol.versionID, address, taskOwner, config, socketFactory);
}
});
@@ -277,7 +280,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
try {
synchronized (this) {
if (shouldRunTask) {
- taskRunner = new TezTaskRunner2(conf, fsTaskUgi, fragmentInfo.getLocalDirs(),
+ taskRunner = new TezTaskRunner2(config, fsTaskUgi, fragmentInfo.getLocalDirs(),
taskSpec, vertex.getQueryIdentifier().getAppAttemptNumber(),
serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor,
objectRegistry, pid, executionContext, memoryAvailable, false, tezHadoopShim);
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
index aff2c2e..9294fb3 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
@@ -431,6 +431,10 @@ public class ShuffleHandler implements AttemptRegistrationListener {
return port;
}
+ public boolean isDirWatcherEnabled() {
+ return dirWatcher != null;
+ }
+
/**
* Register an application and it's associated credentials and user information.
*
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
index 50dec47..af3f292 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
+import com.google.common.base.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.llap.LlapNodeId;
import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
@@ -212,7 +213,7 @@ public class TaskExecutorTestHelpers {
public MockRequest(SubmitWorkRequestProto requestProto, QueryFragmentInfo fragmentInfo,
boolean canFinish, boolean canFinishQueue, long workTime,
TezEvent initialEvent, boolean isGuaranteed) {
- super(requestProto, fragmentInfo, new Configuration(), new ExecutionContextImpl("localhost"),
+ super(requestProto, fragmentInfo, Configuration::new, new ExecutionContextImpl("localhost"),
null, new Credentials(), 0, mock(AMReporter.class), null, mock(
LlapDaemonExecutorMetrics.class), mock(KilledTaskHandler.class), mock(
FragmentCompletionHandler.class), new DefaultHadoopShim(), null,
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java
index 8ae00b9..93ca9f2 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java
@@ -174,7 +174,9 @@ public class TestContainerRunnerImpl {
containerRunner.submitWork(sRequest);
Assert.assertEquals(ShuffleHandler.get().getRegisteredApps().size(), 1);
Assert.assertEquals(ShuffleHandler.get().getRegisteredApps().get(appId), dagId);
- Assert.assertEquals(ShuffleHandler.get().getRegisteredDirectories().size(), 1);
- Assert.assertEquals(ShuffleHandler.get().getRegisteredDirectories().get(appId), dagId);
+ if (ShuffleHandler.get().isDirWatcherEnabled()) {
+ Assert.assertEquals(ShuffleHandler.get().getRegisteredDirectories().size(), 1);
+ Assert.assertEquals(ShuffleHandler.get().getRegisteredDirectories().get(appId), dagId);
+ }
}
}