You are viewing a plain-text version of this content; the canonical link ("here") was not preserved in this copy.
Posted to commits@hive.apache.org by ha...@apache.org on 2020/05/16 16:13:31 UTC

[hive] branch master updated: HIVE-23449 : LLAP: Reduce mkdir and config creations in submitWork hotpath (Rajesh Balamohan via Ashutosh Chauhan)

This is an automated email from the ASF dual-hosted git repository.

hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 781b7fc  HIVE-23449 : LLAP: Reduce mkdir and config creations in submitWork hotpath (Rajesh Balamohan via Ashutosh Chauhan)
781b7fc is described below

commit 781b7fc3e450f5a15e1afa2096189884b772b115
Author: Rajesh Balamohan <rb...@apache.org>
AuthorDate: Sat May 16 09:12:49 2020 -0700

    HIVE-23449 : LLAP: Reduce mkdir and config creations in submitWork hotpath (Rajesh Balamohan via Ashutosh Chauhan)
    
    Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
---
 .../hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java   | 13 +++++--------
 .../apache/hadoop/hive/llap/daemon/impl/QueryTracker.java   |  4 ++--
 .../hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java    | 13 ++++++++-----
 .../hadoop/hive/llap/shufflehandler/ShuffleHandler.java     |  4 ++++
 .../hive/llap/daemon/impl/TaskExecutorTestHelpers.java      |  3 ++-
 .../hive/llap/daemon/impl/TestContainerRunnerImpl.java      |  6 ++++--
 6 files changed, 25 insertions(+), 18 deletions(-)

diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 6a13b55..9c73747 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.UgiFactory;
@@ -271,23 +272,19 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
           vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber(),
           vertex.getUser(), vertex, jobToken, fragmentIdString, tokenInfo, amNodeId);
 
-      String[] localDirs = fragmentInfo.getLocalDirs();
-      Preconditions.checkNotNull(localDirs);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Dirs are: " + Arrays.toString(localDirs));
-      }
       // May need to setup localDir for re-localization, which is usually setup as Environment.PWD.
       // Used for re-localization, to add the user specified configuration (conf_pb_binary_stream)
 
-      Configuration callableConf = new Configuration(getConfig());
+      // Lazy create conf object, as it gets expensive in this codepath.
+      Supplier<Configuration> callableConf = () -> new Configuration(getConfig());
       UserGroupInformation fsTaskUgi = fsUgiFactory == null ? null : fsUgiFactory.createUgi();
       boolean isGuaranteed = request.hasIsGuaranteed() && request.getIsGuaranteed();
 
       // enable the printing of (per daemon) LLAP task queue/run times via LLAP_TASK_TIME_SUMMARY
       ConfVars tezSummary = ConfVars.TEZ_EXEC_SUMMARY;
       ConfVars llapTasks = ConfVars.LLAP_TASK_TIME_SUMMARY;
