You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ss...@apache.org on 2016/01/25 00:36:58 UTC

[6/8] hive git commit: HIVE-12692. Make use of the Tez HadoopShim in TaskRunner usage. (Siddharth Seth, reviewed by Sergey Shelukhin) (cherry picked from commit 5a9ccddb08f767466decb1c62054764e9fc4e0fc)

HIVE-12692. Make use of the Tez HadoopShim in TaskRunner usage. (Siddharth Seth, reviewed by Sergey Shelukhin)
(cherry picked from commit 5a9ccddb08f767466decb1c62054764e9fc4e0fc)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1eb55a80
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1eb55a80
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1eb55a80

Branch: refs/heads/branch-2.0
Commit: 1eb55a8013b6e0e69b3a928130676ae7e931d9e3
Parents: 94e8761
Author: Siddharth Seth <ss...@apache.org>
Authored: Sun Jan 24 15:24:11 2016 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Sun Jan 24 15:30:10 2016 -0800

----------------------------------------------------------------------
 .../hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java    | 6 +++++-
 .../hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java     | 8 ++++++--
 .../hive/llap/daemon/impl/TaskExecutorTestHelpers.java       | 3 ++-
 .../impl/comparator/TestFirstInFirstOutComparator.java       | 3 ++-
 4 files changed, 15 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/1eb55a80/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index b0bf844..a2a55cc 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -60,6 +60,8 @@ import org.apache.tez.common.security.TokenCache;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.hadoop.shim.HadoopShim;
+import org.apache.tez.hadoop.shim.HadoopShimsLoader;
 import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -85,6 +87,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
   private final Configuration conf;
   private final TaskRunnerCallable.ConfParams confParams;
   private final KilledTaskHandler killedTaskHandler = new KilledTaskHandlerImpl();
+  private final HadoopShim tezHadoopShim;
 
   public ContainerRunnerImpl(Configuration conf, int numExecutors, int waitQueueSize,
       boolean enablePreemption, String[] localDirsBase, AtomicReference<Integer> localShufflePort,
@@ -122,6 +125,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
         conf.getInt(TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT,
             TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT)
     );
+    tezHadoopShim = new HadoopShimsLoader(conf).getHadoopShim();
 
     LOG.info("ContainerRunnerImpl config: " +
             "memoryPerExecutorDerviced=" + memoryPerExecutor
@@ -207,7 +211,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
       TaskRunnerCallable callable = new TaskRunnerCallable(request, fragmentInfo, new Configuration(getConfig()),
           new LlapExecutionContext(localAddress.get().getHostName(), queryTracker), env,
           credentials, memoryPerExecutor, amReporter, confParams, metrics, killedTaskHandler,
-          this);
+          this, tezHadoopShim);
       submissionState = executorService.schedule(callable);
 
       if (LOG.isInfoEnabled()) {

http://git-wip-us.apache.org/repos/asf/hive/blob/1eb55a80/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index b60f71f..ede2a03 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -51,6 +51,7 @@ import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.TokenCache;
 import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.hadoop.shim.HadoopShim;
 import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
@@ -100,6 +101,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
   private final LlapDaemonExecutorMetrics metrics;
   private final String requestId;
   private final String queryId;
+  private final HadoopShim tezHadoopShim;
   private boolean shouldRunTask = true;
   final Stopwatch runtimeWatch = new Stopwatch();
   final Stopwatch killtimerWatch = new Stopwatch();
@@ -115,7 +117,8 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
                      long memoryAvailable, AMReporter amReporter,
                      ConfParams confParams, LlapDaemonExecutorMetrics metrics,
                      KilledTaskHandler killedTaskHandler,
-                     FragmentCompletionHandler fragmentCompleteHandler) {
+                     FragmentCompletionHandler fragmentCompleteHandler,
+                     HadoopShim tezHadoopShim) {
     this.request = request;
     this.fragmentInfo = fragmentInfo;
     this.conf = conf;
@@ -139,6 +142,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
     this.queryId = request.getFragmentSpec().getDagName();
     this.killedTaskHandler = killedTaskHandler;
     this.fragmentCompletionHanler = fragmentCompleteHandler;
+    this.tezHadoopShim = tezHadoopShim;
   }
 
   public long getStartTime() {
@@ -216,7 +220,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
               serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor,
               objectRegistry,
               pid,
-              executionContext, memoryAvailable, false);
+              executionContext, memoryAvailable, false, tezHadoopShim);
         }
       }
       if (taskRunner == null) {

http://git-wip-us.apache.org/repos/asf/hive/blob/1eb55a80/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
index ef49714..4d05c35 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
@@ -35,6 +35,7 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.hadoop.shim.DefaultHadoopShim;
 import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
 import org.apache.tez.runtime.task.EndReason;
 import org.apache.tez.runtime.task.TaskRunner2Result;
@@ -145,7 +146,7 @@ public class TaskExecutorTestHelpers {
           new ExecutionContextImpl("localhost"), null, new Credentials(), 0, null, null, mock(
               LlapDaemonExecutorMetrics.class),
           mock(KilledTaskHandler.class), mock(
-              FragmentCompletionHandler.class));
+              FragmentCompletionHandler.class), new DefaultHadoopShim());
       this.workTime = workTime;
       this.canFinish = canFinish;
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/1eb55a80/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
index ebfb430..73df985 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
@@ -39,6 +39,7 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.hadoop.shim.DefaultHadoopShim;
 import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
 import org.apache.tez.runtime.task.EndReason;
 import org.apache.tez.runtime.task.TaskRunner2Result;
@@ -58,7 +59,7 @@ public class TestFirstInFirstOutComparator {
       super(requestProto, mock(QueryFragmentInfo.class), conf,
           new ExecutionContextImpl("localhost"), null, cred, 0, null, null, null,
           mock(KilledTaskHandler.class), mock(
-          FragmentCompletionHandler.class));
+          FragmentCompletionHandler.class), new DefaultHadoopShim());
       this.workTime = workTime;
       this.canFinish = canFinish;
     }