You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/06 09:41:30 UTC

[28/50] [abbrv] tez git commit: TEZ-2090. Add tests for jobs running in external services. (sseth)

http://git-wip-us.apache.org/repos/asf/tez/blob/8b40191d/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
new file mode 100644
index 0000000..a93c1a4
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.tests;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.app.launcher.TezTestServiceNoOpContainerLauncher;
+import org.apache.tez.dag.app.rm.TezTestServiceTaskSchedulerService;
+import org.apache.tez.dag.app.taskcomm.TezTestServiceTaskCommunicatorImpl;
+import org.apache.tez.examples.HashJoinExample;
+import org.apache.tez.examples.JoinDataGen;
+import org.apache.tez.examples.JoinValidate;
+import org.apache.tez.service.MiniTezTestServiceCluster;
+import org.apache.tez.test.MiniTezCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestExternalTezServices {
+
+  private static final Log LOG = LogFactory.getLog(TestExternalTezServices.class);
+
+  private static MiniTezCluster tezCluster;
+  private static MiniDFSCluster dfsCluster;
+  private static MiniTezTestServiceCluster tezTestServiceCluster;
+
+  private static Configuration clusterConf = new Configuration();
+  private static Configuration confForJobs;
+
+  private static FileSystem remoteFs;
+  private static FileSystem localFs;
+
+  private static TezClient sharedTezClient;
+
+  private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR + TestExternalTezServices.class.getName()
+      + "-tmpDir";
+
+  @BeforeClass
+  public static void setup() throws IOException, TezException, InterruptedException {
+
+    localFs = FileSystem.getLocal(clusterConf);
+
+    try {
+      clusterConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+      dfsCluster =
+          new MiniDFSCluster.Builder(clusterConf).numDataNodes(1).format(true).racks(null).build();
+      remoteFs = dfsCluster.getFileSystem();
+      LOG.info("MiniDFSCluster started");
+    } catch (IOException io) {
+      throw new RuntimeException("problem starting mini dfs cluster", io);
+    }
+
+    tezCluster = new MiniTezCluster(TestExternalTezServices.class.getName(), 1, 1, 1);
+    Configuration conf = new Configuration();
+    conf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
+    tezCluster.init(conf);
+    tezCluster.start();
+    LOG.info("MiniTezCluster started");
+
+    clusterConf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
+    for (Map.Entry<String, String> entry : tezCluster.getConfig()) {
+      clusterConf.set(entry.getKey(), entry.getValue());
+    }
+    long jvmMax = Runtime.getRuntime().maxMemory();
+
+    tezTestServiceCluster = MiniTezTestServiceCluster
+        .create(TestExternalTezServices.class.getSimpleName(), 3, ((long) (jvmMax * 0.5d)), 1);
+    tezTestServiceCluster.init(clusterConf);
+    tezTestServiceCluster.start();
+    LOG.info("MiniTezTestServer started");
+
+    confForJobs = new Configuration(clusterConf);
+    for (Map.Entry<String, String> entry : tezTestServiceCluster
+        .getClusterSpecificConfiguration()) {
+      confForJobs.set(entry.getKey(), entry.getValue());
+    }
+
+    // TODO TEZ-2003 Once per vertex configuration is possible, run separate tests for push vs pull (regular threaded execution)
+
+    Path stagingDirPath = new Path("/tmp/tez-staging-dir");
+    remoteFs.mkdirs(stagingDirPath);
+    // This is currently configured to push tasks into the Service, and then use the standard RPC
+    confForJobs.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString());
+    confForJobs.set(TezConfiguration.TEZ_AM_TASK_SCHEDULER_CLASS,
+        TezTestServiceTaskSchedulerService.class.getName());
+    confForJobs.set(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHER_CLASS,
+        TezTestServiceNoOpContainerLauncher.class.getName());
+    confForJobs.set(TezConfiguration.TEZ_AM_TASK_COMMUNICATOR_CLASS,
+        TezTestServiceTaskCommunicatorImpl.class.getName());
+
+    TezConfiguration tezConf = new TezConfiguration(confForJobs);
+
+    sharedTezClient = TezClient.create(TestExternalTezServices.class.getSimpleName() + "_session",
+        tezConf, true);
+    sharedTezClient.start();
+    LOG.info("Shared TezSession started");
+    sharedTezClient.waitTillReady();
+    LOG.info("Shared TezSession ready for submission");
+
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException, TezException {
+    if (sharedTezClient != null) {
+      sharedTezClient.stop();
+      sharedTezClient = null;
+    }
+
+    if (tezTestServiceCluster != null) {
+      tezTestServiceCluster.stop();
+      tezTestServiceCluster = null;
+    }
+
+    if (tezCluster != null) {
+      tezCluster.stop();
+      tezCluster = null;
+    }
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+      dfsCluster = null;
+    }
+    // TODO Add cleanup code.
+  }
+
+
+  @Test(timeout = 60000)
+  public void test1() throws Exception {
+    Path testDir = new Path("/tmp/testHashJoinExample");
+
+    remoteFs.mkdirs(testDir);
+
+    Path dataPath1 = new Path(testDir, "inPath1");
+    Path dataPath2 = new Path(testDir, "inPath2");
+    Path expectedOutputPath = new Path(testDir, "expectedOutputPath");
+    Path outPath = new Path(testDir, "outPath");
+
+    TezConfiguration tezConf = new TezConfiguration(confForJobs);
+
+    JoinDataGen dataGen = new JoinDataGen();
+    String[] dataGenArgs = new String[]{
+        dataPath1.toString(), "1048576", dataPath2.toString(), "524288",
+        expectedOutputPath.toString(), "2"};
+    assertEquals(0, dataGen.run(tezConf, dataGenArgs, sharedTezClient));
+
+    HashJoinExample joinExample = new HashJoinExample();
+    String[] args = new String[]{
+        dataPath1.toString(), dataPath2.toString(), "2", outPath.toString()};
+    assertEquals(0, joinExample.run(tezConf, args, sharedTezClient));
+
+    JoinValidate joinValidate = new JoinValidate();
+    String[] validateArgs = new String[]{
+        expectedOutputPath.toString(), outPath.toString(), "3"};
+    assertEquals(0, joinValidate.run(tezConf, validateArgs, sharedTezClient));
+
+    // Ensure this was actually submitted to the external cluster
+    assertTrue(tezTestServiceCluster.getNumSubmissions() > 0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/8b40191d/tez-ext-service-tests/src/test/java/org/apache/tez/util/ProtoConverters.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/util/ProtoConverters.java b/tez-ext-service-tests/src/test/java/org/apache/tez/util/ProtoConverters.java
new file mode 100644
index 0000000..60ebc53
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/util/ProtoConverters.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.impl.GroupInputSpec;
+import org.apache.tez.runtime.api.impl.InputSpec;
+import org.apache.tez.runtime.api.impl.OutputSpec;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.GroupInputSpecProto;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.IOSpecProto;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.TaskSpecProto;
+
+public class ProtoConverters {
+
+  public static TaskSpec getTaskSpecfromProto(TaskSpecProto taskSpecProto) {
+    TezTaskAttemptID taskAttemptID =
+        TezTaskAttemptID.fromString(taskSpecProto.getTaskAttemptIdString());
+
+    ProcessorDescriptor processorDescriptor = null;
+    if (taskSpecProto.hasProcessorDescriptor()) {
+      processorDescriptor = DagTypeConverters
+          .convertProcessorDescriptorFromDAGPlan(taskSpecProto.getProcessorDescriptor());
+    }
+
+    List<InputSpec> inputSpecList = new ArrayList<InputSpec>(taskSpecProto.getInputSpecsCount());
+    if (taskSpecProto.getInputSpecsCount() > 0) {
+      for (IOSpecProto inputSpecProto : taskSpecProto.getInputSpecsList()) {
+        inputSpecList.add(getInputSpecFromProto(inputSpecProto));
+      }
+    }
+
+    List<OutputSpec> outputSpecList =
+        new ArrayList<OutputSpec>(taskSpecProto.getOutputSpecsCount());
+    if (taskSpecProto.getOutputSpecsCount() > 0) {
+      for (IOSpecProto outputSpecProto : taskSpecProto.getOutputSpecsList()) {
+        outputSpecList.add(getOutputSpecFromProto(outputSpecProto));
+      }
+    }
+
+    List<GroupInputSpec> groupInputSpecs =
+        new ArrayList<GroupInputSpec>(taskSpecProto.getGroupedInputSpecsCount());
+    if (taskSpecProto.getGroupedInputSpecsCount() > 0) {
+      for (GroupInputSpecProto groupInputSpecProto : taskSpecProto.getGroupedInputSpecsList()) {
+        groupInputSpecs.add(getGroupInputSpecFromProto(groupInputSpecProto));
+      }
+    }
+
+    TaskSpec taskSpec =
+        new TaskSpec(taskAttemptID, taskSpecProto.getDagName(), taskSpecProto.getVertexName(),
+            taskSpecProto.getVertexParallelism(), processorDescriptor, inputSpecList,
+            outputSpecList, groupInputSpecs);
+    return taskSpec;
+  }
+
+  public static TaskSpecProto convertTaskSpecToProto(TaskSpec taskSpec) {
+    TaskSpecProto.Builder builder = TaskSpecProto.newBuilder();
+    builder.setTaskAttemptIdString(taskSpec.getTaskAttemptID().toString());
+    builder.setDagName(taskSpec.getDAGName());
+    builder.setVertexName(taskSpec.getVertexName());
+    builder.setVertexParallelism(taskSpec.getVertexParallelism());
+
+    if (taskSpec.getProcessorDescriptor() != null) {
+      builder.setProcessorDescriptor(
+          DagTypeConverters.convertToDAGPlan(taskSpec.getProcessorDescriptor()));
+    }
+
+    if (taskSpec.getInputs() != null && !taskSpec.getInputs().isEmpty()) {
+      for (InputSpec inputSpec : taskSpec.getInputs()) {
+        builder.addInputSpecs(convertInputSpecToProto(inputSpec));
+      }
+    }
+
+    if (taskSpec.getOutputs() != null && !taskSpec.getOutputs().isEmpty()) {
+      for (OutputSpec outputSpec : taskSpec.getOutputs()) {
+        builder.addOutputSpecs(convertOutputSpecToProto(outputSpec));
+      }
+    }
+
+    if (taskSpec.getGroupInputs() != null && !taskSpec.getGroupInputs().isEmpty()) {
+      for (GroupInputSpec groupInputSpec : taskSpec.getGroupInputs()) {
+        builder.addGroupedInputSpecs(convertGroupInputSpecToProto(groupInputSpec));
+
+      }
+    }
+    return builder.build();
+  }
+
+
+  public static InputSpec getInputSpecFromProto(IOSpecProto inputSpecProto) {
+    InputDescriptor inputDescriptor = null;
+    if (inputSpecProto.hasIoDescriptor()) {
+      inputDescriptor =
+          DagTypeConverters.convertInputDescriptorFromDAGPlan(inputSpecProto.getIoDescriptor());
+    }
+    InputSpec inputSpec = new InputSpec(inputSpecProto.getConnectedVertexName(), inputDescriptor,
+        inputSpecProto.getPhysicalEdgeCount());
+    return inputSpec;
+  }
+
+  public static IOSpecProto convertInputSpecToProto(InputSpec inputSpec) {
+    IOSpecProto.Builder builder = IOSpecProto.newBuilder();
+    if (inputSpec.getSourceVertexName() != null) {
+      builder.setConnectedVertexName(inputSpec.getSourceVertexName());
+    }
+    if (inputSpec.getInputDescriptor() != null) {
+      builder.setIoDescriptor(DagTypeConverters.convertToDAGPlan(inputSpec.getInputDescriptor()));
+    }
+    builder.setPhysicalEdgeCount(inputSpec.getPhysicalEdgeCount());
+    return builder.build();
+  }
+
+  public static OutputSpec getOutputSpecFromProto(IOSpecProto outputSpecProto) {
+    OutputDescriptor outputDescriptor = null;
+    if (outputSpecProto.hasIoDescriptor()) {
+      outputDescriptor =
+          DagTypeConverters.convertOutputDescriptorFromDAGPlan(outputSpecProto.getIoDescriptor());
+    }
+    OutputSpec outputSpec =
+        new OutputSpec(outputSpecProto.getConnectedVertexName(), outputDescriptor,
+            outputSpecProto.getPhysicalEdgeCount());
+    return outputSpec;
+  }
+
+  public static IOSpecProto convertOutputSpecToProto(OutputSpec outputSpec) {
+    IOSpecProto.Builder builder = IOSpecProto.newBuilder();
+    if (outputSpec.getDestinationVertexName() != null) {
+      builder.setConnectedVertexName(outputSpec.getDestinationVertexName());
+    }
+    if (outputSpec.getOutputDescriptor() != null) {
+      builder.setIoDescriptor(DagTypeConverters.convertToDAGPlan(outputSpec.getOutputDescriptor()));
+    }
+    builder.setPhysicalEdgeCount(outputSpec.getPhysicalEdgeCount());
+    return builder.build();
+  }
+
+  public static GroupInputSpec getGroupInputSpecFromProto(GroupInputSpecProto groupInputSpecProto) {
+    GroupInputSpec groupSpec = new GroupInputSpec(groupInputSpecProto.getGroupName(),
+        groupInputSpecProto.getGroupVerticesList(), DagTypeConverters
+        .convertInputDescriptorFromDAGPlan(groupInputSpecProto.getMergedInputDescriptor()));
+    return groupSpec;
+  }
+
+  public static GroupInputSpecProto convertGroupInputSpecToProto(GroupInputSpec groupInputSpec) {
+    GroupInputSpecProto.Builder builder = GroupInputSpecProto.newBuilder();
+    builder.setGroupName(groupInputSpec.getGroupName());
+    builder.addAllGroupVertices(groupInputSpec.getGroupVertices());
+    builder.setMergedInputDescriptor(
+        DagTypeConverters.convertToDAGPlan(groupInputSpec.getMergedInputDescriptor()));
+    return builder.build();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/8b40191d/tez-ext-service-tests/src/test/proto/TezDaemonProtocol.proto
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/proto/TezDaemonProtocol.proto b/tez-ext-service-tests/src/test/proto/TezDaemonProtocol.proto
new file mode 100644
index 0000000..2f8b2e6
--- /dev/null
+++ b/tez-ext-service-tests/src/test/proto/TezDaemonProtocol.proto
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tez.test.service.rpc";
+option java_outer_classname = "TezTestServiceProtocolProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "DAGApiRecords.proto";
+
+message IOSpecProto {
+  optional string connected_vertex_name = 1;
+  optional TezEntityDescriptorProto io_descriptor = 2;
+  optional int32 physical_edge_count = 3;
+}
+
+message GroupInputSpecProto {
+  optional string group_name = 1;
+  repeated string group_vertices = 2;
+  optional TezEntityDescriptorProto merged_input_descriptor = 3;
+}
+
+message TaskSpecProto {
+  optional string task_attempt_id_string = 1;
+  optional string dag_name = 2;
+  optional string vertex_name = 3;
+  optional TezEntityDescriptorProto processor_descriptor = 4;
+  repeated IOSpecProto input_specs = 5;
+  repeated IOSpecProto output_specs = 6;
+  repeated GroupInputSpecProto grouped_input_specs = 7;
+  optional int32 vertex_parallelism = 8;
+}
+
+
+message SubmitWorkRequestProto {
+  optional string container_id_string = 1;
+  optional string am_host = 2;
+  optional int32 am_port = 3;
+  optional string token_identifier = 4;
+  optional bytes credentials_binary = 5;
+  optional string user = 6;
+  optional string application_id_string = 7;
+  optional int32 app_attempt_number = 8;
+  optional TaskSpecProto task_spec = 9;
+}
+
+message SubmitWorkResponseProto {
+}
+
+
+
+message RunContainerRequestProto {
+  optional string container_id_string = 1;
+  optional string am_host = 2;
+  optional int32 am_port = 3;
+  optional string token_identifier = 4;
+  optional bytes credentials_binary = 5;
+  optional string user = 6;
+  optional string application_id_string = 7;
+  optional int32 app_attempt_number = 8;
+}
+
+message RunContainerResponseProto {
+}
+
+service TezTestServiceProtocol {
+  rpc runContainer(RunContainerRequestProto) returns (RunContainerResponseProto);
+  rpc submitWork(SubmitWorkRequestProto) returns (SubmitWorkResponseProto);
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/8b40191d/tez-ext-service-tests/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/resources/log4j.properties b/tez-ext-service-tests/src/test/resources/log4j.properties
new file mode 100644
index 0000000..531b68b
--- /dev/null
+++ b/tez-ext-service-tests/src/test/resources/log4j.properties
@@ -0,0 +1,19 @@
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+
+# log4j configuration used during build and unit tests
+
+log4j.rootLogger=info,stdout
+log4j.threshhold=ALL
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} (%F:%M(%L)) - %m%n

http://git-wip-us.apache.org/repos/asf/tez/blob/8b40191d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
index fd55992..3cba3ce 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -389,7 +389,7 @@ public class TezChild {
     private final Throwable throwable;
     private final String errorMessage;
 
-    ContainerExecutionResult(ExitStatus exitStatus, @Nullable Throwable throwable,
+    public ContainerExecutionResult(ExitStatus exitStatus, @Nullable Throwable throwable,
                              @Nullable String errorMessage) {
       this.exitStatus = exitStatus;
       this.throwable = throwable;

http://git-wip-us.apache.org/repos/asf/tez/blob/8b40191d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
index de83889..f54814b 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
@@ -67,7 +67,7 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
   private final AtomicBoolean taskRunning;
   private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
 
-  TezTaskRunner(Configuration tezConf, UserGroupInformation ugi, String[] localDirs,
+  public TezTaskRunner(Configuration tezConf, UserGroupInformation ugi, String[] localDirs,
       TaskSpec taskSpec, int appAttemptNumber,
       Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> serviceProviderEnvMap,
       Multimap<String, String> startedInputsMap, TaskReporter taskReporter,