Posted to commits@tez.apache.org by ss...@apache.org on 2015/02/14 02:24:34 UTC

[1/3] tez git commit: TEZ-2090. Add tests for jobs running in external services. (sseth)

Repository: tez
Updated Branches:
  refs/heads/TEZ-2003 4f560543d -> 4e8ee895e


http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
new file mode 100644
index 0000000..a93c1a4
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.tests;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.app.launcher.TezTestServiceNoOpContainerLauncher;
+import org.apache.tez.dag.app.rm.TezTestServiceTaskSchedulerService;
+import org.apache.tez.dag.app.taskcomm.TezTestServiceTaskCommunicatorImpl;
+import org.apache.tez.examples.HashJoinExample;
+import org.apache.tez.examples.JoinDataGen;
+import org.apache.tez.examples.JoinValidate;
+import org.apache.tez.service.MiniTezTestServiceCluster;
+import org.apache.tez.test.MiniTezCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestExternalTezServices {
+
+  private static final Log LOG = LogFactory.getLog(TestExternalTezServices.class);
+
+  private static MiniTezCluster tezCluster;
+  private static MiniDFSCluster dfsCluster;
+  private static MiniTezTestServiceCluster tezTestServiceCluster;
+
+  private static Configuration clusterConf = new Configuration();
+  private static Configuration confForJobs;
+
+  private static FileSystem remoteFs;
+  private static FileSystem localFs;
+
+  private static TezClient sharedTezClient;
+
+  private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR + TestExternalTezServices.class.getName()
+      + "-tmpDir";
+
+  @BeforeClass
+  public static void setup() throws IOException, TezException, InterruptedException {
+
+    localFs = FileSystem.getLocal(clusterConf);
+
+    try {
+      clusterConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+      dfsCluster =
+          new MiniDFSCluster.Builder(clusterConf).numDataNodes(1).format(true).racks(null).build();
+      remoteFs = dfsCluster.getFileSystem();
+      LOG.info("MiniDFSCluster started");
+    } catch (IOException io) {
+      throw new RuntimeException("problem starting mini dfs cluster", io);
+    }
+
+    tezCluster = new MiniTezCluster(TestExternalTezServices.class.getName(), 1, 1, 1);
+    Configuration conf = new Configuration();
+    conf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
+    tezCluster.init(conf);
+    tezCluster.start();
+    LOG.info("MiniTezCluster started");
+
+    clusterConf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
+    for (Map.Entry<String, String> entry : tezCluster.getConfig()) {
+      clusterConf.set(entry.getKey(), entry.getValue());
+    }
+    long jvmMax = Runtime.getRuntime().maxMemory();
+
+    tezTestServiceCluster = MiniTezTestServiceCluster
+        .create(TestExternalTezServices.class.getSimpleName(), 3, ((long) (jvmMax * 0.5d)), 1);
+    tezTestServiceCluster.init(clusterConf);
+    tezTestServiceCluster.start();
+    LOG.info("MiniTezTestServer started");
+
+    confForJobs = new Configuration(clusterConf);
+    for (Map.Entry<String, String> entry : tezTestServiceCluster
+        .getClusterSpecificConfiguration()) {
+      confForJobs.set(entry.getKey(), entry.getValue());
+    }
+
+    // TODO TEZ-2003 Once per vertex configuration is possible, run separate tests for push vs pull (regular threaded execution)
+
+    Path stagingDirPath = new Path("/tmp/tez-staging-dir");
+    remoteFs.mkdirs(stagingDirPath);
+    // This is currently configured to push tasks into the Service, and then use the standard RPC
+    confForJobs.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString());
+    confForJobs.set(TezConfiguration.TEZ_AM_TASK_SCHEDULER_CLASS,
+        TezTestServiceTaskSchedulerService.class.getName());
+    confForJobs.set(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHER_CLASS,
+        TezTestServiceNoOpContainerLauncher.class.getName());
+    confForJobs.set(TezConfiguration.TEZ_AM_TASK_COMMUNICATOR_CLASS,
+        TezTestServiceTaskCommunicatorImpl.class.getName());
+
+    TezConfiguration tezConf = new TezConfiguration(confForJobs);
+
+    sharedTezClient = TezClient.create(TestExternalTezServices.class.getSimpleName() + "_session",
+        tezConf, true);
+    sharedTezClient.start();
+    LOG.info("Shared TezSession started");
+    sharedTezClient.waitTillReady();
+    LOG.info("Shared TezSession ready for submission");
+
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException, TezException {
+    if (sharedTezClient != null) {
+      sharedTezClient.stop();
+      sharedTezClient = null;
+    }
+
+    if (tezTestServiceCluster != null) {
+      tezTestServiceCluster.stop();
+      tezTestServiceCluster = null;
+    }
+
+    if (tezCluster != null) {
+      tezCluster.stop();
+      tezCluster = null;
+    }
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+      dfsCluster = null;
+    }
+    // TODO Add cleanup code.
+  }
+
+
+  @Test(timeout = 60000)
+  public void test1() throws Exception {
+    Path testDir = new Path("/tmp/testHashJoinExample");
+
+    remoteFs.mkdirs(testDir);
+
+    Path dataPath1 = new Path(testDir, "inPath1");
+    Path dataPath2 = new Path(testDir, "inPath2");
+    Path expectedOutputPath = new Path(testDir, "expectedOutputPath");
+    Path outPath = new Path(testDir, "outPath");
+
+    TezConfiguration tezConf = new TezConfiguration(confForJobs);
+
+    JoinDataGen dataGen = new JoinDataGen();
+    String[] dataGenArgs = new String[]{
+        dataPath1.toString(), "1048576", dataPath2.toString(), "524288",
+        expectedOutputPath.toString(), "2"};
+    assertEquals(0, dataGen.run(tezConf, dataGenArgs, sharedTezClient));
+
+    HashJoinExample joinExample = new HashJoinExample();
+    String[] args = new String[]{
+        dataPath1.toString(), dataPath2.toString(), "2", outPath.toString()};
+    assertEquals(0, joinExample.run(tezConf, args, sharedTezClient));
+
+    JoinValidate joinValidate = new JoinValidate();
+    String[] validateArgs = new String[]{
+        expectedOutputPath.toString(), outPath.toString(), "3"};
+    assertEquals(0, joinValidate.run(tezConf, validateArgs, sharedTezClient));
+
+    // Ensure this was actually submitted to the external cluster
+    assertTrue(tezTestServiceCluster.getNumSubmissions() > 0);
+  }
+}
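
For context, the external-service wiring exercised by the test above comes down to three TezConfiguration keys plus a staging directory. A minimal client-side sketch, assuming the test-service classes added in this patch are on the classpath (the session name and staging path are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.tez.client.TezClient;
    import org.apache.tez.dag.api.TezConfiguration;
    import org.apache.tez.dag.app.launcher.TezTestServiceNoOpContainerLauncher;
    import org.apache.tez.dag.app.rm.TezTestServiceTaskSchedulerService;
    import org.apache.tez.dag.app.taskcomm.TezTestServiceTaskCommunicatorImpl;

    public class ExternalServiceClientSetup {
      public static TezClient createClient(Configuration clusterConf) throws Exception {
        TezConfiguration tezConf = new TezConfiguration(clusterConf);
        // Illustrative staging path; any HDFS directory writable by the submitting user works.
        tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, "/tmp/tez-staging-dir");
        // Route scheduling, launching and task communication to the external test service.
        tezConf.set(TezConfiguration.TEZ_AM_TASK_SCHEDULER_CLASS,
            TezTestServiceTaskSchedulerService.class.getName());
        tezConf.set(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHER_CLASS,
            TezTestServiceNoOpContainerLauncher.class.getName());
        tezConf.set(TezConfiguration.TEZ_AM_TASK_COMMUNICATOR_CLASS,
            TezTestServiceTaskCommunicatorImpl.class.getName());
        TezClient client = TezClient.create("external-service-session", tezConf, true);
        client.start();
        client.waitTillReady();
        return client;
      }
    }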

http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-ext-service-tests/src/test/java/org/apache/tez/util/ProtoConverters.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/util/ProtoConverters.java b/tez-ext-service-tests/src/test/java/org/apache/tez/util/ProtoConverters.java
new file mode 100644
index 0000000..60ebc53
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/util/ProtoConverters.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.impl.GroupInputSpec;
+import org.apache.tez.runtime.api.impl.InputSpec;
+import org.apache.tez.runtime.api.impl.OutputSpec;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.GroupInputSpecProto;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.IOSpecProto;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.TaskSpecProto;
+
+public class ProtoConverters {
+
+  public static TaskSpec getTaskSpecfromProto(TaskSpecProto taskSpecProto) {
+    TezTaskAttemptID taskAttemptID =
+        TezTaskAttemptID.fromString(taskSpecProto.getTaskAttemptIdString());
+
+    ProcessorDescriptor processorDescriptor = null;
+    if (taskSpecProto.hasProcessorDescriptor()) {
+      processorDescriptor = DagTypeConverters
+          .convertProcessorDescriptorFromDAGPlan(taskSpecProto.getProcessorDescriptor());
+    }
+
+    List<InputSpec> inputSpecList = new ArrayList<InputSpec>(taskSpecProto.getInputSpecsCount());
+    if (taskSpecProto.getInputSpecsCount() > 0) {
+      for (IOSpecProto inputSpecProto : taskSpecProto.getInputSpecsList()) {
+        inputSpecList.add(getInputSpecFromProto(inputSpecProto));
+      }
+    }
+
+    List<OutputSpec> outputSpecList =
+        new ArrayList<OutputSpec>(taskSpecProto.getOutputSpecsCount());
+    if (taskSpecProto.getOutputSpecsCount() > 0) {
+      for (IOSpecProto outputSpecProto : taskSpecProto.getOutputSpecsList()) {
+        outputSpecList.add(getOutputSpecFromProto(outputSpecProto));
+      }
+    }
+
+    List<GroupInputSpec> groupInputSpecs =
+        new ArrayList<GroupInputSpec>(taskSpecProto.getGroupedInputSpecsCount());
+    if (taskSpecProto.getGroupedInputSpecsCount() > 0) {
+      for (GroupInputSpecProto groupInputSpecProto : taskSpecProto.getGroupedInputSpecsList()) {
+        groupInputSpecs.add(getGroupInputSpecFromProto(groupInputSpecProto));
+      }
+    }
+
+    TaskSpec taskSpec =
+        new TaskSpec(taskAttemptID, taskSpecProto.getDagName(), taskSpecProto.getVertexName(),
+            taskSpecProto.getVertexParallelism(), processorDescriptor, inputSpecList,
+            outputSpecList, groupInputSpecs);
+    return taskSpec;
+  }
+
+  public static TaskSpecProto convertTaskSpecToProto(TaskSpec taskSpec) {
+    TaskSpecProto.Builder builder = TaskSpecProto.newBuilder();
+    builder.setTaskAttemptIdString(taskSpec.getTaskAttemptID().toString());
+    builder.setDagName(taskSpec.getDAGName());
+    builder.setVertexName(taskSpec.getVertexName());
+    builder.setVertexParallelism(taskSpec.getVertexParallelism());
+
+    if (taskSpec.getProcessorDescriptor() != null) {
+      builder.setProcessorDescriptor(
+          DagTypeConverters.convertToDAGPlan(taskSpec.getProcessorDescriptor()));
+    }
+
+    if (taskSpec.getInputs() != null && !taskSpec.getInputs().isEmpty()) {
+      for (InputSpec inputSpec : taskSpec.getInputs()) {
+        builder.addInputSpecs(convertInputSpecToProto(inputSpec));
+      }
+    }
+
+    if (taskSpec.getOutputs() != null && !taskSpec.getOutputs().isEmpty()) {
+      for (OutputSpec outputSpec : taskSpec.getOutputs()) {
+        builder.addOutputSpecs(convertOutputSpecToProto(outputSpec));
+      }
+    }
+
+    if (taskSpec.getGroupInputs() != null && !taskSpec.getGroupInputs().isEmpty()) {
+      for (GroupInputSpec groupInputSpec : taskSpec.getGroupInputs()) {
+        builder.addGroupedInputSpecs(convertGroupInputSpecToProto(groupInputSpec));
+
+      }
+    }
+    return builder.build();
+  }
+
+
+  public static InputSpec getInputSpecFromProto(IOSpecProto inputSpecProto) {
+    InputDescriptor inputDescriptor = null;
+    if (inputSpecProto.hasIoDescriptor()) {
+      inputDescriptor =
+          DagTypeConverters.convertInputDescriptorFromDAGPlan(inputSpecProto.getIoDescriptor());
+    }
+    InputSpec inputSpec = new InputSpec(inputSpecProto.getConnectedVertexName(), inputDescriptor,
+        inputSpecProto.getPhysicalEdgeCount());
+    return inputSpec;
+  }
+
+  public static IOSpecProto convertInputSpecToProto(InputSpec inputSpec) {
+    IOSpecProto.Builder builder = IOSpecProto.newBuilder();
+    if (inputSpec.getSourceVertexName() != null) {
+      builder.setConnectedVertexName(inputSpec.getSourceVertexName());
+    }
+    if (inputSpec.getInputDescriptor() != null) {
+      builder.setIoDescriptor(DagTypeConverters.convertToDAGPlan(inputSpec.getInputDescriptor()));
+    }
+    builder.setPhysicalEdgeCount(inputSpec.getPhysicalEdgeCount());
+    return builder.build();
+  }
+
+  public static OutputSpec getOutputSpecFromProto(IOSpecProto outputSpecProto) {
+    OutputDescriptor outputDescriptor = null;
+    if (outputSpecProto.hasIoDescriptor()) {
+      outputDescriptor =
+          DagTypeConverters.convertOutputDescriptorFromDAGPlan(outputSpecProto.getIoDescriptor());
+    }
+    OutputSpec outputSpec =
+        new OutputSpec(outputSpecProto.getConnectedVertexName(), outputDescriptor,
+            outputSpecProto.getPhysicalEdgeCount());
+    return outputSpec;
+  }
+
+  public static IOSpecProto convertOutputSpecToProto(OutputSpec outputSpec) {
+    IOSpecProto.Builder builder = IOSpecProto.newBuilder();
+    if (outputSpec.getDestinationVertexName() != null) {
+      builder.setConnectedVertexName(outputSpec.getDestinationVertexName());
+    }
+    if (outputSpec.getOutputDescriptor() != null) {
+      builder.setIoDescriptor(DagTypeConverters.convertToDAGPlan(outputSpec.getOutputDescriptor()));
+    }
+    builder.setPhysicalEdgeCount(outputSpec.getPhysicalEdgeCount());
+    return builder.build();
+  }
+
+  public static GroupInputSpec getGroupInputSpecFromProto(GroupInputSpecProto groupInputSpecProto) {
+    GroupInputSpec groupSpec = new GroupInputSpec(groupInputSpecProto.getGroupName(),
+        groupInputSpecProto.getGroupVerticesList(), DagTypeConverters
+        .convertInputDescriptorFromDAGPlan(groupInputSpecProto.getMergedInputDescriptor()));
+    return groupSpec;
+  }
+
+  public static GroupInputSpecProto convertGroupInputSpecToProto(GroupInputSpec groupInputSpec) {
+    GroupInputSpecProto.Builder builder = GroupInputSpecProto.newBuilder();
+    builder.setGroupName(groupInputSpec.getGroupName());
+    builder.addAllGroupVertices(groupInputSpec.getGroupVertices());
+    builder.setMergedInputDescriptor(
+        DagTypeConverters.convertToDAGPlan(groupInputSpec.getMergedInputDescriptor()));
+    return builder.build();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-ext-service-tests/src/test/proto/TezDaemonProtocol.proto
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/proto/TezDaemonProtocol.proto b/tez-ext-service-tests/src/test/proto/TezDaemonProtocol.proto
new file mode 100644
index 0000000..2f8b2e6
--- /dev/null
+++ b/tez-ext-service-tests/src/test/proto/TezDaemonProtocol.proto
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tez.test.service.rpc";
+option java_outer_classname = "TezTestServiceProtocolProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "DAGApiRecords.proto";
+
+message IOSpecProto {
+  optional string connected_vertex_name = 1;
+  optional TezEntityDescriptorProto io_descriptor = 2;
+  optional int32 physical_edge_count = 3;
+}
+
+message GroupInputSpecProto {
+  optional string group_name = 1;
+  repeated string group_vertices = 2;
+  optional TezEntityDescriptorProto merged_input_descriptor = 3;
+}
+
+message TaskSpecProto {
+  optional string task_attempt_id_string = 1;
+  optional string dag_name = 2;
+  optional string vertex_name = 3;
+  optional TezEntityDescriptorProto processor_descriptor = 4;
+  repeated IOSpecProto input_specs = 5;
+  repeated IOSpecProto output_specs = 6;
+  repeated GroupInputSpecProto grouped_input_specs = 7;
+  optional int32 vertex_parallelism = 8;
+}
+
+
+message SubmitWorkRequestProto {
+  optional string container_id_string = 1;
+  optional string am_host = 2;
+  optional int32 am_port = 3;
+  optional string token_identifier = 4;
+  optional bytes credentials_binary = 5;
+  optional string user = 6;
+  optional string application_id_string = 7;
+  optional int32 app_attempt_number = 8;
+  optional TaskSpecProto task_spec = 9;
+}
+
+message SubmitWorkResponseProto {
+}
+
+
+
+message RunContainerRequestProto {
+  optional string container_id_string = 1;
+  optional string am_host = 2;
+  optional int32 am_port = 3;
+  optional string token_identifier = 4;
+  optional bytes credentials_binary = 5;
+  optional string user = 6;
+  optional string application_id_string = 7;
+  optional int32 app_attempt_number = 8;
+}
+
+message RunContainerResponseProto {
+}
+
+service TezTestServiceProtocol {
+  rpc runContainer(RunContainerRequestProto) returns (RunContainerResponseProto);
+  rpc submitWork(SubmitWorkRequestProto) returns (SubmitWorkResponseProto);
+}
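
The submitWork RPC above carries a serialized TaskSpec. A rough sketch of how a request could be assembled on the AM side with the ProtoConverters from this patch (the helper method and its arguments are hypothetical; the patch itself derives the token identifier from the application id):

    import org.apache.tez.runtime.api.impl.TaskSpec;
    import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.SubmitWorkRequestProto;
    import org.apache.tez.util.ProtoConverters;

    public class SubmitWorkRequestSketch {
      // Hypothetical helper; field names mirror SubmitWorkRequestProto's schema above.
      public static SubmitWorkRequestProto buildRequest(TaskSpec taskSpec, String containerId,
          String amHost, int amPort, String appId, int appAttemptNumber, String user) {
        return SubmitWorkRequestProto.newBuilder()
            .setContainerIdString(containerId)
            .setAmHost(amHost)
            .setAmPort(amPort)
            .setTokenIdentifier(appId)
            .setUser(user)
            .setApplicationIdString(appId)
            .setAppAttemptNumber(appAttemptNumber)
            .setTaskSpec(ProtoConverters.convertTaskSpecToProto(taskSpec))
            .build();
      }
    }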

http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-ext-service-tests/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/resources/log4j.properties b/tez-ext-service-tests/src/test/resources/log4j.properties
new file mode 100644
index 0000000..531b68b
--- /dev/null
+++ b/tez-ext-service-tests/src/test/resources/log4j.properties
@@ -0,0 +1,19 @@
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+
+# log4j configuration used during build and unit tests
+
+log4j.rootLogger=info,stdout
+log4j.threshhold=ALL
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} (%F:%M(%L)) - %m%n

http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
index 6164e52..30f80a5 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -390,7 +390,7 @@ public class TezChild {
     private final Throwable throwable;
     private final String errorMessage;
 
-    ContainerExecutionResult(ExitStatus exitStatus, @Nullable Throwable throwable,
+    public ContainerExecutionResult(ExitStatus exitStatus, @Nullable Throwable throwable,
                              @Nullable String errorMessage) {
       this.exitStatus = exitStatus;
       this.throwable = throwable;

http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
index 6606481..34f2fc6 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
@@ -67,7 +67,7 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
   private final AtomicBoolean taskRunning;
   private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
 
-  TezTaskRunner(Configuration tezConf, UserGroupInformation ugi, String[] localDirs,
+  public TezTaskRunner(Configuration tezConf, UserGroupInformation ugi, String[] localDirs,
       TaskSpec taskSpec, TezTaskUmbilicalProtocol umbilical, int appAttemptNumber,
       Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> serviceProviderEnvMap,
       Multimap<String, String> startedInputsMap, TaskReporter taskReporter,


[3/3] tez git commit: TEZ-2090. Add tests for jobs running in external services. (sseth)

Posted by ss...@apache.org.
TEZ-2090. Add tests for jobs running in external services. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4e8ee895
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4e8ee895
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4e8ee895

Branch: refs/heads/TEZ-2003
Commit: 4e8ee895ec8f6b22b95dda662fe7f2579eaf0094
Parents: 4f56054
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Feb 13 17:24:05 2015 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Feb 13 17:24:05 2015 -0800

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                            |   1 +
 pom.xml                                         |   6 +
 .../apache/tez/dag/api/TezConfiguration.java    |   2 +
 .../apache/tez/dag/api/TaskCommunicator.java    |   1 +
 .../tez/dag/api/TaskCommunicatorContext.java    |   3 +
 .../dag/app/TaskAttemptListenerImpTezDag.java   |  39 +-
 .../tez/dag/app/TezTaskCommunicatorImpl.java    |  42 +-
 .../dag/app/rm/TaskSchedulerEventHandler.java   |   2 +-
 tez-ext-service-tests/pom.xml                   | 161 ++++
 .../tez/dag/app/TezTestServiceCommunicator.java | 152 ++++
 .../TezTestServiceContainerLauncher.java        | 144 ++++
 .../TezTestServiceNoOpContainerLauncher.java    |  66 ++
 .../rm/TezTestServiceTaskSchedulerService.java  | 347 ++++++++
 .../TezTestServiceTaskCommunicatorImpl.java     | 182 ++++
 .../org/apache/tez/service/ContainerRunner.java |  27 +
 .../tez/service/MiniTezTestServiceCluster.java  | 163 ++++
 .../service/TezTestServiceConfConstants.java    |  41 +
 .../TezTestServiceProtocolBlockingPB.java       |  22 +
 .../tez/service/impl/ContainerRunnerImpl.java   | 512 +++++++++++
 .../apache/tez/service/impl/TezTestService.java | 126 +++
 .../impl/TezTestServiceProtocolClientImpl.java  |  82 ++
 .../impl/TezTestServiceProtocolServerImpl.java  | 133 +++
 .../tez/shufflehandler/FadvisedChunkedFile.java |  78 ++
 .../tez/shufflehandler/FadvisedFileRegion.java  | 160 ++++
 .../apache/tez/shufflehandler/IndexCache.java   | 199 +++++
 .../tez/shufflehandler/ShuffleHandler.java      | 840 +++++++++++++++++++
 .../tez/tests/TestExternalTezServices.java      | 183 ++++
 .../org/apache/tez/util/ProtoConverters.java    | 172 ++++
 .../src/test/proto/TezDaemonProtocol.proto      |  84 ++
 .../src/test/resources/log4j.properties         |  19 +
 .../org/apache/tez/runtime/task/TezChild.java   |   2 +-
 .../apache/tez/runtime/task/TezTaskRunner.java  |   2 +-
 32 files changed, 3980 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index d7e4be5..975ce65 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -1,5 +1,6 @@
 ALL CHANGES:
   TEZ-2019. Temporarily allow the scheduler and launcher to be specified via configuration.
   TEZ-2006. Task communication plane needs to be pluggable.
+  TEZ-2090. Add tests for jobs running in external services.
 
 INCOMPATIBLE CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3396587..eb6bcad 100644
--- a/pom.xml
+++ b/pom.xml
@@ -158,6 +158,11 @@
         <version>${project.version}</version>
       </dependency>
       <dependency>
+        <groupId>org.apache.tez</groupId>
+        <artifactId>tez-ext-service-tests</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
         <groupId>log4j</groupId>
         <artifactId>log4j</artifactId>
         <version>1.2.17</version>
@@ -620,6 +625,7 @@
     <module>tez-dag</module>
     <module>tez-ui</module>
     <module>tez-plugins</module>
+    <module>tez-ext-service-tests</module>
     <module>tez-dist</module>
     <module>docs</module>
   </modules>

http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index df233e1..3414dbd 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -1140,6 +1140,8 @@ public class TezConfiguration extends Configuration {
   public static final String TEZ_AM_CONTAINER_LAUNCHER_CLASS = TEZ_AM_PREFIX + "container-launcher.class";
   @ConfigurationScope(Scope.VERTEX)
   public static final String TEZ_AM_TASK_SCHEDULER_CLASS = TEZ_AM_PREFIX + "task-scheduler.class";
+  @ConfigurationScope(Scope.VERTEX)
+  public static final String TEZ_AM_TASK_COMMUNICATOR_CLASS = TEZ_AM_PREFIX + "task-communicator.class";
 
 
   // TODO only validate property here, value can also be validated if necessary

http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
index 97f9c16..c9f85e0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
@@ -14,6 +14,7 @@
 
 package org.apache.tez.dag.api;
 
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Map;
 

http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
index 9b2d889..41675fe 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
@@ -44,5 +44,8 @@ public interface TaskCommunicatorContext {
   // TODO TEZ-2003 Move to vertex, taskIndex, version
   void taskStartedRemotely(TezTaskAttemptID taskAttemptID, ContainerId containerId);
 
+  // TODO TEZ-2003 Add an API to register task failure - for example, a communication failure.
+  // This will have to take into consideration the TA_FAILED event
+
   // TODO Eventually Add methods to report availability stats to the scheduler.
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index f7dfe19..08b50ba 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -18,6 +18,8 @@
 package org.apache.tez.dag.app;
 
 import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
@@ -26,13 +28,16 @@ import java.util.concurrent.ConcurrentMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.dag.api.TaskCommunicator;
 import org.apache.tez.dag.api.TaskCommunicatorContext;
 import org.apache.tez.dag.api.TaskHeartbeatResponse;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -42,6 +47,7 @@ import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
 import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
+import org.apache.tez.dag.app.rm.TaskSchedulerService;
 import org.apache.tez.dag.app.rm.container.AMContainerTask;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezVertexID;
@@ -58,7 +64,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
       .getLog(TaskAttemptListenerImpTezDag.class);
 
   private final AppContext context;
-  private final TaskCommunicator taskCommunicator;
+  private TaskCommunicator taskCommunicator;
 
   protected final TaskHeartbeatHandler taskHeartbeatHandler;
   protected final ContainerHeartbeatHandler containerHeartbeatHandler;
@@ -93,6 +99,32 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
   }
 
   @Override
+  public void serviceInit(Configuration conf) {
+    String taskCommClassName = conf.get(TezConfiguration.TEZ_AM_TASK_COMMUNICATOR_CLASS);
+    if (taskCommClassName == null) {
+      LOG.info("Using Default Task Communicator");
+      this.taskCommunicator = new TezTaskCommunicatorImpl(this);
+    } else {
+      LOG.info("Using TaskCommunicator: " + taskCommClassName);
+      Class<? extends TaskCommunicator> taskCommClazz = (Class<? extends TaskCommunicator>) ReflectionUtils
+          .getClazz(taskCommClassName);
+      try {
+        Constructor<? extends TaskCommunicator> ctor = taskCommClazz.getConstructor(TaskCommunicatorContext.class);
+        ctor.setAccessible(true);
+        this.taskCommunicator = ctor.newInstance(this);
+      } catch (NoSuchMethodException e) {
+        throw new TezUncheckedException(e);
+      } catch (InvocationTargetException e) {
+        throw new TezUncheckedException(e);
+      } catch (InstantiationException e) {
+        throw new TezUncheckedException(e);
+      } catch (IllegalAccessException e) {
+        throw new TezUncheckedException(e);
+      }
+    }
+  }
+
+  @Override
   public void serviceStart() {
     taskCommunicator.init(getConfig());
     taskCommunicator.start();
@@ -100,7 +132,10 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
 
   @Override
   public void serviceStop() {
-    taskCommunicator.stop();
+    if (taskCommunicator != null) {
+      taskCommunicator.stop();
+      taskCommunicator = null;
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
index 5652937..258c927 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
@@ -74,16 +74,22 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
       new ConcurrentHashMap<TaskAttempt, ContainerId>();
 
   private final TezTaskUmbilicalProtocol taskUmbilical;
+  private final String tokenIdentifier;
+  private final Token<JobTokenIdentifier> sessionToken;
   private InetSocketAddress address;
   private Server server;
 
-  private static final class ContainerInfo {
+  public static final class ContainerInfo {
 
-    ContainerInfo(ContainerId containerId) {
+    ContainerInfo(ContainerId containerId, String host, int port) {
       this.containerId = containerId;
+      this.host = host;
+      this.port = port;
     }
 
-    ContainerId containerId;
+    final ContainerId containerId;
+    public final String host;
+    public final int port;
     TezHeartbeatResponse lastResponse = null;
     TaskSpec taskSpec = null;
     long lastRequestId = 0;
@@ -110,6 +116,8 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
     super(TezTaskCommunicatorImpl.class.getName());
     this.taskCommunicatorContext = taskCommunicatorContext;
     this.taskUmbilical = new TezTaskUmbilicalProtocolImpl();
+    this.tokenIdentifier = this.taskCommunicatorContext.getApplicationAttemptId().getApplicationId().toString();
+    this.sessionToken = TokenCache.getSessionToken(taskCommunicatorContext.getCredentials());
   }
 
 
@@ -130,9 +138,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
       try {
         JobTokenSecretManager jobTokenSecretManager =
             new JobTokenSecretManager();
-        Token<JobTokenIdentifier> sessionToken = TokenCache.getSessionToken(taskCommunicatorContext.getCredentials());
-        jobTokenSecretManager.addTokenForJob(
-            taskCommunicatorContext.getApplicationAttemptId().getApplicationId().toString(), sessionToken);
+        jobTokenSecretManager.addTokenForJob(tokenIdentifier, sessionToken);
 
         server = new RPC.Builder(conf)
             .setProtocol(TezTaskUmbilicalProtocol.class)
@@ -182,7 +188,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
 
   @Override
   public void registerRunningContainer(ContainerId containerId, String host, int port) {
-    ContainerInfo oldInfo = registeredContainers.putIfAbsent(containerId, new ContainerInfo(containerId));
+    ContainerInfo oldInfo = registeredContainers.putIfAbsent(containerId, new ContainerInfo(containerId, host, port));
     if (oldInfo != null) {
       throw new TezUncheckedException("Multiple registrations for containerId: " + containerId);
     }
@@ -230,9 +236,9 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
                 ". Already registered to containerId: " + oldId);
       }
     }
-
   }
 
+
   @Override
   public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID) {
     TaskAttempt taskAttempt = new TaskAttempt(taskAttemptID);
@@ -258,6 +264,18 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
     return address;
   }
 
+  protected String getTokenIdentifier() {
+    return tokenIdentifier;
+  }
+
+  protected Token<JobTokenIdentifier> getSessionToken() {
+    return sessionToken;
+  }
+
+  protected TaskCommunicatorContext getTaskCommunicatorContext() {
+    return taskCommunicatorContext;
+  }
+
   public TezTaskUmbilicalProtocol getUmbilical() {
     return this.taskUmbilical;
   }
@@ -471,4 +489,12 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
       return "TaskAttempt{" + "taskAttemptId=" + taskAttemptId + '}';
     }
   }
+
+  protected ContainerInfo getContainerInfo(ContainerId containerId) {
+    return registeredContainers.get(containerId);
+  }
+
+  protected ContainerId getContainerForAttempt(TezTaskAttemptID taskAttemptId) {
+    return attemptToContainerMap.get(new TaskAttempt(taskAttemptId));
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 72dcd95..228871f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -350,7 +350,7 @@ public class TaskSchedulerEventHandler extends AbstractService
         try {
           Constructor<? extends TaskSchedulerService> ctor = taskSchedulerClazz
               .getConstructor(TaskSchedulerAppCallback.class, AppContext.class, String.class,
-                  Integer.class, String.class, Configuration.class);
+                  int.class, String.class, Configuration.class);
           ctor.setAccessible(true);
           TaskSchedulerService taskSchedulerService =
               ctor.newInstance(this, appContext, host, port, trackingUrl, getConfig());
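
The one-character change above swaps Integer.class for int.class in the reflective constructor lookup. Class#getConstructor matches declared parameter types exactly, so a TaskSchedulerService subclass whose constructor takes a primitive int port is only found via int.class. A standalone illustration (the Example class is hypothetical):

    import java.lang.reflect.Constructor;

    public class GetConstructorSketch {
      public static class Example {
        public Example(int port) {
        }
      }

      public static void main(String[] args) throws Exception {
        // Matches: the declared parameter type is the primitive int.
        Constructor<Example> found = Example.class.getConstructor(int.class);
        System.out.println("Found: " + found);
        try {
          // Does not match: Integer.class refers to the boxed type, so the lookup fails.
          Example.class.getConstructor(Integer.class);
        } catch (NoSuchMethodException expected) {
          System.out.println("Integer.class lookup fails: " + expected);
        }
      }
    }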

http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-ext-service-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/pom.xml b/tez-ext-service-tests/pom.xml
new file mode 100644
index 0000000..37f68b1
--- /dev/null
+++ b/tez-ext-service-tests/pom.xml
@@ -0,0 +1,161 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>tez</artifactId>
+    <groupId>org.apache.tez</groupId>
+    <version>0.7.0-SNAPSHOT</version>
+  </parent>
+
+  <!-- TODO TEZ-2003 Merge this into the tez-tests module -->
+  <artifactId>tez-ext-service-tests</artifactId>
+
+  <dependencies>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-runtime-internals</artifactId>
+    </dependency>
+    <dependency>
+      <!-- Required for the ShuffleHandler -->
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-runtime-library</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-dag</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-tests</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-tests</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+
+  <build>
+    <!--
+     Include all files in src/main/resources.  By default, do not apply property
+     substitution (filtering=false), but do apply property substitution to
+     version-info.properties (filtering=true).  This will substitute the
+     version information correctly, but prevent Maven from altering other files.
+     -->
+    <resources>
+      <resource>
+        <directory>${basedir}/src/main/resources</directory>
+        <excludes>
+          <exclude>tez-api-version-info.properties</exclude>
+        </excludes>
+        <filtering>false</filtering>
+      </resource>
+      <resource>
+        <directory>${basedir}/src/main/resources</directory>
+        <includes>
+          <include>tez-api-version-info.properties</include>
+        </includes>
+        <filtering>true</filtering>
+      </resource>
+    </resources>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-maven-plugins</artifactId>
+        <executions>
+          <execution>
+            <id>compile-protoc</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>protoc</goal>
+            </goals>
+            <configuration>
+              <protocVersion>${protobuf.version}</protocVersion>
+              <protocCommand>${protoc.path}</protocCommand>
+              <imports>
+                <param>${basedir}/src/test/proto</param>
+                <param>${basedir}/../tez-api/src/main/proto</param>
+              </imports>
+              <source>
+                <directory>${basedir}/src/test/proto</directory>
+                <includes>
+                  <include>TezDaemonProtocol.proto</include>
+                </includes>
+              </source>
+              <output>${project.build.directory}/generated-test-sources/java</output>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/TezTestServiceCommunicator.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/TezTestServiceCommunicator.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/TezTestServiceCommunicator.java
new file mode 100644
index 0000000..ac50878
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/TezTestServiceCommunicator.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app;
+
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.Message;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.tez.service.TezTestServiceProtocolBlockingPB;
+import org.apache.tez.service.impl.TezTestServiceProtocolClientImpl;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerRequestProto;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerResponseProto;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.SubmitWorkRequestProto;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.SubmitWorkResponseProto;
+
+public class TezTestServiceCommunicator extends AbstractService {
+
+  private final ConcurrentMap<String, TezTestServiceProtocolBlockingPB> hostProxies;
+  private final ListeningExecutorService executor;
+
+  // TODO Convert this into a singleton
+  public TezTestServiceCommunicator(int numThreads) {
+    super(TezTestServiceCommunicator.class.getSimpleName());
+    ExecutorService localExecutor = Executors.newFixedThreadPool(numThreads,
+        new ThreadFactoryBuilder().setNameFormat("TezTestServiceCommunicator #%2d").build());
+    this.hostProxies = new ConcurrentHashMap<String, TezTestServiceProtocolBlockingPB>();
+    executor = MoreExecutors.listeningDecorator(localExecutor);
+  }
+
+  @Override
+  public void serviceStop() {
+    executor.shutdownNow();
+  }
+
+
+  public void runContainer(RunContainerRequestProto request, String host, int port,
+                           final ExecuteRequestCallback<RunContainerResponseProto> callback) {
+    ListenableFuture<RunContainerResponseProto> future = executor.submit(new RunContainerCallable(request, host, port));
+    Futures.addCallback(future, new FutureCallback<RunContainerResponseProto>() {
+      @Override
+      public void onSuccess(RunContainerResponseProto result) {
+        callback.setResponse(result);
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        callback.indicateError(t);
+      }
+    });
+
+  }
+
+  public void submitWork(SubmitWorkRequestProto request, String host, int port,
+                         final ExecuteRequestCallback<SubmitWorkResponseProto> callback) {
+    ListenableFuture<SubmitWorkResponseProto> future = executor.submit(new SubmitWorkCallable(request, host, port));
+    Futures.addCallback(future, new FutureCallback<SubmitWorkResponseProto>() {
+      @Override
+      public void onSuccess(SubmitWorkResponseProto result) {
+        callback.setResponse(result);
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        callback.indicateError(t);
+      }
+    });
+
+  }
+
+
+  private class RunContainerCallable implements Callable<RunContainerResponseProto> {
+
+    final String hostname;
+    final int port;
+    final RunContainerRequestProto request;
+
+    private RunContainerCallable(RunContainerRequestProto request, String hostname, int port) {
+      this.hostname = hostname;
+      this.port = port;
+      this.request = request;
+    }
+
+    @Override
+    public RunContainerResponseProto call() throws Exception {
+      return getProxy(hostname, port).runContainer(null, request);
+    }
+  }
+
+  private class SubmitWorkCallable implements Callable<SubmitWorkResponseProto> {
+    final String hostname;
+    final int port;
+    final SubmitWorkRequestProto request;
+
+    private SubmitWorkCallable(SubmitWorkRequestProto request, String hostname, int port) {
+      this.hostname = hostname;
+      this.port = port;
+      this.request = request;
+    }
+
+    @Override
+    public SubmitWorkResponseProto call() throws Exception {
+      return getProxy(hostname, port).submitWork(null, request);
+    }
+  }
+
+  public interface ExecuteRequestCallback<T extends Message> {
+    void setResponse(T response);
+    void indicateError(Throwable t);
+  }
+
+  private TezTestServiceProtocolBlockingPB getProxy(String hostname, int port) {
+    String hostId = getHostIdentifier(hostname, port);
+
+    TezTestServiceProtocolBlockingPB proxy = hostProxies.get(hostId);
+    if (proxy == null) {
+      proxy = new TezTestServiceProtocolClientImpl(getConfig(), hostname, port);
+      TezTestServiceProtocolBlockingPB proxyOld = hostProxies.putIfAbsent(hostId, proxy);
+      if (proxyOld != null) {
+        // TODO Shutdown the new proxy.
+        proxy = proxyOld;
+      }
+    }
+    return proxy;
+  }
+
+  private String getHostIdentifier(String hostname, int port) {
+    return hostname + ":" + port;
+  }
+}
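
A short usage sketch of the asynchronous API above; the host, port and thread count are placeholders, and the callback runs on the communicator's executor rather than on the calling thread:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.tez.dag.app.TezTestServiceCommunicator;
    import org.apache.tez.dag.app.TezTestServiceCommunicator.ExecuteRequestCallback;
    import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.SubmitWorkRequestProto;
    import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.SubmitWorkResponseProto;

    public class CommunicatorUsageSketch {
      public static void submit(SubmitWorkRequestProto request) {
        TezTestServiceCommunicator communicator = new TezTestServiceCommunicator(2);
        communicator.init(new Configuration());
        communicator.start();
        // Placeholder endpoint; in these tests the port comes from TezTestServiceConfConstants.
        communicator.submitWork(request, "localhost", 15001,
            new ExecuteRequestCallback<SubmitWorkResponseProto>() {
              @Override
              public void setResponse(SubmitWorkResponseProto response) {
                System.out.println("submitWork accepted by the external service");
              }

              @Override
              public void indicateError(Throwable t) {
                t.printStackTrace();
              }
            });
      }
    }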

http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
new file mode 100644
index 0000000..e83165b
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.launcher;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TezTestServiceCommunicator;
+import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
+import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
+import org.apache.tez.dag.app.rm.container.AMContainerEvent;
+import org.apache.tez.dag.app.rm.container.AMContainerEventLaunchFailed;
+import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched;
+import org.apache.tez.dag.app.rm.container.AMContainerEventType;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+import org.apache.tez.service.TezTestServiceConfConstants;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerRequestProto;
+
+public class TezTestServiceContainerLauncher extends AbstractService implements ContainerLauncher {
+
+  // TODO Support interruptability of tasks which haven't yet been launched.
+
+  // TODO May need multiple connections per target machine, depending upon how synchronization is handled in the RPC layer
+
+  static final Log LOG = LogFactory.getLog(TezTestServiceContainerLauncher.class);
+
+  private final AppContext context;
+  private final String tokenIdentifier;
+  private final TaskAttemptListener tal;
+  private final int servicePort;
+  private final TezTestServiceCommunicator communicator;
+  private final Clock clock;
+
+
+  // Configuration passed in here to set up final parameters
+  public TezTestServiceContainerLauncher(AppContext appContext, Configuration conf,
+                                         TaskAttemptListener tal) {
+    super(TezTestServiceContainerLauncher.class.getName());
+    this.clock = appContext.getClock();
+    int numThreads = conf.getInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS,
+        TezTestServiceConfConstants.TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS_DEFAULT);
+
+    this.servicePort = conf.getInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT, -1);
+    Preconditions.checkArgument(servicePort > 0,
+        TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT + " must be set");
+    this.communicator = new TezTestServiceCommunicator(numThreads);
+    this.context = appContext;
+    this.tokenIdentifier = context.getApplicationID().toString();
+    this.tal = tal;
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) {
+    communicator.init(conf);
+  }
+
+  @Override
+  public void serviceStart() {
+    communicator.start();
+  }
+
+  @Override
+  public void serviceStop() {
+    communicator.stop();
+  }
+
+  @Override
+  public void handle(NMCommunicatorEvent event) {
+    switch (event.getType()) {
+      case CONTAINER_LAUNCH_REQUEST:
+        final NMCommunicatorLaunchRequestEvent launchEvent = (NMCommunicatorLaunchRequestEvent) event;
+        RunContainerRequestProto runRequest = constructRunContainerRequest(launchEvent);
+        communicator.runContainer(runRequest, launchEvent.getNodeId().getHost(),
+            launchEvent.getNodeId().getPort(),
+            new TezTestServiceCommunicator.ExecuteRequestCallback<TezTestServiceProtocolProtos.RunContainerResponseProto>() {
+              @Override
+              public void setResponse(TezTestServiceProtocolProtos.RunContainerResponseProto response) {
+                LOG.info("Container: " + launchEvent.getContainerId() + " launch succeeded on host: " + launchEvent.getNodeId());
+                context.getEventHandler().handle(new AMContainerEventLaunched(launchEvent.getContainerId()));
+                ContainerLaunchedEvent lEvt = new ContainerLaunchedEvent(
+                    launchEvent.getContainerId(), clock.getTime(), context.getApplicationAttemptId());
+                context.getHistoryHandler().handle(new DAGHistoryEvent(
+                    null, lEvt));
+              }
+
+              @Override
+              public void indicateError(Throwable t) {
+                LOG.error("Failed to launch container: " + launchEvent.getContainer() + " on host: " + launchEvent.getNodeId(), t);
+                sendContainerLaunchFailedMsg(launchEvent.getContainerId(), t);
+              }
+            });
+        break;
+      case CONTAINER_STOP_REQUEST:
+        LOG.info("DEBUG: Ignoring STOP_REQUEST for event: " + event);
+        // Send C_NM_STOP_SENT so the AMContainer state machine treats the stop as complete
+        // (confirmation that the container is actually done normally comes from the RM)
+        // TODO Sending this out for an un-launched container is invalid
+        context.getEventHandler().handle(new AMContainerEvent(event.getContainerId(),
+            AMContainerEventType.C_NM_STOP_SENT));
+        break;
+    }
+  }
+
+  private RunContainerRequestProto constructRunContainerRequest(NMCommunicatorLaunchRequestEvent event) {
+    RunContainerRequestProto.Builder builder = RunContainerRequestProto.newBuilder();
+    builder.setAmHost(tal.getAddress().getHostName()).setAmPort(tal.getAddress().getPort());
+    builder.setAppAttemptNumber(event.getContainer().getId().getApplicationAttemptId().getAttemptId());
+    builder.setApplicationIdString(
+        event.getContainer().getId().getApplicationAttemptId().getApplicationId().toString());
+    builder.setTokenIdentifier(tokenIdentifier);
+    builder.setContainerIdString(event.getContainer().getId().toString());
+    builder.setCredentialsBinary(
+        ByteString.copyFrom(event.getContainerLaunchContext().getTokens()));
+    // TODO Avoid reading this from the environment
+    builder.setUser(System.getenv(ApplicationConstants.Environment.USER.name()));
+    return builder.build();
+  }
+
+  @SuppressWarnings("unchecked")
+  void sendContainerLaunchFailedMsg(ContainerId containerId, Throwable t) {
+    context.getEventHandler().handle(new AMContainerEventLaunchFailed(containerId, t == null ? "" : t.getMessage()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java
new file mode 100644
index 0000000..8c8e486
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.launcher;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
+import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
+import org.apache.tez.dag.app.rm.container.AMContainerEvent;
+import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched;
+import org.apache.tez.dag.app.rm.container.AMContainerEventType;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+
+public class TezTestServiceNoOpContainerLauncher extends AbstractService implements ContainerLauncher {
+
+  static final Log LOG = LogFactory.getLog(TezTestServiceNoOpContainerLauncher.class);
+
+  private final AppContext context;
+  private final Clock clock;
+
+  public TezTestServiceNoOpContainerLauncher(AppContext appContext, Configuration conf,
+                                         TaskAttemptListener tal) {
+    super(TezTestServiceNoOpContainerLauncher.class.getName());
+    this.context = appContext;
+    this.clock = appContext.getClock();
+  }
+
+  @Override
+  public void handle(NMCommunicatorEvent event) {
+    switch(event.getType()) {
+      case CONTAINER_LAUNCH_REQUEST:
+        final NMCommunicatorLaunchRequestEvent launchEvent = (NMCommunicatorLaunchRequestEvent) event;
+        LOG.info("No-op launch for container: " + launchEvent.getContainerId() + " succeeded on host: " + launchEvent.getNodeId());
+        context.getEventHandler().handle(new AMContainerEventLaunched(launchEvent.getContainerId()));
+        ContainerLaunchedEvent lEvt = new ContainerLaunchedEvent(
+            launchEvent.getContainerId(), clock.getTime(), context.getApplicationAttemptId());
+        context.getHistoryHandler().handle(new DAGHistoryEvent(
+            null, lEvt));
+        break;
+      case CONTAINER_STOP_REQUEST:
+        LOG.info("DEBUG: Ignoring STOP_REQUEST for event: " + event);
+        context.getEventHandler().handle(new AMContainerEvent(event.getContainerId(),
+            AMContainerEventType.C_NM_STOP_SENT));
+        break;
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
new file mode 100644
index 0000000..e3c18bf
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.service.TezTestServiceConfConstants;
+
+
+// TODO Registration with RM - so that the AM is considered dead and restarted in the expiry interval - 10 minutes.
+
+public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
+
+  private static final Log LOG = LogFactory.getLog(TezTestServiceTaskSchedulerService.class);
+
+  private final ExecutorService appCallbackExecutor;
+  private final TaskSchedulerAppCallback appClientDelegate;
+  private final AppContext appContext;
+  private final List<String> serviceHosts;
+  private final ContainerFactory containerFactory;
+  private final Random random = new Random();
+  // Currently all services must be running on the same port.
+  private final int containerPort;
+
+  private final String clientHostname;
+  private final int clientPort;
+  private final String trackingUrl;
+  private final AtomicBoolean isStopped = new AtomicBoolean(false);
+  private final ConcurrentMap<Object, ContainerId> runningTasks =
+      new ConcurrentHashMap<Object, ContainerId>();
+
+  private final AMRMClientAsync<AMRMClient.ContainerRequest> amRmClient;
+
+  // Per instance
+  private final int memoryPerInstance;
+  private final int coresPerInstance;
+  private final int executorsPerInstance;
+
+  // Per Executor Thread
+  private final Resource resourcePerContainer;
+
+
+  public TezTestServiceTaskSchedulerService(TaskSchedulerAppCallback appClient,
+                                            AppContext appContext,
+                                            String clientHostname, int clientPort,
+                                            String trackingUrl,
+                                            Configuration conf) {
+    // Accepting configuration here to allow setting up fields as final
+    super(TezTestServiceTaskSchedulerService.class.getName());
+    this.appCallbackExecutor = createAppCallbackExecutorService();
+    this.appClientDelegate = createAppCallbackDelegate(appClient);
+    this.appContext = appContext;
+    this.serviceHosts = new LinkedList<String>();
+    this.containerFactory = new ContainerFactory(appContext);
+
+    this.memoryPerInstance = conf
+        .getInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_MEMORY_PER_INSTANCE_MB, -1);
+    Preconditions.checkArgument(memoryPerInstance > 0,
+        TezTestServiceConfConstants.TEZ_TEST_SERVICE_MEMORY_PER_INSTANCE_MB +
+            " must be configured");
+
+    this.executorsPerInstance = conf.getInt(
+        TezTestServiceConfConstants.TEZ_TEST_SERVICE_NUM_EXECUTORS_PER_INSTANCE,
+        -1);
+    Preconditions.checkArgument(executorsPerInstance > 0,
+        TezTestServiceConfConstants.TEZ_TEST_SERVICE_NUM_EXECUTORS_PER_INSTANCE +
+            " must be configured");
+
+    this.coresPerInstance = conf
+        .getInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_VCPUS_PER_INSTANCE,
+            executorsPerInstance);
+
+    this.containerPort = conf.getInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT, -1);
+    Preconditions.checkArgument(containerPort > 0,
+        TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT + " must be configured");
+
+    this.clientHostname = clientHostname;
+    this.clientPort = clientPort;
+    this.trackingUrl = trackingUrl;
+
+    int memoryPerContainer = (int) (memoryPerInstance / (float) executorsPerInstance);
+    int coresPerContainer = (int) (coresPerInstance / (float) executorsPerInstance);
+    this.resourcePerContainer = Resource.newInstance(memoryPerContainer, coresPerContainer);
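+    // Illustrative numbers (not from this patch): memoryPerInstance=2048 MB, coresPerInstance=2 and
+    // executorsPerInstance=2 give each container 1024 MB and 1 vcore.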
+    this.amRmClient = TezAMRMClientAsync.createAMRMClientAsync(5000, new FakeAmRmCallbackHandler());
+
+    String[] hosts = conf.getTrimmedStrings(TezTestServiceConfConstants.TEZ_TEST_SERVICE_HOSTS);
+    if (hosts == null || hosts.length == 0) {
+      hosts = new String[]{"localhost"};
+    }
+    for (String host : hosts) {
+      serviceHosts.add(host);
+    }
+
+    LOG.info("Running with configuration: " +
+        "memoryPerInstance=" + memoryPerInstance +
+        ", vcoresPerInstance=" + coresPerInstance +
+        ", executorsPerInstance=" + executorsPerInstance +
+        ", resourcePerContainerInferred=" + resourcePerContainer +
+        ", hosts=" + serviceHosts.toString());
+
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) {
+    amRmClient.init(conf);
+  }
+
+  @Override
+  public void serviceStart() {
+    amRmClient.start();
+    RegisterApplicationMasterResponse response;
+    try {
+      response = amRmClient.registerApplicationMaster(clientHostname, clientPort, trackingUrl);
+    } catch (YarnException e) {
+      throw new TezUncheckedException(e);
+    } catch (IOException e) {
+      throw new TezUncheckedException(e);
+    }
+  }
+
+  @Override
+  public void serviceStop() {
+    if (!this.isStopped.getAndSet(true)) {
+
+      try {
+        TaskSchedulerAppCallback.AppFinalStatus status = appClientDelegate.getFinalAppStatus();
+        amRmClient.unregisterApplicationMaster(status.exitStatus, status.exitMessage,
+            status.postCompletionTrackingUrl);
+      } catch (YarnException e) {
+        throw new TezUncheckedException(e);
+      } catch (IOException e) {
+        throw new TezUncheckedException(e);
+      }
+      appCallbackExecutor.shutdownNow();
+    }
+  }
+
+  @Override
+  public Resource getAvailableResources() {
+    // TODO This needs information about all running executors, and the amount of memory etc available across the cluster.
+    return Resource
+        .newInstance(Ints.checkedCast(serviceHosts.size() * memoryPerInstance),
+            serviceHosts.size() * coresPerInstance);
+  }
+
+  @Override
+  public int getClusterNodeCount() {
+    return serviceHosts.size();
+  }
+
+  @Override
+  public void resetMatchLocalityForAllHeldContainers() {
+  }
+
+  @Override
+  public Resource getTotalResources() {
+    return Resource
+        .newInstance(Ints.checkedCast(serviceHosts.size() * memoryPerInstance),
+            serviceHosts.size() * coresPerInstance);
+  }
+
+  @Override
+  public void blacklistNode(NodeId nodeId) {
+    LOG.info("DEBUG: BlacklistNode not supported");
+  }
+
+  @Override
+  public void unblacklistNode(NodeId nodeId) {
+    LOG.info("DEBUG: unBlacklistNode not supported");
+  }
+
+  @Override
+  public void allocateTask(Object task, Resource capability, String[] hosts, String[] racks,
+                           Priority priority, Object containerSignature, Object clientCookie) {
+    String host = selectHost(hosts);
+    Container container =
+        containerFactory.createContainer(resourcePerContainer, priority, host, containerPort);
+    runningTasks.put(task, container.getId());
+    appClientDelegate.taskAllocated(task, clientCookie, container);
+  }
+
+
+  @Override
+  public void allocateTask(Object task, Resource capability, ContainerId containerId,
+                           Priority priority, Object containerSignature, Object clientCookie) {
+    String host = selectHost(null);
+    Container container =
+        containerFactory.createContainer(resourcePerContainer, priority, host, containerPort);
+    runningTasks.put(task, container.getId());
+    appClientDelegate.taskAllocated(task, clientCookie, container);
+  }
+
+  @Override
+  public boolean deallocateTask(Object task, boolean taskSucceeded) {
+    ContainerId containerId = runningTasks.remove(task);
+    if (containerId == null) {
+      LOG.error("Could not determine ContainerId for task: " + task +
+          " . Could have hit a race condition. Ignoring." +
+          " The query may hang since this \"unknown\" container is now taking up a slot permanently");
+      return false;
+    }
+    appClientDelegate.containerBeingReleased(containerId);
+    return true;
+  }
+
+  @Override
+  public Object deallocateContainer(ContainerId containerId) {
+    LOG.info("DEBUG: Ignoring deallocateContainer for containerId: " + containerId);
+    return null;
+  }
+
+  @Override
+  public void setShouldUnregister() {
+
+  }
+
+  @Override
+  public boolean hasUnregistered() {
+    // Nothing to do. No registration involved.
+    return true;
+  }
+
+  private ExecutorService createAppCallbackExecutorService() {
+    return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+        .setNameFormat("TaskSchedulerAppCaller #%d").setDaemon(true).build());
+  }
+
+  private TaskSchedulerAppCallback createAppCallbackDelegate(
+      TaskSchedulerAppCallback realAppClient) {
+    return new TaskSchedulerAppCallbackWrapper(realAppClient,
+        appCallbackExecutor);
+  }
+
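+  // Host selection: if the request names hosts, pick the lexicographically first one (sorted for
+  // determinism); otherwise fall back to a random host from the configured service hosts.
+  // e.g. (illustrative): requestedHosts = {"node2", "node1"} always selects "node1".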
+  private String selectHost(String[] requestedHosts) {
+    String host = null;
+    if (requestedHosts != null && requestedHosts.length > 0) {
+      Arrays.sort(requestedHosts);
+      host = requestedHosts[0];
+      LOG.info("Selected host: " + host + " from requested hosts: " + Arrays.toString(requestedHosts));
+    } else {
+      host = serviceHosts.get(random.nextInt(serviceHosts.size()));
+      LOG.info("Selected random host: " + host + " since the request contained no host information");
+    }
+    return host;
+  }
+
+  static class ContainerFactory {
+    final AppContext appContext;
+    AtomicInteger nextId;
+
+    public ContainerFactory(AppContext appContext) {
+      this.appContext = appContext;
+      this.nextId = new AtomicInteger(2);
+    }
+
+    public Container createContainer(Resource capability, Priority priority, String hostname, int port) {
+      ApplicationAttemptId appAttemptId = appContext.getApplicationAttemptId();
+      ContainerId containerId = ContainerId.newInstance(appAttemptId, nextId.getAndIncrement());
+      NodeId nodeId = NodeId.newInstance(hostname, port);
+      String nodeHttpAddress = "hostname:0";
+
+      Container container = Container.newInstance(containerId,
+          nodeId,
+          nodeHttpAddress,
+          capability,
+          priority,
+          null);
+
+      return container;
+    }
+  }
+
+  private static class FakeAmRmCallbackHandler implements AMRMClientAsync.CallbackHandler {
+
+    @Override
+    public void onContainersCompleted(List<ContainerStatus> statuses) {
+
+    }
+
+    @Override
+    public void onContainersAllocated(List<Container> containers) {
+
+    }
+
+    @Override
+    public void onShutdownRequest() {
+
+    }
+
+    @Override
+    public void onNodesUpdated(List<NodeReport> updatedNodes) {
+
+    }
+
+    @Override
+    public float getProgress() {
+      return 0;
+    }
+
+    @Override
+    public void onError(Throwable e) {
+
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
new file mode 100644
index 0000000..78cdcde
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.taskcomm;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import com.google.protobuf.ByteString;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.dag.api.TaskCommunicatorContext;
+import org.apache.tez.dag.app.TezTaskCommunicatorImpl;
+import org.apache.tez.dag.app.TezTestServiceCommunicator;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.SubmitWorkRequestProto;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.SubmitWorkResponseProto;
+import org.apache.tez.util.ProtoConverters;
+
+
+public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl {
+
+  private static final Log LOG = LogFactory.getLog(TezTestServiceTaskCommunicatorImpl.class);
+
+  private final TezTestServiceCommunicator communicator;
+  private final SubmitWorkRequestProto BASE_SUBMIT_WORK_REQUEST;
+  private final ConcurrentMap<String, ByteBuffer> credentialMap;
+
+  public TezTestServiceTaskCommunicatorImpl(
+      TaskCommunicatorContext taskCommunicatorContext) {
+    super(taskCommunicatorContext);
+    // TODO Maybe make this configurable
+    this.communicator = new TezTestServiceCommunicator(3);
+
+    SubmitWorkRequestProto.Builder baseBuilder = SubmitWorkRequestProto.newBuilder();
+
+    // TODO Avoid reading this from the environment
+    baseBuilder.setUser(System.getenv(ApplicationConstants.Environment.USER.name()));
+    baseBuilder.setApplicationIdString(
+        taskCommunicatorContext.getApplicationAttemptId().getApplicationId().toString());
+    baseBuilder
+        .setAppAttemptNumber(taskCommunicatorContext.getApplicationAttemptId().getAttemptId());
+    baseBuilder.setTokenIdentifier(getTokenIdentifier());
+
+    BASE_SUBMIT_WORK_REQUEST = baseBuilder.build();
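+    // The remaining per-task fields (containerId, AM host/port, credentials, TaskSpec) are filled
+    // in later by constructSubmitWorkRequest() for each task attempt.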
+
+    credentialMap = new ConcurrentHashMap<String, ByteBuffer>();
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    this.communicator.init(conf);
+  }
+
+  @Override
+  public void serviceStart() {
+    super.serviceStart();
+    this.communicator.start();
+  }
+
+  @Override
+  public void serviceStop() {
+    super.serviceStop();
+  }
+
+
+  @Override
+  public void registerRunningContainer(ContainerId containerId, String hostname, int port) {
+    super.registerRunningContainer(containerId, hostname, port);
+  }
+
+  @Override
+  public void registerContainerEnd(ContainerId containerId) {
+    super.registerContainerEnd(containerId);
+  }
+
+  @Override
+  public void registerRunningTaskAttempt(final ContainerId containerId, final TaskSpec taskSpec,
+                                         Map<String, LocalResource> additionalResources,
+                                         Credentials credentials,
+                                         boolean credentialsChanged)  {
+    super.registerRunningTaskAttempt(containerId, taskSpec, additionalResources, credentials,
+        credentialsChanged);
+    SubmitWorkRequestProto requestProto = null;
+    try {
+      requestProto = constructSubmitWorkRequest(containerId, taskSpec);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to construct request", e);
+    }
+    ContainerInfo containerInfo = getContainerInfo(containerId);
+    String host;
+    int port;
+    if (containerInfo != null) {
+      synchronized (containerInfo) {
+        host = containerInfo.host;
+        port = containerInfo.port;
+      }
+    } else {
+      // TODO Handle this properly
+      throw new RuntimeException("ContainerInfo not found for container: " + containerId +
+          ", while trying to launch task: " + taskSpec.getTaskAttemptID());
+    }
+    communicator.submitWork(requestProto, host, port,
+        new TezTestServiceCommunicator.ExecuteRequestCallback<SubmitWorkResponseProto>() {
+          @Override
+          public void setResponse(SubmitWorkResponseProto response) {
+            LOG.info("Successfully launched task: " + taskSpec.getTaskAttemptID());
+            getTaskCommunicatorContext()
+                .taskStartedRemotely(taskSpec.getTaskAttemptID(), containerId);
+          }
+
+          @Override
+          public void indicateError(Throwable t) {
+            // TODO Handle this error. This is where an API on the context to indicate failure / rejection comes in.
+            LOG.info("Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " +
+                containerId, t);
+          }
+        });
+  }
+
+  @Override
+  public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID) {
+    super.unregisterRunningTaskAttempt(taskAttemptID);
+    // Nothing else to do for now. The push API in the test does not support termination of a running task
+  }
+
+  private SubmitWorkRequestProto constructSubmitWorkRequest(ContainerId containerId,
+                                                            TaskSpec taskSpec) throws
+      IOException {
+    SubmitWorkRequestProto.Builder builder =
+        SubmitWorkRequestProto.newBuilder(BASE_SUBMIT_WORK_REQUEST);
+    builder.setContainerIdString(containerId.toString());
+    builder.setAmHost(getAddress().getHostName());
+    builder.setAmPort(getAddress().getPort());
+    Credentials taskCredentials = new Credentials();
+    // Credentials can change across DAGs. Ideally construct only once per DAG.
+    taskCredentials.addAll(getTaskCommunicatorContext().getCredentials());
+
+    ByteBuffer credentialsBinary = credentialMap.get(taskSpec.getDAGName());
+    if (credentialsBinary == null) {
+      credentialsBinary = serializeCredentials(getTaskCommunicatorContext().getCredentials());
+      credentialMap.putIfAbsent(taskSpec.getDAGName(), credentialsBinary.duplicate());
+    } else {
+      credentialsBinary = credentialsBinary.duplicate();
+    }
+    builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary));
+    builder.setTaskSpec(ProtoConverters.convertTaskSpecToProto(taskSpec));
+    return builder.build();
+  }
+
+  private ByteBuffer serializeCredentials(Credentials credentials) throws IOException {
+    Credentials containerCredentials = new Credentials();
+    containerCredentials.addAll(credentials);
+    DataOutputBuffer containerTokens_dob = new DataOutputBuffer();
+    containerCredentials.writeTokenStorageToStream(containerTokens_dob);
+    ByteBuffer containerCredentialsBuffer = ByteBuffer.wrap(containerTokens_dob.getData(), 0,
+        containerTokens_dob.getLength());
+    return containerCredentialsBuffer;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-ext-service-tests/src/test/java/org/apache/tez/service/ContainerRunner.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/ContainerRunner.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/ContainerRunner.java
new file mode 100644
index 0000000..2bca4ed
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/ContainerRunner.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.service;
+
+import java.io.IOException;
+
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerRequestProto;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.SubmitWorkRequestProto;
+
+public interface ContainerRunner {
+
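+  /**
+   * queueContainer: run a complete container whose tasks pull work from the AM over the regular
+   * umbilical. submitWork: run a single unit of work whose TaskSpec is pushed by the AM along
+   * with the request. ContainerRunnerImpl (later in this patch) implements both paths.
+   */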
+  void queueContainer(RunContainerRequestProto request) throws IOException;
+  void submitWork(SubmitWorkRequestProto request) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-ext-service-tests/src/test/java/org/apache/tez/service/MiniTezTestServiceCluster.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/MiniTezTestServiceCluster.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/MiniTezTestServiceCluster.java
new file mode 100644
index 0000000..f47bd67
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/MiniTezTestServiceCluster.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.service;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.tez.service.impl.TezTestService;
+
+public class MiniTezTestServiceCluster extends AbstractService {
+
+  private static final Log LOG = LogFactory.getLog(MiniTezTestServiceCluster.class);
+
+  private final File testWorkDir;
+  private final long availableMemory;
+  private final int numExecutorsPerService;
+  private final String[] localDirs;
+  private final Configuration clusterSpecificConfiguration = new Configuration(false);
+
+  private TezTestService tezTestService;
+
+  public static MiniTezTestServiceCluster create(String clusterName, int numExecutorsPerService, long availableMemory, int numLocalDirs) {
+    return new MiniTezTestServiceCluster(clusterName, numExecutorsPerService, availableMemory, numLocalDirs);
+  }
+
+  // TODO Add support for multiple instances
+  private MiniTezTestServiceCluster(String clusterName, int numExecutorsPerService, long availableMemory, int numLocalDirs) {
+    super(clusterName + "_TezTestServerCluster");
+    Preconditions.checkArgument(numExecutorsPerService > 0);
+    Preconditions.checkArgument(availableMemory > 0);
+    Preconditions.checkArgument(numLocalDirs > 0);
+    String clusterNameTrimmed = clusterName.replace("$", "") + "_TezTestServerCluster";
+    File targetWorkDir = new File("target", clusterNameTrimmed);
+    try {
+      FileContext.getLocalFSFileContext().delete(
+          new Path(targetWorkDir.getAbsolutePath()), true);
+    } catch (Exception e) {
+      LOG.warn("Could not cleanup test workDir: " + targetWorkDir, e);
+      throw new RuntimeException("Could not cleanup test workDir: " + targetWorkDir, e);
+    }
+
+    if (Shell.WINDOWS) {
+      // The test working directory can exceed the maximum path length supported
+      // by some Windows APIs and cmd.exe (260 characters).  To work around this,
+      // create a symlink in temporary storage with a much shorter path,
+      // targeting the full path to the test working directory.  Then, use the
+      // symlink as the test working directory.
+      String targetPath = targetWorkDir.getAbsolutePath();
+      File link = new File(System.getProperty("java.io.tmpdir"),
+          String.valueOf(System.currentTimeMillis()));
+      String linkPath = link.getAbsolutePath();
+
+      try {
+        FileContext.getLocalFSFileContext().delete(new Path(linkPath), true);
+      } catch (IOException e) {
+        throw new YarnRuntimeException("could not cleanup symlink: " + linkPath, e);
+      }
+
+      // Guarantee target exists before creating symlink.
+      targetWorkDir.mkdirs();
+
+      Shell.ShellCommandExecutor shexec = new Shell.ShellCommandExecutor(
+          Shell.getSymlinkCommand(targetPath, linkPath));
+      try {
+        shexec.execute();
+      } catch (IOException e) {
+        throw new YarnRuntimeException(String.format(
+            "failed to create symlink from %s to %s, shell output: %s", linkPath,
+            targetPath, shexec.getOutput()), e);
+      }
+
+      this.testWorkDir = link;
+    } else {
+      this.testWorkDir = targetWorkDir;
+    }
+    this.numExecutorsPerService = numExecutorsPerService;
+    this.availableMemory = availableMemory;
+
+    // Setup Local Dirs
+    localDirs = new String[numLocalDirs];
+    for (int i = 0 ; i < numLocalDirs ; i++) {
+      File f = new File(testWorkDir, "localDir_" + i);
+      f.mkdirs();
+      LOG.info("Created localDir: " + f.getAbsolutePath());
+      localDirs[i] = f.getAbsolutePath();
+    }
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) {
+    tezTestService = new TezTestService(conf, numExecutorsPerService, availableMemory, localDirs);
+    tezTestService.init(conf);
+
+  }
+
+  @Override
+  public void serviceStart() {
+    tezTestService.start();
+
+    clusterSpecificConfiguration.set(TezTestServiceConfConstants.TEZ_TEST_SERVICE_HOSTS,
+        getServiceAddress().getHostName());
+    clusterSpecificConfiguration.setInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT,
+        getServiceAddress().getPort());
+
+    clusterSpecificConfiguration.setInt(
+        TezTestServiceConfConstants.TEZ_TEST_SERVICE_NUM_EXECUTORS_PER_INSTANCE,
+        numExecutorsPerService);
+    clusterSpecificConfiguration.setLong(
+        TezTestServiceConfConstants.TEZ_TEST_SERVICE_MEMORY_PER_INSTANCE_MB, availableMemory);
+  }
+
+  @Override
+  public void serviceStop() {
+    tezTestService.stop();
+  }
+
+  /**
+   * return the address at which the service is listening
+   * @return host:port
+   */
+  public InetSocketAddress getServiceAddress() {
+    Preconditions.checkState(getServiceState() == STATE.STARTED);
+    return tezTestService.getListenerAddress();
+  }
+
+  public int getShufflePort() {
+    Preconditions.checkState(getServiceState() == STATE.STARTED);
+    return tezTestService.getShufflePort();
+  }
+
+  public Configuration getClusterSpecificConfiguration() {
+    Preconditions.checkState(getServiceState() == STATE.STARTED);
+    return clusterSpecificConfiguration;
+  }
+
+  // Mainly for verification
+  public int getNumSubmissions() {
+    return tezTestService.getNumSubmissions();
+  }
+
+}
\ No newline at end of file
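
A minimal sketch of how a test might drive this mini cluster (the class and methods are from the
patch above; the helper method, parameter values, and variable names are illustrative assumptions,
not part of the change):

  private static Configuration startServiceCluster() {
    MiniTezTestServiceCluster cluster = MiniTezTestServiceCluster.create(
        "SampleCluster", 2 /* executors per instance */, 512L * 1024 * 1024 /* bytes */, 1 /* local dir */);
    cluster.init(new Configuration());
    cluster.start();
    // Copy the cluster-specific settings (hosts, rpc port, executors, memory) into a job config.
    Configuration serviceConf = new Configuration();
    for (java.util.Map.Entry<String, String> entry : cluster.getClusterSpecificConfiguration()) {
      serviceConf.set(entry.getKey(), entry.getValue());
    }
    return serviceConf;
  }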

http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-ext-service-tests/src/test/java/org/apache/tez/service/TezTestServiceConfConstants.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/TezTestServiceConfConstants.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/TezTestServiceConfConstants.java
new file mode 100644
index 0000000..bf4a5bd
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/TezTestServiceConfConstants.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.service;
+
+public class TezTestServiceConfConstants {
+
+  private static final String TEZ_TEST_SERVICE_PREFIX = "tez.test.service.";
+
+  /** Number of executors per instance - used by the scheduler */
+  public static final String TEZ_TEST_SERVICE_NUM_EXECUTORS_PER_INSTANCE = TEZ_TEST_SERVICE_PREFIX + "num.executors.per-instance";
+
+  /** Memory available per instance - used by the scheduler */
+  public static final String TEZ_TEST_SERVICE_MEMORY_PER_INSTANCE_MB = TEZ_TEST_SERVICE_PREFIX + "memory.per.instance.mb";
+
+  /** CPUs available per instance - used by the scheduler */
+  public static final String TEZ_TEST_SERVICE_VCPUS_PER_INSTANCE = TEZ_TEST_SERVICE_PREFIX + "vcpus.per.instance";
+
+
+  /** Hosts on which the service is running. Currently assuming a single port for all instances */
+  public static final String TEZ_TEST_SERVICE_HOSTS = TEZ_TEST_SERVICE_PREFIX + "hosts";
+
+  /** Port on which the Service(s) listen. Currently a single port for all instances */
+  public static final String TEZ_TEST_SERVICE_RPC_PORT = TEZ_TEST_SERVICE_PREFIX + "rpc.port";
+
+  /** Number of threads to use in the AM to communicate with the external service */
+  public static final String TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS = TEZ_TEST_SERVICE_PREFIX + "communicator.num.threads";
+  public static final int TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS_DEFAULT = 2;
+
+}
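
As a hedged illustration (values here are invented for the example), an AM-side configuration that
points at a single external instance would set these keys roughly as follows:

  Configuration conf = new Configuration();
  conf.set(TezTestServiceConfConstants.TEZ_TEST_SERVICE_HOSTS, "localhost");
  conf.setInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT, 15002);
  conf.setInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_NUM_EXECUTORS_PER_INSTANCE, 2);
  conf.setInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_MEMORY_PER_INSTANCE_MB, 1024);
  conf.setInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS, 2);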

http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-ext-service-tests/src/test/java/org/apache/tez/service/TezTestServiceProtocolBlockingPB.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/TezTestServiceProtocolBlockingPB.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/TezTestServiceProtocolBlockingPB.java
new file mode 100644
index 0000000..1108f72
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/TezTestServiceProtocolBlockingPB.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.service;
+
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos;
+
+@ProtocolInfo(protocolName = "org.apache.tez.service.TezTestServiceProtocolBlockingPB", protocolVersion = 1)
+public interface TezTestServiceProtocolBlockingPB extends TezTestServiceProtocolProtos.TezTestServiceProtocol.BlockingInterface {
+}
\ No newline at end of file


[2/3] tez git commit: TEZ-2090. Add tests for jobs running in external services. (sseth)

Posted by ss...@apache.org.
http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
new file mode 100644
index 0000000..4a6ce33
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
@@ -0,0 +1,512 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.service.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
+import org.apache.log4j.Logger;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezTaskUmbilicalProtocol;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.common.security.TokenCache;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.runtime.task.TaskReporter;
+import org.apache.tez.runtime.task.TezTaskRunner;
+import org.apache.tez.service.ContainerRunner;
+import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.runtime.api.ExecutionContext;
+import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
+import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
+import org.apache.tez.runtime.task.TezChild;
+import org.apache.tez.runtime.task.TezChild.ContainerExecutionResult;
+import org.apache.tez.shufflehandler.ShuffleHandler;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerRequestProto;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.SubmitWorkRequestProto;
+import org.apache.tez.util.ProtoConverters;
+
+public class ContainerRunnerImpl extends AbstractService implements ContainerRunner {
+
+  private static final Logger LOG = Logger.getLogger(ContainerRunnerImpl.class);
+
+  private final ListeningExecutorService executorService;
+  private final AtomicReference<InetSocketAddress> localAddress;
+  private final String[] localDirsBase;
+  private final Map<String, String> localEnv = new HashMap<String, String>();
+  private volatile FileSystem localFs;
+  private final long memoryPerExecutor;
+  // TODO Support for removing queued containers, interrupting / killing specific containers - when preemption is supported
+
+  public ContainerRunnerImpl(int numExecutors, String[] localDirsBase,
+                             AtomicReference<InetSocketAddress> localAddress,
+                             long totalMemoryAvailableBytes) {
+    super("ContainerRunnerImpl");
+    Preconditions.checkState(numExecutors > 0,
+        "Invalid number of executors: " + numExecutors + ". Must be > 0");
+    this.localDirsBase = localDirsBase;
+    this.localAddress = localAddress;
+
+    ExecutorService raw = Executors.newFixedThreadPool(numExecutors,
+        new ThreadFactoryBuilder().setNameFormat("ContainerExecutor %d").build());
+    this.executorService = MoreExecutors.listeningDecorator(raw);
+
+
+    // 80% of memory considered for accounted buffers. Rest for objects.
+    // TODO Tune this based on the available size.
+    this.memoryPerExecutor = (long)(totalMemoryAvailableBytes * 0.8 / (float) numExecutors);
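+    // Illustrative numbers (not from this patch): with 1024 MB available and 4 executors, each
+    // executor gets roughly 1024 * 0.8 / 4 = ~205 MB of accounted buffer memory.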
+
+    LOG.info("ContainerRunnerImpl config: " +
+        "memoryPerExecutorDerived=" + memoryPerExecutor +
+        ", numExecutors=" + numExecutors
+    );
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) {
+    try {
+      localFs = FileSystem.getLocal(conf);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to setup local filesystem instance", e);
+    }
+  }
+
+  @Override
+  public void serviceStart() {
+  }
+
+  public void setShufflePort(int shufflePort) {
+    AuxiliaryServiceHelper.setServiceDataIntoEnv(
+        TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
+        ByteBuffer.allocate(4).putInt(shufflePort), localEnv);
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    super.serviceStop();
+  }
+
+  // TODO Move this into a utilities class
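+  // e.g. (hypothetical values): createAppSpecificLocalDir("/tmp/work", "application_123_0001", "testuser")
+  // yields "/tmp/work/usercache/testuser/appcache/application_123_0001" (with '/' as File.separator).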
+  private static String createAppSpecificLocalDir(String baseDir, String applicationIdString,
+                                                  String user) {
+    return baseDir + File.separator + "usercache" + File.separator + user + File.separator +
+        "appcache" + File.separator + applicationIdString;
+  }
+
+  /**
+   * Submit a container which is ready for running.
+   * The regular pull mechanism will be used to fetch work from the AM
+   * @param request
+   * @throws IOException
+   */
+  @Override
+  public void queueContainer(RunContainerRequestProto request) throws IOException {
+    LOG.info("Queuing container for execution: " + request);
+
+    Map<String, String> env = new HashMap<String, String>();
+    env.putAll(localEnv);
+    env.put(ApplicationConstants.Environment.USER.name(), request.getUser());
+
+    String[] localDirs = new String[localDirsBase.length];
+
+    // Set up local dirs to be application specific, and create them.
+    for (int i = 0; i < localDirsBase.length; i++) {
+      localDirs[i] = createAppSpecificLocalDir(localDirsBase[i], request.getApplicationIdString(),
+          request.getUser());
+      localFs.mkdirs(new Path(localDirs[i]));
+    }
+    LOG.info("DEBUG: Dirs are: " + Arrays.toString(localDirs));
+
+
+    // Set up workingDir. This is otherwise set up as Environment.PWD
+    // Used for re-localization, to add the user specified configuration (conf_pb_binary_stream)
+    String workingDir = localDirs[0];
+
+    Credentials credentials = new Credentials();
+    DataInputBuffer dib = new DataInputBuffer();
+    byte[] tokenBytes = request.getCredentialsBinary().toByteArray();
+    dib.reset(tokenBytes, tokenBytes.length);
+    credentials.readTokenStorageStream(dib);
+
+    Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
+
+    // TODO Unregistering does not happen at the moment, since there is no signal for when an app completes.
+    LOG.info("DEBUG: Registering request with the ShuffleHandler");
+    ShuffleHandler.get().registerApplication(request.getApplicationIdString(), jobToken, request.getUser());
+
+
+    ContainerRunnerCallable callable = new ContainerRunnerCallable(request, new Configuration(getConfig()),
+        new ExecutionContextImpl(localAddress.get().getHostName()), env, localDirs,
+        workingDir, credentials, memoryPerExecutor);
+    ListenableFuture<ContainerExecutionResult> future = executorService
+        .submit(callable);
+    Futures.addCallback(future, new ContainerRunnerCallback(request, callable));
+  }
+
+  /**
+   * Submit an entire work unit - containerId + TaskSpec.
+   * This is intended for a task push from the AM
+   *
+   * @param request
+   * @throws IOException
+   */
+  @Override
+  public void submitWork(SubmitWorkRequestProto request) throws
+      IOException {
+    LOG.info("Queuing work for execution: " + request);
+
+    Map<String, String> env = new HashMap<String, String>();
+    env.putAll(localEnv);
+    env.put(ApplicationConstants.Environment.USER.name(), request.getUser());
+
+    String[] localDirs = new String[localDirsBase.length];
+
+    // Set up local dirs to be application specific, and create them.
+    for (int i = 0; i < localDirsBase.length; i++) {
+      localDirs[i] = createAppSpecificLocalDir(localDirsBase[i], request.getApplicationIdString(),
+          request.getUser());
+      localFs.mkdirs(new Path(localDirs[i]));
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Dirs are: " + Arrays.toString(localDirs));
+    }
+
+    // Set up workingDir. This is otherwise set up as Environment.PWD
+    // Used for re-localization, to add the user specified configuration (conf_pb_binary_stream)
+    String workingDir = localDirs[0];
+
+    Credentials credentials = new Credentials();
+    DataInputBuffer dib = new DataInputBuffer();
+    byte[] tokenBytes = request.getCredentialsBinary().toByteArray();
+    dib.reset(tokenBytes, tokenBytes.length);
+    credentials.readTokenStorageStream(dib);
+
+    Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
+
+    // TODO Unregistering does not happen at the moment, since there is no signal for when an app completes.
+    LOG.info("DEBUG: Registering request with the ShuffleHandler");
+    ShuffleHandler.get().registerApplication(request.getApplicationIdString(), jobToken, request.getUser());
+    TaskRunnerCallable callable = new TaskRunnerCallable(request, new Configuration(getConfig()),
+        new ExecutionContextImpl(localAddress.get().getHostName()), env, localDirs,
+        workingDir, credentials, memoryPerExecutor);
+    ListenableFuture<ContainerExecutionResult> future = executorService.submit(callable);
+    Futures.addCallback(future, new TaskRunnerCallback(request, callable));
+  }
+
+
+  static class ContainerRunnerCallable implements Callable<ContainerExecutionResult> {
+
+    private final RunContainerRequestProto request;
+    private final Configuration conf;
+    private final String workingDir;
+    private final String[] localDirs;
+    private final Map<String, String> envMap;
+    private final String pid = null;
+    private final ObjectRegistryImpl objectRegistry;
+    private final ExecutionContext executionContext;
+    private final Credentials credentials;
+    private final long memoryAvailable;
+    private volatile TezChild tezChild;
+
+
+    ContainerRunnerCallable(RunContainerRequestProto request, Configuration conf,
+                            ExecutionContext executionContext, Map<String, String> envMap,
+                            String[] localDirs, String workingDir, Credentials credentials,
+                            long memoryAvailable) {
+      this.request = request;
+      this.conf = conf;
+      this.executionContext = executionContext;
+      this.envMap = envMap;
+      this.workingDir = workingDir;
+      this.localDirs = localDirs;
+      this.objectRegistry = new ObjectRegistryImpl();
+      this.credentials = credentials;
+      this.memoryAvailable = memoryAvailable;
+
+    }
+
+    @Override
+    public ContainerExecutionResult call() throws Exception {
+      Stopwatch sw = new Stopwatch().start();
+      tezChild =
+          new TezChild(conf, request.getAmHost(), request.getAmPort(),
+              request.getContainerIdString(),
+              request.getTokenIdentifier(), request.getAppAttemptNumber(), workingDir, localDirs,
+              envMap, objectRegistry, pid,
+              executionContext, credentials, memoryAvailable, request.getUser());
+      ContainerExecutionResult result = tezChild.run();
+      LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" +
+          sw.stop().elapsedMillis());
+      return result;
+    }
+
+    public TezChild getTezChild() {
+      return this.tezChild;
+    }
+  }
+
+
+  final class ContainerRunnerCallback implements FutureCallback<ContainerExecutionResult> {
+
+    private final RunContainerRequestProto request;
+    private final ContainerRunnerCallable containerRunnerCallable;
+
+    ContainerRunnerCallback(RunContainerRequestProto request,
+                            ContainerRunnerCallable containerRunnerCallable) {
+      this.request = request;
+      this.containerRunnerCallable = containerRunnerCallable;
+    }
+
+    // TODO Proper error handling
+    @Override
+    public void onSuccess(ContainerExecutionResult result) {
+      switch (result.getExitStatus()) {
+        case SUCCESS:
+          LOG.info("Successfully finished: " + request.getApplicationIdString() + ", containerId=" +
+              request.getContainerIdString());
+          break;
+        case EXECUTION_FAILURE:
+          LOG.info("Failed to run: " + request.getApplicationIdString() + ", containerId=" +
+              request.getContainerIdString(), result.getThrowable());
+          break;
+        case INTERRUPTED:
+          LOG.info(
+              "Interrupted while running: " + request.getApplicationIdString() + ", containerId=" +
+                  request.getContainerIdString(), result.getThrowable());
+          break;
+        case ASKED_TO_DIE:
+          LOG.info(
+              "Asked to die while running: " + request.getApplicationIdString() + ", containerId=" +
+                  request.getContainerIdString());
+          break;
+      }
+    }
+
+    @Override
+    public void onFailure(Throwable t) {
+      LOG.error(
+          "TezChild execution failed for : " + request.getApplicationIdString() + ", containerId=" +
+              request.getContainerIdString(), t);
+      TezChild tezChild = containerRunnerCallable.getTezChild();
+      if (tezChild != null) {
+        tezChild.shutdown();
+      }
+    }
+  }
+
+  static class TaskRunnerCallable implements Callable<ContainerExecutionResult> {
+
+    private final SubmitWorkRequestProto request;
+    private final Configuration conf;
+    private final String workingDir;
+    private final String[] localDirs;
+    private final Map<String, String> envMap;
+    private final String pid = null;
+    private final ObjectRegistryImpl objectRegistry;
+    private final ExecutionContext executionContext;
+    private final Credentials credentials;
+    private final long memoryAvailable;
+    private final ListeningExecutorService executor;
+    private volatile TezTaskRunner taskRunner;
+    private volatile TaskReporter taskReporter;
+    private TezTaskUmbilicalProtocol umbilical;
+
+
+    TaskRunnerCallable(SubmitWorkRequestProto request, Configuration conf,
+                             ExecutionContext executionContext, Map<String, String> envMap,
+                             String[] localDirs, String workingDir, Credentials credentials,
+                             long memoryAvailable) {
+      this.request = request;
+      this.conf = conf;
+      this.executionContext = executionContext;
+      this.envMap = envMap;
+      this.workingDir = workingDir;
+      this.localDirs = localDirs;
+      this.objectRegistry = new ObjectRegistryImpl();
+      this.credentials = credentials;
+      this.memoryAvailable = memoryAvailable;
+      // TODO This executor seems unnecessary, here and in TezChild.
+      ExecutorService executorReal = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
+          .setDaemon(true)
+          .setNameFormat("TezTaskRunner_" + request.getTaskSpec().getTaskAttemptIdString()).build());
+      executor = MoreExecutors.listeningDecorator(executorReal);
+    }
+
+    @Override
+    public ContainerExecutionResult call() throws Exception {
+
+      // TODO Consolidate this code with TezChild.
+      Stopwatch sw = new Stopwatch().start();
+      UserGroupInformation taskUgi = UserGroupInformation.createRemoteUser(request.getUser());
+      taskUgi.addCredentials(credentials);
+
+      Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
+      Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
+      serviceConsumerMetadata.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
+          TezCommonUtils.convertJobTokenToBytes(jobToken));
+      Multimap<String, String> startedInputsMap = HashMultimap.create();
+
+      UserGroupInformation taskOwner =
+          UserGroupInformation.createRemoteUser(request.getTokenIdentifier());
+      final InetSocketAddress address =
+          NetUtils.createSocketAddrForHost(request.getAmHost(), request.getAmPort());
+      SecurityUtil.setTokenService(jobToken, address);
+      taskOwner.addToken(jobToken);
+      umbilical = taskOwner.doAs(new PrivilegedExceptionAction<TezTaskUmbilicalProtocol>() {
+        @Override
+        public TezTaskUmbilicalProtocol run() throws Exception {
+          return RPC.getProxy(TezTaskUmbilicalProtocol.class,
+              TezTaskUmbilicalProtocol.versionID, address, conf);
+        }
+      });
+      // TODO Stop reading this on each request.
+      taskReporter = new TaskReporter(
+          umbilical,
+          conf.getInt(TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS,
+              TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT),
+          conf.getLong(
+              TezConfiguration.TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS,
+              TezConfiguration.TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS_DEFAULT),
+          conf.getInt(TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT,
+              TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT),
+          new AtomicLong(0),
+          request.getContainerIdString());
+
+      taskRunner = new TezTaskRunner(conf, taskUgi, localDirs,
+          ProtoConverters.getTaskSpecfromProto(request.getTaskSpec()), umbilical,
+          request.getAppAttemptNumber(),
+          serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor, objectRegistry,
+          pid,
+          executionContext, memoryAvailable);
+
+      boolean shouldDie;
+      try {
+        shouldDie = !taskRunner.run();
+        if (shouldDie) {
+          LOG.info("Got a shouldDie notification via hearbeats. Shutting down");
+          return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null,
+              "Asked to die by the AM");
+        }
+      } catch (IOException e) {
+        return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE,
+            e, "TaskExecutionFailure: " + e.getMessage());
+      } catch (TezException e) {
+        return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE,
+            e, "TaskExecutionFailure: " + e.getMessage());
+      } finally {
+        FileSystem.closeAllForUGI(taskUgi);
+      }
+      LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" +
+          sw.stop().elapsedMillis());
+      return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null,
+          null);
+    }
+
+    public void shutdown() {
+      executor.shutdownNow();
+      if (taskReporter != null) {
+        taskReporter.shutdown();
+      }
+      if (umbilical != null) {
+        RPC.stopProxy(umbilical);
+      }
+    }
+  }
+
+
+  final class TaskRunnerCallback implements FutureCallback<ContainerExecutionResult> {
+
+    private final SubmitWorkRequestProto request;
+    private final TaskRunnerCallable taskRunnerCallable;
+
+    TaskRunnerCallback(SubmitWorkRequestProto request,
+                       TaskRunnerCallable taskRunnerCallable) {
+      this.request = request;
+      this.taskRunnerCallable = taskRunnerCallable;
+    }
+
+    // TODO Proper error handling
+    @Override
+    public void onSuccess(ContainerExecutionResult result) {
+      switch (result.getExitStatus()) {
+        case SUCCESS:
+          LOG.info("Successfully finished: " + request.getApplicationIdString() + ", containerId=" +
+              request.getContainerIdString());
+          break;
+        case EXECUTION_FAILURE:
+          LOG.info("Failed to run: " + request.getApplicationIdString() + ", containerId=" +
+              request.getContainerIdString(), result.getThrowable());
+          break;
+        case INTERRUPTED:
+          LOG.info(
+              "Interrupted while running: " + request.getApplicationIdString() + ", containerId=" +
+                  request.getContainerIdString(), result.getThrowable());
+          break;
+        case ASKED_TO_DIE:
+          LOG.info(
+              "Asked to die while running: " + request.getApplicationIdString() + ", containerId=" +
+                  request.getContainerIdString());
+          break;
+      }
+      taskRunnerCallable.shutdown();
+    }
+
+    @Override
+    public void onFailure(Throwable t) {
+      LOG.error(
+          "TezTaskRunner execution failed for : " + request.getApplicationIdString() + ", containerId=" +
+              request.getContainerIdString(), t);
+      taskRunnerCallable.shutdown();
+    }
+  }
+
+}
\ No newline at end of file
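
Note on the callable/callback pairing above: ContainerRunnerCallable and
TaskRunnerCallable are plain Callables, and the matching FutureCallbacks are
meant to be attached to the ListenableFuture obtained when the callable is
submitted to a listening executor. A minimal, self-contained sketch of that
Guava wiring (the executor, Result type and WorkCallable below are illustrative
stand-ins, not part of this patch):

    import java.util.concurrent.Callable;
    import java.util.concurrent.Executors;

    import com.google.common.util.concurrent.FutureCallback;
    import com.google.common.util.concurrent.Futures;
    import com.google.common.util.concurrent.ListenableFuture;
    import com.google.common.util.concurrent.ListeningExecutorService;
    import com.google.common.util.concurrent.MoreExecutors;

    public class CallbackWiringSketch {

      // Stand-in for ContainerExecutionResult in the patch above.
      static class Result { }

      // Stand-in for ContainerRunnerCallable / TaskRunnerCallable.
      static class WorkCallable implements Callable<Result> {
        @Override
        public Result call() {
          return new Result();
        }
      }

      public static void main(String[] args) {
        ListeningExecutorService executor =
            MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
        ListenableFuture<Result> future = executor.submit(new WorkCallable());
        Futures.addCallback(future, new FutureCallback<Result>() {
          @Override
          public void onSuccess(Result result) {
            System.out.println("work finished");   // ContainerRunnerCallback logs here
          }

          @Override
          public void onFailure(Throwable t) {
            t.printStackTrace();                    // and shuts the runner down here
          }
        });
        executor.shutdown();
      }
    }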

http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java
new file mode 100644
index 0000000..012e352
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.service.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.Logger;
+import org.apache.tez.service.ContainerRunner;
+import org.apache.tez.shufflehandler.ShuffleHandler;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerRequestProto;
+
+public class TezTestService extends AbstractService implements ContainerRunner {
+
+  private static final Logger LOG = Logger.getLogger(TezTestService.class);
+
+  private final Configuration shuffleHandlerConf;
+  private final int numExecutors;
+
+  private final TezTestServiceProtocolServerImpl server;
+  private final ContainerRunnerImpl containerRunner;
+  private final String[] localDirs;
+
+  private final AtomicInteger numSubmissions = new AtomicInteger(0);
+
+
+  private final AtomicReference<InetSocketAddress> address = new AtomicReference<InetSocketAddress>();
+
+  public TezTestService(Configuration conf, int numExecutors, long memoryAvailable, String[] localDirs) {
+    super(TezTestService.class.getSimpleName());
+    this.numExecutors = numExecutors;
+    this.localDirs = localDirs;
+
+    long memoryAvailableBytes = memoryAvailable;
+    long jvmMax = Runtime.getRuntime().maxMemory();
+
+    LOG.info(TezTestService.class.getSimpleName() + " created with the following configuration: " +
+        "numExecutors=" + numExecutors +
+        ", workDirs=" + Arrays.toString(localDirs) +
+        ", memoryAvailable=" + memoryAvailable +
+        ", jvmMaxMemory=" + jvmMax);
+
+    Preconditions.checkArgument(this.numExecutors > 0);
+    Preconditions.checkArgument(this.localDirs != null && this.localDirs.length > 0,
+        "Work dirs must be specified");
+    Preconditions.checkState(jvmMax >= memoryAvailableBytes,
+        "Invalid configuration. Xmx value too small. maxAvailable=" + jvmMax + ", configured=" +
+            memoryAvailableBytes);
+
+    this.shuffleHandlerConf = new Configuration(conf);
+    // Start Shuffle on a random port
+    this.shuffleHandlerConf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+    this.shuffleHandlerConf.set(ShuffleHandler.SHUFFLE_HANDLER_LOCAL_DIRS, StringUtils.arrayToString(localDirs));
+
+    this.server = new TezTestServiceProtocolServerImpl(this, address);
+    this.containerRunner = new ContainerRunnerImpl(numExecutors, localDirs, address,
+        memoryAvailableBytes);
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) {
+    server.init(conf);
+    containerRunner.init(conf);
+  }
+
+  @Override
+  public void serviceStart() throws Exception {
+    ShuffleHandler.initializeAndStart(shuffleHandlerConf);
+    containerRunner.setShufflePort(ShuffleHandler.get().getPort());
+    server.start();
+    containerRunner.start();
+  }
+
+  public void serviceStop() throws Exception {
+    containerRunner.stop();
+    server.stop();
+    ShuffleHandler.get().stop();
+  }
+
+  public InetSocketAddress getListenerAddress() {
+    return server.getBindAddress();
+  }
+
+  public int getShufflePort() {
+    return ShuffleHandler.get().getPort();
+  }
+
+
+
+  @Override
+  public void queueContainer(RunContainerRequestProto request) throws IOException {
+    numSubmissions.incrementAndGet();
+    containerRunner.queueContainer(request);
+  }
+
+  @Override
+  public void submitWork(TezTestServiceProtocolProtos.SubmitWorkRequestProto request) throws
+      IOException {
+    numSubmissions.incrementAndGet();
+    containerRunner.submitWork(request);
+  }
+
+  public int getNumSubmissions() {
+    return numSubmissions.get();
+  }
+}
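
A rough usage sketch for the service above, following the standard Hadoop
AbstractService lifecycle (the memory figure and work directory are made-up
illustration values, not defaults from this patch):

    Configuration conf = new Configuration();
    long memoryAvailable = 512L * 1024 * 1024;                  // bytes for executors
    String[] localDirs = new String[] { "/tmp/tez-test-service" };

    TezTestService service = new TezTestService(conf, 2, memoryAvailable, localDirs);
    service.init(conf);    // serviceInit: initializes RPC server and container runner
    service.start();       // serviceStart: starts ShuffleHandler, RPC server, runner

    System.out.println("listening on " + service.getListenerAddress()
        + ", shuffle port " + service.getShufflePort());

    service.stop();        // serviceStop: stops runner, server and ShuffleHandler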

http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestServiceProtocolClientImpl.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestServiceProtocolClientImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestServiceProtocolClientImpl.java
new file mode 100644
index 0000000..10d2952
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestServiceProtocolClientImpl.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.service.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.tez.service.TezTestServiceProtocolBlockingPB;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerRequestProto;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerResponseProto;
+
+
+public class TezTestServiceProtocolClientImpl implements TezTestServiceProtocolBlockingPB {
+
+  private final Configuration conf;
+  private final InetSocketAddress serverAddr;
+  TezTestServiceProtocolBlockingPB proxy;
+
+
+  public TezTestServiceProtocolClientImpl(Configuration conf, String hostname, int port) {
+    this.conf = conf;
+    this.serverAddr = NetUtils.createSocketAddr(hostname, port);
+  }
+
+  @Override
+  public RunContainerResponseProto runContainer(RpcController controller,
+                                                RunContainerRequestProto request) throws
+      ServiceException {
+    try {
+      return getProxy().runContainer(null, request);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public TezTestServiceProtocolProtos.SubmitWorkResponseProto submitWork(RpcController controller,
+                                                                         TezTestServiceProtocolProtos.SubmitWorkRequestProto request) throws
+      ServiceException {
+    try {
+      return getProxy().submitWork(null, request);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+
+  public TezTestServiceProtocolBlockingPB getProxy() throws IOException {
+    if (proxy == null) {
+      proxy = createProxy();
+    }
+    return proxy;
+  }
+
+  public TezTestServiceProtocolBlockingPB createProxy() throws IOException {
+    TezTestServiceProtocolBlockingPB p;
+    // TODO Fix security
+    RPC.setProtocolEngine(conf, TezTestServiceProtocolBlockingPB.class, ProtobufRpcEngine.class);
+    p = (TezTestServiceProtocolBlockingPB) RPC
+        .getProxy(TezTestServiceProtocolBlockingPB.class, 0, serverAddr, conf);
+    return p;
+  }
+}
\ No newline at end of file
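
A hedged sketch of how this client could be driven from a test. The host, port
and request contents below are assumptions for illustration; the real request
construction happens elsewhere in this patch series, and exception handling is
elided:

    Configuration conf = new Configuration();
    TezTestServiceProtocolClientImpl client =
        new TezTestServiceProtocolClientImpl(conf, "localhost", 15001);

    // The proxy is created lazily on first use and cached for later calls.
    TezTestServiceProtocolProtos.SubmitWorkRequestProto request =
        TezTestServiceProtocolProtos.SubmitWorkRequestProto.newBuilder()
            // ... populate task spec, application id, token identifier, etc. ...
            .build();

    // The RpcController argument is ignored by this implementation, so null is fine.
    client.submitWork(null, request);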

http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestServiceProtocolServerImpl.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestServiceProtocolServerImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestServiceProtocolServerImpl.java
new file mode 100644
index 0000000..d7f8444
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestServiceProtocolServerImpl.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.service.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.BlockingService;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.tez.service.ContainerRunner;
+import org.apache.tez.service.TezTestServiceProtocolBlockingPB;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerRequestProto;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerResponseProto;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.SubmitWorkResponseProto;
+
+public class TezTestServiceProtocolServerImpl extends AbstractService
+    implements TezTestServiceProtocolBlockingPB {
+
+  private static final Log LOG = LogFactory.getLog(TezTestServiceProtocolServerImpl.class);
+
+  private final ContainerRunner containerRunner;
+  private RPC.Server server;
+  private final AtomicReference<InetSocketAddress> bindAddress;
+
+
+  public TezTestServiceProtocolServerImpl(ContainerRunner containerRunner,
+                                          AtomicReference<InetSocketAddress> address) {
+    super(TezTestServiceProtocolServerImpl.class.getSimpleName());
+    this.containerRunner = containerRunner;
+    this.bindAddress = address;
+  }
+
+  @Override
+  public RunContainerResponseProto runContainer(RpcController controller,
+                                                RunContainerRequestProto request) throws
+      ServiceException {
+    LOG.info("Received request: " + request);
+    try {
+      containerRunner.queueContainer(request);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return RunContainerResponseProto.getDefaultInstance();
+  }
+
+  @Override
+  public SubmitWorkResponseProto submitWork(RpcController controller, TezTestServiceProtocolProtos.SubmitWorkRequestProto request) throws
+      ServiceException {
+    LOG.info("Received submitWork request: " + request);
+    try {
+      containerRunner.submitWork(request);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return SubmitWorkResponseProto.getDefaultInstance();
+  }
+
+
+  @Override
+  public void serviceStart() {
+    Configuration conf = getConfig();
+
+    int numHandlers = 3;
+    InetSocketAddress addr = new InetSocketAddress(0);
+
+    try {
+      server = createServer(TezTestServiceProtocolBlockingPB.class, addr, conf, numHandlers,
+          TezTestServiceProtocolProtos.TezTestServiceProtocol.newReflectiveBlockingService(this));
+      server.start();
+    } catch (IOException e) {
+      LOG.error("Failed to run RPC Server", e);
+      throw new RuntimeException(e);
+    }
+
+    InetSocketAddress serverBindAddress = NetUtils.getConnectAddress(server);
+    this.bindAddress.set(NetUtils.createSocketAddrForHost(
+        serverBindAddress.getAddress().getCanonicalHostName(),
+        serverBindAddress.getPort()));
+    LOG.info("Instantiated TestTestServiceListener at " + bindAddress);
+  }
+
+  @Override
+  public void serviceStop() {
+    if (server != null) {
+      server.stop();
+    }
+  }
+
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  InetSocketAddress getBindAddress() {
+    return this.bindAddress.get();
+  }
+
+  private RPC.Server createServer(Class<?> pbProtocol, InetSocketAddress addr, Configuration conf,
+                                  int numHandlers, BlockingService blockingService) throws
+      IOException {
+    RPC.setProtocolEngine(conf, pbProtocol, ProtobufRpcEngine.class);
+    RPC.Server server = new RPC.Builder(conf)
+        .setProtocol(pbProtocol)
+        .setInstance(blockingService)
+        .setBindAddress(addr.getHostName())
+        .setPort(0)
+        .setNumHandlers(numHandlers)
+        .build();
+    // TODO Add security.
+    return server;
+  }
+}
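
The server intentionally binds to an ephemeral port (setPort(0)) and publishes
the address it actually bound to through the shared AtomicReference, so nothing
else needs a hard-coded port. A small sketch of the consuming side (the
ContainerRunner instance is assumed to come from elsewhere):

    AtomicReference<InetSocketAddress> address =
        new AtomicReference<InetSocketAddress>();
    TezTestServiceProtocolServerImpl server =
        new TezTestServiceProtocolServerImpl(containerRunner, address);
    server.init(new Configuration());
    server.start();                              // binds, then fills in the reference

    InetSocketAddress bound = address.get();     // real host + port after start()
    System.out.println("RPC server bound to " + bound);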

http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/FadvisedChunkedFile.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/FadvisedChunkedFile.java b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/FadvisedChunkedFile.java
new file mode 100644
index 0000000..65588fe
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/FadvisedChunkedFile.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.shufflehandler;
+
+import java.io.FileDescriptor;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.jboss.netty.handler.stream.ChunkedFile;
+
+public class FadvisedChunkedFile extends ChunkedFile {
+
+  private static final Log LOG = LogFactory.getLog(FadvisedChunkedFile.class);
+
+  private final boolean manageOsCache;
+  private final int readaheadLength;
+  private final ReadaheadPool readaheadPool;
+  private final FileDescriptor fd;
+  private final String identifier;
+
+  private ReadaheadRequest readaheadRequest;
+
+  public FadvisedChunkedFile(RandomAccessFile file, long position, long count,
+      int chunkSize, boolean manageOsCache, int readaheadLength,
+      ReadaheadPool readaheadPool, String identifier) throws IOException {
+    super(file, position, count, chunkSize);
+    this.manageOsCache = manageOsCache;
+    this.readaheadLength = readaheadLength;
+    this.readaheadPool = readaheadPool;
+    this.fd = file.getFD();
+    this.identifier = identifier;
+  }
+
+  @Override
+  public Object nextChunk() throws Exception {
+    if (manageOsCache && readaheadPool != null) {
+      readaheadRequest = readaheadPool
+          .readaheadStream(identifier, fd, getCurrentOffset(), readaheadLength,
+              getEndOffset(), readaheadRequest);
+    }
+    return super.nextChunk();
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (readaheadRequest != null) {
+      readaheadRequest.cancel();
+    }
+    if (manageOsCache && getEndOffset() - getStartOffset() > 0) {
+      try {
+        NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier,
+            fd,
+            getStartOffset(), getEndOffset() - getStartOffset(),
+            NativeIO.POSIX.POSIX_FADV_DONTNEED);
+      } catch (Throwable t) {
+        LOG.warn("Failed to manage OS cache for " + identifier, t);
+      }
+    }
+    super.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/FadvisedFileRegion.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/FadvisedFileRegion.java b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/FadvisedFileRegion.java
new file mode 100644
index 0000000..bdffe52
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/FadvisedFileRegion.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.shufflehandler;
+
+import java.io.FileDescriptor;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.WritableByteChannel;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.jboss.netty.channel.DefaultFileRegion;
+
+public class FadvisedFileRegion extends DefaultFileRegion {
+
+  private static final Log LOG = LogFactory.getLog(FadvisedFileRegion.class);
+
+  private final boolean manageOsCache;
+  private final int readaheadLength;
+  private final ReadaheadPool readaheadPool;
+  private final FileDescriptor fd;
+  private final String identifier;
+  private final long count;
+  private final long position;
+  private final int shuffleBufferSize;
+  private final boolean shuffleTransferToAllowed;
+  private final FileChannel fileChannel;
+  
+  private ReadaheadRequest readaheadRequest;
+
+  public FadvisedFileRegion(RandomAccessFile file, long position, long count,
+      boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool,
+      String identifier, int shuffleBufferSize, 
+      boolean shuffleTransferToAllowed) throws IOException {
+    super(file.getChannel(), position, count);
+    this.manageOsCache = manageOsCache;
+    this.readaheadLength = readaheadLength;
+    this.readaheadPool = readaheadPool;
+    this.fd = file.getFD();
+    this.identifier = identifier;
+    this.fileChannel = file.getChannel();
+    this.count = count;
+    this.position = position;
+    this.shuffleBufferSize = shuffleBufferSize;
+    this.shuffleTransferToAllowed = shuffleTransferToAllowed;
+  }
+
+  @Override
+  public long transferTo(WritableByteChannel target, long position)
+      throws IOException {
+    if (manageOsCache && readaheadPool != null) {
+      readaheadRequest = readaheadPool.readaheadStream(identifier, fd,
+          getPosition() + position, readaheadLength,
+          getPosition() + getCount(), readaheadRequest);
+    }
+    
+    if(this.shuffleTransferToAllowed) {
+      return super.transferTo(target, position);
+    } else {
+      return customShuffleTransfer(target, position);
+    } 
+  }
+
+  /**
+   * This method transfers data using a local buffer. It transfers data from
+   * the disk to a local in-memory buffer, and then from that buffer to the
+   * target. It is used only if transferTo is disallowed in the configuration.
+   * super.transferTo does not perform well on Windows because it generates
+   * small IO requests; customShuffleTransfer can control the size of the IO
+   * requests by changing the size of the intermediate buffer.
+   */
+  @VisibleForTesting
+  long customShuffleTransfer(WritableByteChannel target, long position)
+      throws IOException {
+    long actualCount = this.count - position;
+    if (actualCount < 0 || position < 0) {
+      throw new IllegalArgumentException(
+          "position out of range: " + position +
+          " (expected: 0 - " + (this.count - 1) + ')');
+    }
+    if (actualCount == 0) {
+      return 0L;
+    }
+    
+    long trans = actualCount;
+    int readSize;
+    ByteBuffer byteBuffer = ByteBuffer.allocate(this.shuffleBufferSize);
+    
+    while(trans > 0L &&
+        (readSize = fileChannel.read(byteBuffer, this.position+position)) > 0) {
+      //adjust counters and buffer limit
+      if(readSize < trans) {
+        trans -= readSize;
+        position += readSize;
+        byteBuffer.flip();
+      } else {
+        //We can read more than we need if the actualCount is not multiple 
+        //of the byteBuffer size and file is big enough. In that case we cannot
+        //use flip method but we need to set buffer limit manually to trans.
+        byteBuffer.limit((int)trans);
+        byteBuffer.position(0);
+        position += trans; 
+        trans = 0;
+      }
+      
+      //write data to the target
+      while(byteBuffer.hasRemaining()) {
+        target.write(byteBuffer);
+      }
+      
+      byteBuffer.clear();
+    }
+    
+    return actualCount - trans;
+  }
+
+  
+  @Override
+  public void releaseExternalResources() {
+    if (readaheadRequest != null) {
+      readaheadRequest.cancel();
+    }
+    super.releaseExternalResources();
+  }
+  
+  /**
+   * Call when the transfer completes successfully so we can advise the OS that
+   * we don't need the region to be cached anymore.
+   */
+  public void transferSuccessful() {
+    if (manageOsCache && getCount() > 0) {
+      try {
+        NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier,
+           fd, getPosition(), getCount(),
+           NativeIO.POSIX.POSIX_FADV_DONTNEED);
+      } catch (Throwable t) {
+        LOG.warn("Failed to manage OS cache for " + identifier, t);
+      }
+    }
+  }
+}
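
Stripped of the shuffle specifics, the customShuffleTransfer fallback above is a
positional read-into-buffer / write-out loop. A simplified sketch of that idea
(the 128 KB buffer size is an illustrative value matching
DEFAULT_SHUFFLE_BUFFER_SIZE; java.nio imports assumed):

    static long bufferedCopy(FileChannel in, WritableByteChannel out,
                             long position, long count) throws IOException {
      ByteBuffer buffer = ByteBuffer.allocate(128 * 1024);
      long remaining = count;
      while (remaining > 0) {
        buffer.clear();
        if (remaining < buffer.capacity()) {
          buffer.limit((int) remaining);     // never read past the region
        }
        int read = in.read(buffer, position);
        if (read <= 0) {
          break;                             // EOF or nothing available
        }
        buffer.flip();
        while (buffer.hasRemaining()) {
          out.write(buffer);                 // drain the buffer to the target
        }
        position += read;
        remaining -= read;
      }
      return count - remaining;              // bytes actually transferred
    }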

http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/IndexCache.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/IndexCache.java b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/IndexCache.java
new file mode 100644
index 0000000..9a51ca0
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/IndexCache.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.shufflehandler;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.runtime.library.common.Constants;
+import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
+import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
+
+class IndexCache {
+
+  private final Configuration conf;
+  private final int totalMemoryAllowed;
+  private AtomicInteger totalMemoryUsed = new AtomicInteger();
+  private static final Log LOG = LogFactory.getLog(IndexCache.class);
+
+  private final ConcurrentHashMap<String,IndexInformation> cache =
+      new ConcurrentHashMap<String,IndexInformation>();
+
+  private final LinkedBlockingQueue<String> queue =
+      new LinkedBlockingQueue<String>();
+
+  public IndexCache(Configuration conf) {
+    this.conf = conf;
+    totalMemoryAllowed = 10 * 1024 * 1024;
+    LOG.info("IndexCache created with max memory = " + totalMemoryAllowed);
+  }
+
+  /**
+   * This method gets the index information for the given mapId and reduce.
+   * It reads the index file into the cache if it is not already present.
+   * @param mapId
+   * @param reduce
+   * @param fileName The file to read the index information from if it is not
+   *                 already present in the cache
+   * @param expectedIndexOwner The expected owner of the index file
+   * @return The Index Information
+   * @throws IOException
+   */
+  public TezIndexRecord getIndexInformation(String mapId, int reduce,
+                                         Path fileName, String expectedIndexOwner)
+      throws IOException {
+
+    IndexInformation info = cache.get(mapId);
+
+    if (info == null) {
+      info = readIndexFileToCache(fileName, mapId, expectedIndexOwner);
+    } else {
+      synchronized(info) {
+        while (isUnderConstruction(info)) {
+          try {
+            info.wait();
+          } catch (InterruptedException e) {
+            throw new IOException("Interrupted waiting for construction", e);
+          }
+        }
+      }
+      LOG.debug("IndexCache HIT: MapId " + mapId + " found");
+    }
+
+    if (info.mapSpillRecord.size() == 0 ||
+        info.mapSpillRecord.size() <= reduce) {
+      throw new IOException("Invalid request " +
+          " Map Id = " + mapId + " Reducer = " + reduce +
+          " Index Info Length = " + info.mapSpillRecord.size());
+    }
+    return info.mapSpillRecord.getIndex(reduce);
+  }
+
+  private boolean isUnderConstruction(IndexInformation info) {
+    synchronized(info) {
+      return (null == info.mapSpillRecord);
+    }
+  }
+
+  private IndexInformation readIndexFileToCache(Path indexFileName,
+                                                String mapId,
+                                                String expectedIndexOwner)
+      throws IOException {
+    IndexInformation info;
+    IndexInformation newInd = new IndexInformation();
+    if ((info = cache.putIfAbsent(mapId, newInd)) != null) {
+      synchronized(info) {
+        while (isUnderConstruction(info)) {
+          try {
+            info.wait();
+          } catch (InterruptedException e) {
+            throw new IOException("Interrupted waiting for construction", e);
+          }
+        }
+      }
+      LOG.debug("IndexCache HIT: MapId " + mapId + " found");
+      return info;
+    }
+    LOG.debug("IndexCache MISS: MapId " + mapId + " not found") ;
+    TezSpillRecord tmp = null;
+    try {
+      tmp = new TezSpillRecord(indexFileName, conf, expectedIndexOwner);
+    } catch (Throwable e) {
+      tmp = new TezSpillRecord(0);
+      cache.remove(mapId);
+      throw new IOException("Error Reading IndexFile", e);
+    } finally {
+      synchronized (newInd) {
+        newInd.mapSpillRecord = tmp;
+        newInd.notifyAll();
+      }
+    }
+    queue.add(mapId);
+
+    if (totalMemoryUsed.addAndGet(newInd.getSize()) > totalMemoryAllowed) {
+      freeIndexInformation();
+    }
+    return newInd;
+  }
+
+  /**
+   * This method removes the map from the cache if the index information for
+   * this map is loaded (size > 0). An entry that is still in the loading
+   * phase (size = 0) is not removed, which prevents corruption of
+   * totalMemoryUsed. It should be called when a map output on this tracker
+   * is discarded.
+   * @param mapId The taskID of this map.
+   */
+  public void removeMap(String mapId) {
+    IndexInformation info = cache.get(mapId);
+    if (info == null || isUnderConstruction(info)) {
+      return;
+    }
+    info = cache.remove(mapId);
+    if (info != null) {
+      totalMemoryUsed.addAndGet(-info.getSize());
+      if (!queue.remove(mapId)) {
+        LOG.warn("Map ID " + mapId + " not found in queue!!");
+      }
+    } else {
+      LOG.info("Map ID " + mapId + " not found in cache");
+    }
+  }
+
+  /**
+   * This method checks if the cache and totalMemoryUsed are consistent.
+   * It is only used for unit tests.
+   * @return True if the cache and totalMemoryUsed are consistent
+   */
+  boolean checkTotalMemoryUsed() {
+    int totalSize = 0;
+    for (IndexInformation info : cache.values()) {
+      totalSize += info.getSize();
+    }
+    return totalSize == totalMemoryUsed.get();
+  }
+
+  /**
+   * Bring memory usage below totalMemoryAllowed.
+   */
+  private synchronized void freeIndexInformation() {
+    while (totalMemoryUsed.get() > totalMemoryAllowed) {
+      String s = queue.remove();
+      IndexInformation info = cache.remove(s);
+      if (info != null) {
+        totalMemoryUsed.addAndGet(-info.getSize());
+      }
+    }
+  }
+
+  private static class IndexInformation {
+    TezSpillRecord mapSpillRecord;
+
+    int getSize() {
+      return mapSpillRecord == null
+          ? 0
+          : mapSpillRecord.size() * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;
+    }
+  }
+}
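
IndexCache is package-private and is consumed by the ShuffleHandler below; a
rough illustration of the intended call pattern (the path, attempt id and owner
are made-up values):

    Configuration conf = new Configuration();
    IndexCache indexCache = new IndexCache(conf);   // 10 MB cap, per the constructor

    Path indexFile =
        new Path("/tmp/attempt_0_0001_m_000000_0/file.out.index");  // illustrative
    TezIndexRecord record = indexCache.getIndexInformation(
        "attempt_0_0001_m_000000_0", /* reduce */ 3, indexFile, "test-user");
    // record.getStartOffset() / getRawLength() / getPartLength() drive the response.

    // When the map output is discarded, drop the entry so the memory accounting
    // (totalMemoryUsed) stays consistent.
    indexCache.removeMap("attempt_0_0001_m_000000_0");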

http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java
new file mode 100644
index 0000000..cc82d74
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java
@@ -0,0 +1,840 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.shufflehandler;
+
+import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED;
+import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+import javax.crypto.SecretKey;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Pattern;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.SecureIOUtils;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.MutableCounterInt;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.security.ssl.SSLFactory;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
+import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
+import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.handler.codec.frame.TooLongFrameException;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
+import org.jboss.netty.handler.codec.http.HttpHeaders;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.jboss.netty.handler.codec.http.QueryStringDecoder;
+import org.jboss.netty.handler.ssl.SslHandler;
+import org.jboss.netty.handler.stream.ChunkedWriteHandler;
+import org.jboss.netty.util.CharsetUtil;
+
+public class ShuffleHandler {
+
+  private static final Log LOG = LogFactory.getLog(ShuffleHandler.class);
+
+  public static final String SHUFFLE_HANDLER_LOCAL_DIRS = "tez.shuffle.handler.local-dirs";
+
+  public static final String SHUFFLE_MANAGE_OS_CACHE = "mapreduce.shuffle.manage.os.cache";
+  public static final boolean DEFAULT_SHUFFLE_MANAGE_OS_CACHE = true;
+
+  public static final String SHUFFLE_READAHEAD_BYTES = "mapreduce.shuffle.readahead.bytes";
+  public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024;
+  
+  // pattern to identify errors related to the client closing the socket early
+  // idea borrowed from Netty SslHandler
+  private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile(
+      "^.*(?:connection.*reset|connection.*closed|broken.*pipe).*$",
+      Pattern.CASE_INSENSITIVE);
+
+  private int port;
+  private final ChannelFactory selector;
+  private final ChannelGroup accepted = new DefaultChannelGroup();
+  protected HttpPipelineFactory pipelineFact;
+  private final int sslFileBufferSize;
+  private final Configuration conf;
+
+  private final ConcurrentMap<String, Boolean> registeredApps = new ConcurrentHashMap<String, Boolean>();
+
+  /**
+   * Should the shuffle use posix_fadvise calls to manage the OS cache during
+   * sendfile
+   */
+  private final boolean manageOsCache;
+  private final int readaheadLength;
+  private final int maxShuffleConnections;
+  private final int shuffleBufferSize;
+  private final boolean shuffleTransferToAllowed;
+  private final ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
+
+  private Map<String,String> userRsrc;
+  private JobTokenSecretManager secretManager;
+
+  // TODO Fix this for tez.
+  public static final String MAPREDUCE_SHUFFLE_SERVICEID =
+      "mapreduce_shuffle";
+
+  public static final String SHUFFLE_PORT_CONFIG_KEY = "tez.shuffle.port";
+  public static final int DEFAULT_SHUFFLE_PORT = 15551;
+
+  // TODO Change configs to remove mapreduce references.
+  public static final String SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED =
+      "mapreduce.shuffle.connection-keep-alive.enable";
+  public static final boolean DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED = false;
+
+  public static final String SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT =
+      "mapreduce.shuffle.connection-keep-alive.timeout";
+  public static final int DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT = 5; //seconds
+
+  public static final String SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE =
+      "mapreduce.shuffle.mapoutput-info.meta.cache.size";
+  public static final int DEFAULT_SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE =
+      1000;
+
+  public static final String CONNECTION_CLOSE = "close";
+
+  public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY =
+    "mapreduce.shuffle.ssl.file.buffer.size";
+
+  public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024;
+
+  public static final String MAX_SHUFFLE_CONNECTIONS = "mapreduce.shuffle.max.connections";
+  public static final int DEFAULT_MAX_SHUFFLE_CONNECTIONS = 0; // 0 implies no limit
+  
+  public static final String MAX_SHUFFLE_THREADS = "mapreduce.shuffle.max.threads";
+  // 0 implies Netty default of 2 * number of available processors
+  public static final int DEFAULT_MAX_SHUFFLE_THREADS = 0;
+  
+  public static final String SHUFFLE_BUFFER_SIZE = 
+      "mapreduce.shuffle.transfer.buffer.size";
+  public static final int DEFAULT_SHUFFLE_BUFFER_SIZE = 128 * 1024;
+  
+  public static final String  SHUFFLE_TRANSFERTO_ALLOWED = 
+      "mapreduce.shuffle.transferTo.allowed";
+  public static final boolean DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED = true;
+  public static final boolean WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED = 
+      false;
+
+  final boolean connectionKeepAliveEnabled;
+  final int connectionKeepAliveTimeOut;
+  final int mapOutputMetaInfoCacheSize;
+  private static final AtomicBoolean started = new AtomicBoolean(false);
+  private static final AtomicBoolean initing = new AtomicBoolean(false);
+  private static ShuffleHandler INSTANCE;
+
+  @Metrics(about="Shuffle output metrics", context="mapred")
+  static class ShuffleMetrics implements ChannelFutureListener {
+    @Metric("Shuffle output in bytes")
+    MutableCounterLong shuffleOutputBytes;
+    @Metric("# of failed shuffle outputs")
+    MutableCounterInt shuffleOutputsFailed;
+    @Metric("# of succeeeded shuffle outputs")
+    MutableCounterInt shuffleOutputsOK;
+    @Metric("# of current shuffle connections")
+    MutableGaugeInt shuffleConnections;
+
+    @Override
+    public void operationComplete(ChannelFuture future) throws Exception {
+      if (future.isSuccess()) {
+        shuffleOutputsOK.incr();
+      } else {
+        shuffleOutputsFailed.incr();
+      }
+      shuffleConnections.decr();
+    }
+  }
+
+  public ShuffleHandler(Configuration conf) {
+    this.conf = conf;
+    manageOsCache = conf.getBoolean(SHUFFLE_MANAGE_OS_CACHE,
+        DEFAULT_SHUFFLE_MANAGE_OS_CACHE);
+
+    readaheadLength = conf.getInt(SHUFFLE_READAHEAD_BYTES,
+        DEFAULT_SHUFFLE_READAHEAD_BYTES);
+
+    maxShuffleConnections = conf.getInt(MAX_SHUFFLE_CONNECTIONS,
+        DEFAULT_MAX_SHUFFLE_CONNECTIONS);
+    int maxShuffleThreads = conf.getInt(MAX_SHUFFLE_THREADS,
+        DEFAULT_MAX_SHUFFLE_THREADS);
+    if (maxShuffleThreads == 0) {
+      maxShuffleThreads = 2 * Runtime.getRuntime().availableProcessors();
+    }
+
+    shuffleBufferSize = conf.getInt(SHUFFLE_BUFFER_SIZE,
+        DEFAULT_SHUFFLE_BUFFER_SIZE);
+
+    shuffleTransferToAllowed = conf.getBoolean(SHUFFLE_TRANSFERTO_ALLOWED,
+        (Shell.WINDOWS)?WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED:
+            DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED);
+
+    ThreadFactory bossFactory = new ThreadFactoryBuilder()
+        .setNameFormat("ShuffleHandler Netty Boss #%d")
+        .build();
+    ThreadFactory workerFactory = new ThreadFactoryBuilder()
+        .setNameFormat("ShuffleHandler Netty Worker #%d")
+        .build();
+
+    selector = new NioServerSocketChannelFactory(
+        Executors.newCachedThreadPool(bossFactory),
+        Executors.newCachedThreadPool(workerFactory),
+        maxShuffleThreads);
+
+    sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
+        DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
+    connectionKeepAliveEnabled =
+        conf.getBoolean(SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED,
+            DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED);
+    connectionKeepAliveTimeOut =
+        Math.max(1, conf.getInt(SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT,
+            DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT));
+    mapOutputMetaInfoCacheSize =
+        Math.max(1, conf.getInt(SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE,
+            DEFAULT_SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE));
+
+    userRsrc = new ConcurrentHashMap<String,String>();
+    secretManager = new JobTokenSecretManager();
+  }
+
+
+  public void start() throws Exception {
+    ServerBootstrap bootstrap = new ServerBootstrap(selector);
+    try {
+      pipelineFact = new HttpPipelineFactory(conf);
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+    bootstrap.setPipelineFactory(pipelineFact);
+    port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
+    Channel ch = bootstrap.bind(new InetSocketAddress(port));
+    accepted.add(ch);
+    port = ((InetSocketAddress)ch.getLocalAddress()).getPort();
+    conf.set(SHUFFLE_PORT_CONFIG_KEY, Integer.toString(port));
+    pipelineFact.SHUFFLE.setPort(port);
+    LOG.info("TezShuffleHandler" + " listening on port " + port);
+  }
+
+  public static void initializeAndStart(Configuration conf) throws Exception {
+    if (!initing.getAndSet(true)) {
+      INSTANCE = new ShuffleHandler(conf);
+      INSTANCE.start();
+      started.set(true);
+    }
+  }
+
+  public static ShuffleHandler get() {
+    Preconditions.checkState(started.get(), "ShuffleHandler must be started before invoking get");
+    return INSTANCE;
+  }
+
+  /**
+   * Serialize the shuffle port into a ByteBuffer for use later on.
+   * @param port the port to be sent to the ApplicationMaster
+   * @return the serialized form of the port.
+   */
+  public static ByteBuffer serializeMetaData(int port) throws IOException {
+    //TODO these bytes should be versioned
+    DataOutputBuffer port_dob = new DataOutputBuffer();
+    port_dob.writeInt(port);
+    return ByteBuffer.wrap(port_dob.getData(), 0, port_dob.getLength());
+  }
+
+  /**
+   * A helper function to deserialize the metadata returned by ShuffleHandler.
+   * @param meta the metadata returned by the ShuffleHandler
+   * @return the port the Shuffle Handler is listening on to serve shuffle data.
+   */
+  public static int deserializeMetaData(ByteBuffer meta) throws IOException {
+    //TODO this should be returning a class not just an int
+    DataInputByteBuffer in = new DataInputByteBuffer();
+    in.reset(meta);
+    int port = in.readInt();
+    return port;
+  }
+
+  /**
+   * A helper function to serialize the JobTokenIdentifier to be sent to the
+   * ShuffleHandler as ServiceData.
+   * @param jobToken the job token to be used for authentication of
+   * shuffle data requests.
+   * @return the serialized version of the jobToken.
+   */
+  public static ByteBuffer serializeServiceData(Token<JobTokenIdentifier> jobToken) throws IOException {
+    //TODO these bytes should be versioned
+    DataOutputBuffer jobToken_dob = new DataOutputBuffer();
+    jobToken.write(jobToken_dob);
+    return ByteBuffer.wrap(jobToken_dob.getData(), 0, jobToken_dob.getLength());
+  }
+
+  static Token<JobTokenIdentifier> deserializeServiceData(ByteBuffer secret) throws IOException {
+    DataInputByteBuffer in = new DataInputByteBuffer();
+    in.reset(secret);
+    Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>();
+    jt.readFields(in);
+    return jt;
+  }
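+  // Illustration (not part of the change): the helpers above simply round-trip
+  // the shuffle port and job token through ByteBuffers, e.g.
+  //   ByteBuffer meta = ShuffleHandler.serializeMetaData(15551);
+  //   int port = ShuffleHandler.deserializeMetaData(meta);   // 15551 again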
+
+  public int getPort() {
+    return port;
+  }
+
+  public void registerApplication(String applicationIdString, Token<JobTokenIdentifier> appToken,
+                                  String user) {
+    Boolean registered = registeredApps.putIfAbsent(applicationIdString, Boolean.valueOf(true));
+    if (registered == null) {
+      recordJobShuffleInfo(applicationIdString, user, appToken);
+    }
+  }
+
+  public void unregisterApplication(String applicationIdString) {
+    removeJobShuffleInfo(applicationIdString);
+  }
+
+
+  public void stop() throws Exception {
+    accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
+    if (selector != null) {
+      ServerBootstrap bootstrap = new ServerBootstrap(selector);
+      bootstrap.releaseExternalResources();
+    }
+    if (pipelineFact != null) {
+      pipelineFact.destroy();
+    }
+  }
+
+  protected Shuffle getShuffle(Configuration conf) {
+    return new Shuffle(conf);
+  }
+
+
+  private void addJobToken(String appIdString, String user,
+      Token<JobTokenIdentifier> jobToken) {
+    String jobIdString = appIdString.replace("application", "job");
+    userRsrc.put(jobIdString, user);
+    secretManager.addTokenForJob(jobIdString, jobToken);
+    LOG.info("Added token for " + jobIdString);
+  }
+
+  private void recordJobShuffleInfo(String appIdString, String user,
+      Token<JobTokenIdentifier> jobToken) {
+    addJobToken(appIdString, user, jobToken);
+  }
+
+  private void removeJobShuffleInfo(String appIdString) {
+    secretManager.removeTokenForJob(appIdString);
+    userRsrc.remove(appIdString);
+  }
+
+  class HttpPipelineFactory implements ChannelPipelineFactory {
+
+    final Shuffle SHUFFLE;
+    private SSLFactory sslFactory;
+
+    public HttpPipelineFactory(Configuration conf) throws Exception {
+      SHUFFLE = getShuffle(conf);
+      // TODO Setup SSL Shuffle
+//      if (conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
+//                          MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT)) {
+//        LOG.info("Encrypted shuffle is enabled.");
+//        sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
+//        sslFactory.init();
+//      }
+    }
+
+    public void destroy() {
+      if (sslFactory != null) {
+        sslFactory.destroy();
+      }
+    }
+
+    @Override
+    public ChannelPipeline getPipeline() throws Exception {
+      ChannelPipeline pipeline = Channels.pipeline();
+      if (sslFactory != null) {
+        pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
+      }
+      pipeline.addLast("decoder", new HttpRequestDecoder());
+      pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16));
+      pipeline.addLast("encoder", new HttpResponseEncoder());
+      pipeline.addLast("chunking", new ChunkedWriteHandler());
+      pipeline.addLast("shuffle", SHUFFLE);
+      return pipeline;
+      // TODO factor security manager into pipeline
+      // TODO factor out encode/decode to permit binary shuffle
+      // TODO factor out decode of index to permit alt. models
+    }
+
+  }
+
+  class Shuffle extends SimpleChannelUpstreamHandler {
+
+    private final Configuration conf;
+    private final IndexCache indexCache;
+    private final LocalDirAllocator lDirAlloc =
+      new LocalDirAllocator(SHUFFLE_HANDLER_LOCAL_DIRS);
+    private int port;
+
+    public Shuffle(Configuration conf) {
+      this.conf = conf;
+      indexCache = new IndexCache(conf);
+      this.port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
+    }
+
+    public void setPort(int port) {
+      this.port = port;
+    }
+
+    private List<String> splitMaps(List<String> mapq) {
+      if (null == mapq) {
+        return null;
+      }
+      final List<String> ret = new ArrayList<String>();
+      for (String s : mapq) {
+        Collections.addAll(ret, s.split(","));
+      }
+      return ret;
+    }
+
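+    // Enforce the configured cap on concurrent shuffle connections; channels
+    // beyond the limit are closed as soon as they are opened.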
+    @Override
+    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt) 
+        throws Exception {
+      if ((maxShuffleConnections > 0) && (accepted.size() >= maxShuffleConnections)) {
+        LOG.info(String.format("Current number of shuffle connections (%d) is " + 
+            "greater than or equal to the max allowed shuffle connections (%d)", 
+            accepted.size(), maxShuffleConnections));
+        evt.getChannel().close();
+        return;
+      }
+      accepted.add(evt.getChannel());
+      super.channelOpen(ctx, evt);
+    }
+
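+    // Handles a single shuffle fetch. A request looks roughly like
+    //   GET /mapOutput?job=<jobId>&reduce=<partition>&map=<attemptId>[,<attemptId>...]
+    // (illustrative form only; the fetcher constructs the actual URL). The
+    // response is a sequence of ShuffleHeader + map output data segments.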
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
+        throws Exception {
+      HttpRequest request = (HttpRequest) evt.getMessage();
+      if (request.getMethod() != GET) {
+          sendError(ctx, METHOD_NOT_ALLOWED);
+          return;
+      }
+      // Check whether the shuffle version is compatible
+      if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(
+          request.getHeader(ShuffleHeader.HTTP_HEADER_NAME))
+          || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(
+              request.getHeader(ShuffleHeader.HTTP_HEADER_VERSION))) {
+        sendError(ctx, "Incompatible shuffle request version", BAD_REQUEST);
+        // Do not continue processing an incompatible request.
+        return;
+      }
+      final Map<String,List<String>> q =
+        new QueryStringDecoder(request.getUri()).getParameters();
+      final List<String> keepAliveList = q.get("keepAlive");
+      boolean keepAliveParam = false;
+      if (keepAliveList != null && keepAliveList.size() == 1) {
+        keepAliveParam = Boolean.valueOf(keepAliveList.get(0));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("KeepAliveParam : " + keepAliveList
+            + " : " + keepAliveParam);
+        }
+      }
+      final List<String> mapIds = splitMaps(q.get("map"));
+      final List<String> reduceQ = q.get("reduce");
+      final List<String> jobQ = q.get("job");
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("RECV: " + request.getUri() +
+            "\n  mapId: " + mapIds +
+            "\n  reduceId: " + reduceQ +
+            "\n  jobId: " + jobQ +
+            "\n  keepAlive: " + keepAliveParam);
+      }
+
+      if (mapIds == null || reduceQ == null || jobQ == null) {
+        sendError(ctx, "Required param job, map and reduce", BAD_REQUEST);
+        return;
+      }
+      if (reduceQ.size() != 1 || jobQ.size() != 1) {
+        sendError(ctx, "Too many job/reduce parameters", BAD_REQUEST);
+        return;
+      }
+      int reduceId;
+      String jobId;
+      try {
+        reduceId = Integer.parseInt(reduceQ.get(0));
+        jobId = jobQ.get(0);
+      } catch (NumberFormatException e) {
+        sendError(ctx, "Bad reduce parameter", BAD_REQUEST);
+        return;
+      } catch (IllegalArgumentException e) {
+        sendError(ctx, "Bad job parameter", BAD_REQUEST);
+        return;
+      }
+      final String reqUri = request.getUri();
+      if (null == reqUri) {
+        // TODO? add upstream?
+        sendError(ctx, FORBIDDEN);
+        return;
+      }
+      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+      try {
+        verifyRequest(jobId, ctx, request, response,
+            new URL("http", "", this.port, reqUri));
+      } catch (IOException e) {
+        LOG.warn("Shuffle failure ", e);
+        sendError(ctx, e.getMessage(), UNAUTHORIZED);
+        return;
+      }
+
+      Map<String, MapOutputInfo> mapOutputInfoMap =
+          new HashMap<String, MapOutputInfo>();
+      Channel ch = evt.getChannel();
+      String user = userRsrc.get(jobId);
+
+      // $x/$user/appcache/$appId/output/$mapId
+      // TODO: Once Shuffle is out of NM, this can use MR APIs to convert
+      // between App and Job
+      String outputBasePathStr = getBaseLocation(jobId, user);
+
+      try {
+        populateHeaders(mapIds, outputBasePathStr, user, reduceId, request,
+          response, keepAliveParam, mapOutputInfoMap);
+      } catch(IOException e) {
+        ch.write(response);
+        LOG.error("Shuffle error in populating headers :", e);
+        String errorMessage = getErrorMessage(e);
+        sendError(ctx, errorMessage, INTERNAL_SERVER_ERROR);
+        return;
+      }
+      ch.write(response);
+      // TODO refactor the following into the pipeline
+      ChannelFuture lastMap = null;
+      for (String mapId : mapIds) {
+        try {
+          MapOutputInfo info = mapOutputInfoMap.get(mapId);
+          if (info == null) {
+            info = getMapOutputInfo(outputBasePathStr, mapId, reduceId, user);
+          }
+          lastMap =
+              sendMapOutput(ctx, ch, user, mapId,
+                reduceId, info);
+          if (null == lastMap) {
+            sendError(ctx, NOT_FOUND);
+            return;
+          }
+        } catch (IOException e) {
+          LOG.error("Shuffle error :", e);
+          String errorMessage = getErrorMessage(e);
+          sendError(ctx, errorMessage, INTERNAL_SERVER_ERROR);
+          return;
+        }
+      }
+      lastMap.addListener(ChannelFutureListener.CLOSE);
+    }
+
+    private String getErrorMessage(Throwable t) {
+      // Concatenate the message of the exception and all of its causes.
+      // String.valueOf guards against exceptions with a null message.
+      StringBuilder sb = new StringBuilder(String.valueOf(t.getMessage()));
+      while (t.getCause() != null) {
+        t = t.getCause();
+        sb.append(t.getMessage());
+      }
+      return sb.toString();
+    }
+
+    private final String USERCACHE_CONSTANT = "usercache";
+    private final String APPCACHE_CONSTANT = "appcache";
+
+    private String getBaseLocation(String jobIdString, String user) {
+      String[] parts = jobIdString.split("_");
+      Preconditions.checkArgument(parts.length == 3, "Invalid jobId. Expecting 3 parts");
+      final ApplicationId appID =
+          ApplicationId.newInstance(Long.parseLong(parts[1]), Integer.parseInt(parts[2]));
+      final String baseStr =
+          USERCACHE_CONSTANT + "/" + user + "/"
+              + APPCACHE_CONSTANT + "/"
+              + ConverterUtils.toString(appID) + "/output" + "/";
+      return baseStr;
+    }
+
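+    // Looks up the index record (offset and lengths) for one map output via
+    // the shared IndexCache and resolves the local path of its data file.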
+    protected MapOutputInfo getMapOutputInfo(String base, String mapId,
+        int reduce, String user) throws IOException {
+      // Index file
+      Path indexFileName =
+          lDirAlloc.getLocalPathToRead(base + "/file.out.index", conf);
+      TezIndexRecord info =
+          indexCache.getIndexInformation(mapId, reduce, indexFileName, user);
+
+      Path mapOutputFileName =
+          lDirAlloc.getLocalPathToRead(base + "/file.out", conf);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(base + " : " + mapOutputFileName + " : " + indexFileName);
+      }
+      MapOutputInfo outputInfo = new MapOutputInfo(mapOutputFileName, info);
+      return outputInfo;
+    }
+
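+    // Computes the total Content-Length up front by summing, for every
+    // requested map, the serialized ShuffleHeader size plus the partition
+    // length, caching a bounded number of MapOutputInfo entries for reuse
+    // when the outputs are actually sent.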
+    protected void populateHeaders(List<String> mapIds, String outputBaseStr,
+        String user, int reduce, HttpRequest request, HttpResponse response,
+        boolean keepAliveParam, Map<String, MapOutputInfo> mapOutputInfoMap)
+        throws IOException {
+
+      long contentLength = 0;
+      for (String mapId : mapIds) {
+        String base = outputBaseStr + mapId;
+        MapOutputInfo outputInfo = getMapOutputInfo(base, mapId, reduce, user);
+        if (mapOutputInfoMap.size() < mapOutputMetaInfoCacheSize) {
+          mapOutputInfoMap.put(mapId, outputInfo);
+        }
+        // Index file
+        Path indexFileName =
+            lDirAlloc.getLocalPathToRead(base + "/file.out.index", conf);
+        TezIndexRecord info =
+            indexCache.getIndexInformation(mapId, reduce, indexFileName, user);
+        ShuffleHeader header =
+            new ShuffleHeader(mapId, info.getPartLength(), info.getRawLength(), reduce);
+        DataOutputBuffer dob = new DataOutputBuffer();
+        header.write(dob);
+
+        contentLength += info.getPartLength();
+        contentLength += dob.getLength();
+      }
+
+      // Now set the response headers.
+      setResponseHeaders(response, keepAliveParam, contentLength);
+    }
+
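+    // Keep-alive is used when it is enabled server-side or explicitly
+    // requested by the fetcher; otherwise the connection is marked for close.
+    // Content-Length is only set in the keep-alive case.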
+    protected void setResponseHeaders(HttpResponse response,
+        boolean keepAliveParam, long contentLength) {
+      if (!connectionKeepAliveEnabled && !keepAliveParam) {
+        LOG.info("Setting connection close header...");
+        response.setHeader(HttpHeaders.Names.CONNECTION, CONNECTION_CLOSE);
+      } else {
+        response.setHeader(HttpHeaders.Names.CONTENT_LENGTH,
+          String.valueOf(contentLength));
+        response.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
+        response.setHeader(HttpHeaders.Values.KEEP_ALIVE, "timeout="
+            + connectionKeepAliveTimeOut);
+        LOG.info("Content Length in shuffle : " + contentLength);
+      }
+    }
+
+    class MapOutputInfo {
+      final Path mapOutputFileName;
+      final TezIndexRecord indexRecord;
+
+      MapOutputInfo(Path mapOutputFileName, TezIndexRecord indexRecord) {
+        this.mapOutputFileName = mapOutputFileName;
+        this.indexRecord = indexRecord;
+      }
+    }
+
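+    // Verifies the fetcher's hash of the request URL against the job's
+    // shuffle secret, then signs that hash back into the response so the
+    // fetcher can authenticate the server in turn.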
+    protected void verifyRequest(String appid, ChannelHandlerContext ctx,
+        HttpRequest request, HttpResponse response, URL requestUri)
+        throws IOException {
+      SecretKey tokenSecret = secretManager.retrieveTokenSecret(appid);
+      if (null == tokenSecret) {
+        LOG.info("Request for unknown token " + appid);
+        throw new IOException("could not find jobid");
+      }
+      // string to encrypt
+      String enc_str = SecureShuffleUtils.buildMsgFrom(requestUri);
+      // hash from the fetcher
+      String urlHashStr =
+        request.getHeader(SecureShuffleUtils.HTTP_HEADER_URL_HASH);
+      if (urlHashStr == null) {
+        LOG.info("Missing header hash for " + appid);
+        throw new IOException("fetcher cannot be authenticated");
+      }
+      if (LOG.isDebugEnabled()) {
+        int len = urlHashStr.length();
+        LOG.debug("verifying request. enc_str=" + enc_str + "; hash=..." +
+            urlHashStr.substring(len-len/2, len-1));
+      }
+      // verify - throws exception
+      SecureShuffleUtils.verifyReply(urlHashStr, enc_str, tokenSecret);
+      // verification passed - encode the reply
+      String reply =
+        SecureShuffleUtils.generateHash(urlHashStr.getBytes(Charsets.UTF_8), 
+            tokenSecret);
+      response.setHeader(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply);
+      // Put shuffle version into http header
+      response.setHeader(ShuffleHeader.HTTP_HEADER_NAME,
+          ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+      response.setHeader(ShuffleHeader.HTTP_HEADER_VERSION,
+          ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+      if (LOG.isDebugEnabled()) {
+        int len = reply.length();
+        LOG.debug("Fetcher request verfied. enc_str=" + enc_str + ";reply=" +
+            reply.substring(len-len/2, len-1));
+      }
+    }
+
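+    // Streams a single map output: the ShuffleHeader first, then the data
+    // region. Plain HTTP uses zero-copy transfer (FadvisedFileRegion); with
+    // SSL the data must pass through the engine, so FadvisedChunkedFile is
+    // used instead.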
+    protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch,
+        String user, String mapId, int reduce, MapOutputInfo mapOutputInfo)
+        throws IOException {
+      final TezIndexRecord info = mapOutputInfo.indexRecord;
+      final ShuffleHeader header =
+        new ShuffleHeader(mapId, info.getPartLength(), info.getRawLength(), reduce);
+      final DataOutputBuffer dob = new DataOutputBuffer();
+      header.write(dob);
+      ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+      final File spillfile =
+          new File(mapOutputInfo.mapOutputFileName.toString());
+      RandomAccessFile spill;
+      try {
+        spill = SecureIOUtils.openForRandomRead(spillfile, "r", user, null);
+      } catch (FileNotFoundException e) {
+        LOG.info(spillfile + " not found");
+        return null;
+      }
+      ChannelFuture writeFuture;
+      if (ch.getPipeline().get(SslHandler.class) == null) {
+        final FadvisedFileRegion partition = new FadvisedFileRegion(spill,
+            info.getStartOffset(), info.getPartLength(), manageOsCache, readaheadLength,
+            readaheadPool, spillfile.getAbsolutePath(), 
+            shuffleBufferSize, shuffleTransferToAllowed);
+        writeFuture = ch.write(partition);
+        writeFuture.addListener(new ChannelFutureListener() {
+            // TODO error handling; distinguish IO/connection failures,
+            //      attribute to appropriate spill output
+          @Override
+          public void operationComplete(ChannelFuture future) {
+            if (future.isSuccess()) {
+              partition.transferSuccessful();
+            }
+            partition.releaseExternalResources();
+          }
+        });
+      } else {
+        // HTTPS cannot be done with zero copy.
+        final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
+            info.getStartOffset(), info.getPartLength(), sslFileBufferSize,
+            manageOsCache, readaheadLength, readaheadPool,
+            spillfile.getAbsolutePath());
+        writeFuture = ch.write(chunk);
+      }
+      return writeFuture;
+    }
+
+    protected void sendError(ChannelHandlerContext ctx,
+        HttpResponseStatus status) {
+      sendError(ctx, "", status);
+    }
+
+    protected void sendError(ChannelHandlerContext ctx, String message,
+        HttpResponseStatus status) {
+      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
+      response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
+      // Put shuffle version into http header
+      response.setHeader(ShuffleHeader.HTTP_HEADER_NAME,
+          ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+      response.setHeader(ShuffleHeader.HTTP_HEADER_VERSION,
+          ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+      response.setContent(
+        ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8));
+
+      // Close the connection as soon as the error message is sent.
+      ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
+    }
+
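+    // Over-long frames get a 400; benign client disconnects (closed channels,
+    // "connection reset"-style messages) are ignored; anything else is logged
+    // and, if the channel is still connected, answered with a 500.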
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+        throws Exception {
+      Channel ch = e.getChannel();
+      Throwable cause = e.getCause();
+      if (cause instanceof TooLongFrameException) {
+        sendError(ctx, BAD_REQUEST);
+        return;
+      } else if (cause instanceof IOException) {
+        if (cause instanceof ClosedChannelException) {
+          LOG.debug("Ignoring closed channel error", cause);
+          return;
+        }
+        String message = String.valueOf(cause.getMessage());
+        if (IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
+          LOG.debug("Ignoring client socket close", cause);
+          return;
+        }
+      }
+
+      LOG.error("Shuffle error: ", cause);
+      if (ch.isConnected()) {
+        LOG.error("Shuffle error " + e);
+        sendError(ctx, INTERNAL_SERVER_ERROR);
+      }
+    }
+  }
+}