-      boolean addTaskTimes = callableConf.getBoolean(tezSummary.varname, tezSummary.defaultBoolVal)
-                             && callableConf.getBoolean(llapTasks.varname, llapTasks.defaultBoolVal);
+      boolean addTaskTimes = getConfig().getBoolean(tezSummary.varname, tezSummary.defaultBoolVal)
+                             && getConfig().getBoolean(llapTasks.varname, llapTasks.defaultBoolVal);
 
       final String llapHost;
       if (UserGroupInformation.isSecurityEnabled()) {
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index eae8e08..bf4eea0 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -211,9 +211,9 @@ public class QueryTracker extends AbstractService {
         LOG.debug("Registering request for {} with the ShuffleHandler", queryIdentifier);
       }
       if (!vertex.getIsExternalSubmission()) {
+        String[] localDirs = (ShuffleHandler.get().isDirWatcherEnabled()) ? queryInfo.getLocalDirs() : null;
         ShuffleHandler.get()
-            .registerDag(appIdString, dagIdentifier, appToken,
-                user, queryInfo.getLocalDirs());
+            .registerDag(appIdString, dagIdentifier, appToken, user, localDirs);
       }
 
       return queryInfo.registerFragment(
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 3619252..bc26dc0 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -86,6 +86,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
 
 /**
  *
@@ -93,7 +94,7 @@ import java.util.concurrent.atomic.AtomicLong;
 public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
   private static final Logger LOG = LoggerFactory.getLogger(TaskRunnerCallable.class);
   private final SubmitWorkRequestProto request;
-  private final Configuration conf;
+  private final Supplier<Configuration> conf;
   private final Map<String, String> envMap;
   private final String pid = null;
   private final ObjectRegistryImpl objectRegistry;
@@ -135,8 +136,9 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
 
   @VisibleForTesting
   public TaskRunnerCallable(SubmitWorkRequestProto request, QueryFragmentInfo fragmentInfo,
-                            Configuration conf, ExecutionContext executionContext, Map<String, String> envMap,
-                            Credentials credentials, long memoryAvailable, AMReporter amReporter, ConfParams confParams,
+                            Supplier<Configuration> conf, ExecutionContext executionContext,
+                            Map<String, String> envMap, Credentials credentials, long memoryAvailable,
+                            AMReporter amReporter, ConfParams confParams,
                             LlapDaemonExecutorMetrics metrics, KilledTaskHandler killedTaskHandler,
                             FragmentCompletionHandler fragmentCompleteHandler, HadoopShim tezHadoopShim,
                             TezTaskAttemptID attemptId, SignableVertexSpec vertex, TezEvent initialEvent,
@@ -192,6 +194,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
     setMDCFromNDC();
 
     try {
+      final Configuration config = conf.get();
       isStarted.set(true);
       this.startTime = System.currentTimeMillis();
       threadName = Thread.currentThread().getName();
@@ -254,7 +257,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
         @Override
         public LlapTaskUmbilicalProtocol run() throws Exception {
           return RPC.getProxy(LlapTaskUmbilicalProtocol.class,
-              LlapTaskUmbilicalProtocol.versionID, address, taskOwner, conf, socketFactory);
+              LlapTaskUmbilicalProtocol.versionID, address, taskOwner, config, socketFactory);
         }
       });
 
@@ -277,7 +280,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
       try {
         synchronized (this) {
           if (shouldRunTask) {
-            taskRunner = new TezTaskRunner2(conf, fsTaskUgi, fragmentInfo.getLocalDirs(),
+            taskRunner = new TezTaskRunner2(config, fsTaskUgi, fragmentInfo.getLocalDirs(),
                 taskSpec, vertex.getQueryIdentifier().getAppAttemptNumber(),
                 serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor,
                 objectRegistry, pid, executionContext, memoryAvailable, false, tezHadoopShim);
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
index aff2c2e..9294fb3 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
@@ -431,6 +431,10 @@ public class ShuffleHandler implements AttemptRegistrationListener {
     return port;
   }
 
+  public boolean isDirWatcherEnabled() {
+    return dirWatcher != null;
+  }
+
   /**
    * Register an application and it's associated credentials and user information.
    *
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
index 50dec47..af3f292 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
+import com.google.common.base.Supplier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.llap.LlapNodeId;
 import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
@@ -212,7 +213,7 @@ public class TaskExecutorTestHelpers {
     public MockRequest(SubmitWorkRequestProto requestProto, QueryFragmentInfo fragmentInfo,
                        boolean canFinish, boolean canFinishQueue, long workTime,
                        TezEvent initialEvent, boolean isGuaranteed) {
-      super(requestProto, fragmentInfo, new Configuration(), new ExecutionContextImpl("localhost"),
+      super(requestProto, fragmentInfo, Configuration::new, new ExecutionContextImpl("localhost"),
           null, new Credentials(), 0, mock(AMReporter.class), null, mock(
           LlapDaemonExecutorMetrics.class), mock(KilledTaskHandler.class), mock(
           FragmentCompletionHandler.class), new DefaultHadoopShim(), null,
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java
index 8ae00b9..93ca9f2 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java
@@ -174,7 +174,9 @@ public class TestContainerRunnerImpl {
     containerRunner.submitWork(sRequest);
     Assert.assertEquals(ShuffleHandler.get().getRegisteredApps().size(), 1);
     Assert.assertEquals(ShuffleHandler.get().getRegisteredApps().get(appId), dagId);
-    Assert.assertEquals(ShuffleHandler.get().getRegisteredDirectories().size(), 1);
-    Assert.assertEquals(ShuffleHandler.get().getRegisteredDirectories().get(appId), dagId);
+    if (ShuffleHandler.get().isDirWatcherEnabled()) {
+      Assert.assertEquals(ShuffleHandler.get().getRegisteredDirectories().size(), 1);
+      Assert.assertEquals(ShuffleHandler.get().getRegisteredDirectories().get(appId), dagId);
+    }
   }
 }