You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/09 03:43:02 UTC

[24/43] tez git commit: TEZ-2090. Add tests for jobs running in external services. (sseth)

TEZ-2090. Add tests for jobs running in external services. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/aadd0492
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/aadd0492
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/aadd0492

Branch: refs/heads/TEZ-2003
Commit: aadd04927b0cc646db2579f17f903d8e24916bdc
Parents: 7b71d3b
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Feb 13 17:24:05 2015 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri May 8 14:41:01 2015 -0700

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                            |   1 +
 pom.xml                                         |   6 +
 .../apache/tez/dag/api/TezConfiguration.java    |   2 +
 .../apache/tez/dag/api/TaskCommunicator.java    |   1 +
 .../tez/dag/api/TaskCommunicatorContext.java    |   3 +
 .../tez/dag/app/TezTaskCommunicatorImpl.java    |  42 +-
 .../dag/app/rm/TaskSchedulerEventHandler.java   |   2 +-
 tez-ext-service-tests/pom.xml                   | 161 ++++
 .../tez/dag/app/TezTestServiceCommunicator.java | 152 ++++
 .../TezTestServiceContainerLauncher.java        | 144 ++++
 .../TezTestServiceNoOpContainerLauncher.java    |  66 ++
 .../rm/TezTestServiceTaskSchedulerService.java  | 347 ++++++++
 .../TezTestServiceTaskCommunicatorImpl.java     | 182 ++++
 .../org/apache/tez/service/ContainerRunner.java |  27 +
 .../tez/service/MiniTezTestServiceCluster.java  | 163 ++++
 .../service/TezTestServiceConfConstants.java    |  41 +
 .../TezTestServiceProtocolBlockingPB.java       |  22 +
 .../tez/service/impl/ContainerRunnerImpl.java   | 512 +++++++++++
 .../apache/tez/service/impl/TezTestService.java | 126 +++
 .../impl/TezTestServiceProtocolClientImpl.java  |  82 ++
 .../impl/TezTestServiceProtocolServerImpl.java  | 133 +++
 .../tez/shufflehandler/FadvisedChunkedFile.java |  78 ++
 .../tez/shufflehandler/FadvisedFileRegion.java  | 160 ++++
 .../apache/tez/shufflehandler/IndexCache.java   | 199 +++++
 .../tez/shufflehandler/ShuffleHandler.java      | 840 +++++++++++++++++++
 .../tez/tests/TestExternalTezServices.java      | 183 ++++
 .../org/apache/tez/util/ProtoConverters.java    | 172 ++++
 .../src/test/proto/TezDaemonProtocol.proto      |  84 ++
 .../src/test/resources/log4j.properties         |  19 +
 .../org/apache/tez/runtime/task/TezChild.java   |   2 +-
 .../apache/tez/runtime/task/TezTaskRunner.java  |   2 +-
 31 files changed, 3943 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/aadd0492/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index d7e4be5..975ce65 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -1,5 +1,6 @@
 ALL CHANGES:
   TEZ-2019. Temporarily allow the scheduler and launcher to be specified via configuration.
   TEZ-2006. Task communication plane needs to be pluggable.
+  TEZ-2090. Add tests for jobs running in external services.
 
 INCOMPATIBLE CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/aadd0492/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ce4fa13..ca9db11 100644
--- a/pom.xml
+++ b/pom.xml
@@ -170,6 +170,11 @@
         <type>test-jar</type>
       </dependency>
       <dependency>
+        <groupId>org.apache.tez</groupId>
+        <artifactId>tez-ext-service-tests</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
         <groupId>org.apache.pig</groupId>
         <artifactId>pig</artifactId>
         <version>${pig.version}</version>
@@ -638,6 +643,7 @@
     <module>tez-ui</module>
     <module>tez-plugins</module>
     <module>tez-tools</module>
+    <module>tez-ext-service-tests</module>
     <module>tez-dist</module>
     <module>docs</module>
   </modules>

http://git-wip-us.apache.org/repos/asf/tez/blob/aadd0492/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 01e724e..1cd478e 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -1172,6 +1172,8 @@ public class TezConfiguration extends Configuration {
   public static final String TEZ_AM_CONTAINER_LAUNCHER_CLASS = TEZ_AM_PREFIX + "container-launcher.class";
   @ConfigurationScope(Scope.VERTEX)
   public static final String TEZ_AM_TASK_SCHEDULER_CLASS = TEZ_AM_PREFIX + "task-scheduler.class";
+  @ConfigurationScope(Scope.VERTEX)
+  public static final String TEZ_AM_TASK_COMMUNICATOR_CLASS = TEZ_AM_PREFIX + "task-communicator.class";
 
 
   // TODO only validate property here, value can also be validated if necessary

http://git-wip-us.apache.org/repos/asf/tez/blob/aadd0492/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
index 97f9c16..c9f85e0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
@@ -14,6 +14,7 @@
 
 package org.apache.tez.dag.api;
 
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Map;
 

http://git-wip-us.apache.org/repos/asf/tez/blob/aadd0492/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
index 9b2d889..41675fe 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
@@ -44,5 +44,8 @@ public interface TaskCommunicatorContext {
   // TODO TEZ-2003 Move to vertex, taskIndex, version
   void taskStartedRemotely(TezTaskAttemptID taskAttemptID, ContainerId containerId);
 
+  // TODO TEZ-2003 Add an API to register task failure - for example, a communication failure.
+  // This will have to take into consideration the TA_FAILED event
+
   // TODO Eventually Add methods to report availability stats to the scheduler.
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/aadd0492/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
index 5652937..258c927 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
@@ -74,16 +74,22 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
       new ConcurrentHashMap<TaskAttempt, ContainerId>();
 
   private final TezTaskUmbilicalProtocol taskUmbilical;
+  private final String tokenIdentifier;
+  private final Token<JobTokenIdentifier> sessionToken;
   private InetSocketAddress address;
   private Server server;
 
-  private static final class ContainerInfo {
+  public static final class ContainerInfo {
 
-    ContainerInfo(ContainerId containerId) {
+    ContainerInfo(ContainerId containerId, String host, int port) {
       this.containerId = containerId;
+      this.host = host;
+      this.port = port;
     }
 
-    ContainerId containerId;
+    final ContainerId containerId;
+    public final String host;
+    public final int port;
     TezHeartbeatResponse lastResponse = null;
     TaskSpec taskSpec = null;
     long lastRequestId = 0;
@@ -110,6 +116,8 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
     super(TezTaskCommunicatorImpl.class.getName());
     this.taskCommunicatorContext = taskCommunicatorContext;
     this.taskUmbilical = new TezTaskUmbilicalProtocolImpl();
+    this.tokenIdentifier = this.taskCommunicatorContext.getApplicationAttemptId().getApplicationId().toString();
+    this.sessionToken = TokenCache.getSessionToken(taskCommunicatorContext.getCredentials());
   }
 
 
@@ -130,9 +138,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
       try {
         JobTokenSecretManager jobTokenSecretManager =
             new JobTokenSecretManager();
-        Token<JobTokenIdentifier> sessionToken = TokenCache.getSessionToken(taskCommunicatorContext.getCredentials());
-        jobTokenSecretManager.addTokenForJob(
-            taskCommunicatorContext.getApplicationAttemptId().getApplicationId().toString(), sessionToken);
+        jobTokenSecretManager.addTokenForJob(tokenIdentifier, sessionToken);
 
         server = new RPC.Builder(conf)
             .setProtocol(TezTaskUmbilicalProtocol.class)
@@ -182,7 +188,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
 
   @Override
   public void registerRunningContainer(ContainerId containerId, String host, int port) {
-    ContainerInfo oldInfo = registeredContainers.putIfAbsent(containerId, new ContainerInfo(containerId));
+    ContainerInfo oldInfo = registeredContainers.putIfAbsent(containerId, new ContainerInfo(containerId, host, port));
     if (oldInfo != null) {
       throw new TezUncheckedException("Multiple registrations for containerId: " + containerId);
     }
@@ -230,9 +236,9 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
                 ". Already registered to containerId: " + oldId);
       }
     }
-
   }
 
+
   @Override
   public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID) {
     TaskAttempt taskAttempt = new TaskAttempt(taskAttemptID);
@@ -258,6 +264,18 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
     return address;
   }
 
+  protected String getTokenIdentifier() {
+    return tokenIdentifier;
+  }
+
+  protected Token<JobTokenIdentifier> getSessionToken() {
+    return sessionToken;
+  }
+
+  protected TaskCommunicatorContext getTaskCommunicatorContext() {
+    return taskCommunicatorContext;
+  }
+
   public TezTaskUmbilicalProtocol getUmbilical() {
     return this.taskUmbilical;
   }
@@ -471,4 +489,12 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
       return "TaskAttempt{" + "taskAttemptId=" + taskAttemptId + '}';
     }
   }
+
+  protected ContainerInfo getContainerInfo(ContainerId containerId) {
+    return registeredContainers.get(containerId);
+  }
+
+  protected ContainerId getContainerForAttempt(TezTaskAttemptID taskAttemptId) {
+    return attemptToContainerMap.get(new TaskAttempt(taskAttemptId));
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/aadd0492/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 62f82db..8c3ed87 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -350,7 +350,7 @@ public class TaskSchedulerEventHandler extends AbstractService
         try {
           Constructor<? extends TaskSchedulerService> ctor = taskSchedulerClazz
               .getConstructor(TaskSchedulerAppCallback.class, AppContext.class, String.class,
-                  Integer.class, String.class, Configuration.class);
+                  int.class, String.class, Configuration.class);
           ctor.setAccessible(true);
           TaskSchedulerService taskSchedulerService =
               ctor.newInstance(this, appContext, host, port, trackingUrl, getConfig());

http://git-wip-us.apache.org/repos/asf/tez/blob/aadd0492/tez-ext-service-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/pom.xml b/tez-ext-service-tests/pom.xml
new file mode 100644
index 0000000..37f68b1
--- /dev/null
+++ b/tez-ext-service-tests/pom.xml
@@ -0,0 +1,161 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>tez</artifactId>
+    <groupId>org.apache.tez</groupId>
+    <version>0.7.0-SNAPSHOT</version>
+  </parent>
+
+  <!-- TODO TEZ-2003 Merge this into the tez-tests module -->
+  <artifactId>tez-ext-service-tests</artifactId>
+
+  <dependencies>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-runtime-internals</artifactId>
+    </dependency>
+    <dependency>
+      <!-- Required for the ShuffleHandler -->
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-runtime-library</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-dag</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-tests</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-tests</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+
+  <build>
+    <!--
+     Include all files in src/main/resources.  By default, do not apply property
+     substitution (filtering=false), but do apply property substitution to
+     tez-api-version-info.properties (filtering=true).  This will substitute the
+     version information correctly, but prevent Maven from altering other files.
+     -->
+    <resources>
+      <resource>
+        <directory>${basedir}/src/main/resources</directory>
+        <excludes>
+          <exclude>tez-api-version-info.properties</exclude>
+        </excludes>
+        <filtering>false</filtering>
+      </resource>
+      <resource>
+        <directory>${basedir}/src/main/resources</directory>
+        <includes>
+          <include>tez-api-version-info.properties</include>
+        </includes>
+        <filtering>true</filtering>
+      </resource>
+    </resources>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-maven-plugins</artifactId>
+        <executions>
+          <execution>
+            <id>compile-protoc</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>protoc</goal>
+            </goals>
+            <configuration>
+              <protocVersion>${protobuf.version}</protocVersion>
+              <protocCommand>${protoc.path}</protocCommand>
+              <imports>
+                <param>${basedir}/src/test/proto</param>
+                <param>${basedir}/../tez-api/src/main/proto</param>
+              </imports>
+              <source>
+                <directory>${basedir}/src/test/proto</directory>
+                <includes>
+                  <include>TezDaemonProtocol.proto</include>
+                </includes>
+              </source>
+              <output>${project.build.directory}/generated-test-sources/java</output>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/tez/blob/aadd0492/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/TezTestServiceCommunicator.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/TezTestServiceCommunicator.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/TezTestServiceCommunicator.java
new file mode 100644
index 0000000..ac50878
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/TezTestServiceCommunicator.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app;
+
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.Message;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.tez.service.TezTestServiceProtocolBlockingPB;
+import org.apache.tez.service.impl.TezTestServiceProtocolClientImpl;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerRequestProto;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerResponseProto;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.SubmitWorkRequestProto;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.SubmitWorkResponseProto;
+
+public class TezTestServiceCommunicator extends AbstractService {
+
+  private final ConcurrentMap<String, TezTestServiceProtocolBlockingPB> hostProxies;
+  private final ListeningExecutorService executor;
+
+  // TODO Convert this into a singleton
+  public TezTestServiceCommunicator(int numThreads) {
+    super(TezTestServiceCommunicator.class.getSimpleName());
+    ExecutorService localExecutor = Executors.newFixedThreadPool(numThreads,
+        new ThreadFactoryBuilder().setNameFormat("TezTestServiceCommunicator #%2d").build());
+    this.hostProxies = new ConcurrentHashMap<String, TezTestServiceProtocolBlockingPB>();
+    executor = MoreExecutors.listeningDecorator(localExecutor);
+  }
+
+  @Override
+  public void serviceStop() {
+    executor.shutdownNow();
+  }
+
+
+  public void runContainer(RunContainerRequestProto request, String host, int port,
+                           final ExecuteRequestCallback<RunContainerResponseProto> callback) {
+    ListenableFuture<RunContainerResponseProto> future = executor.submit(new RunContainerCallable(request, host, port));
+    Futures.addCallback(future, new FutureCallback<RunContainerResponseProto>() {
+      @Override
+      public void onSuccess(RunContainerResponseProto result) {
+        callback.setResponse(result);
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        callback.indicateError(t);
+      }
+    });
+
+  }
+
+  public void submitWork(SubmitWorkRequestProto request, String host, int port,
+                         final ExecuteRequestCallback<SubmitWorkResponseProto> callback) {
+    ListenableFuture<SubmitWorkResponseProto> future = executor.submit(new SubmitWorkCallable(request, host, port));
+    Futures.addCallback(future, new FutureCallback<SubmitWorkResponseProto>() {
+      @Override
+      public void onSuccess(SubmitWorkResponseProto result) {
+        callback.setResponse(result);
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        callback.indicateError(t);
+      }
+    });
+
+  }
+
+
+  private class RunContainerCallable implements Callable<RunContainerResponseProto> {
+    // Invokes runContainer on the proxy obtained from getProxy(hostname, port) and returns its response.
+    final String hostname;
+    final int port;
+    final RunContainerRequestProto request;
+
+    private RunContainerCallable(RunContainerRequestProto request, String hostname, int port) {
+      this.hostname = hostname;
+      this.port = port;
+      this.request = request;
+    }
+
+    @Override
+    public RunContainerResponseProto call() throws Exception {
+      return getProxy(hostname, port).runContainer(null, request);
+    }
+  }
+
+  private class SubmitWorkCallable implements Callable<SubmitWorkResponseProto> {
+    final String hostname;
+    final int port;
+    final SubmitWorkRequestProto request;
+
+    private SubmitWorkCallable(SubmitWorkRequestProto request, String hostname, int port) {
+      this.hostname = hostname;
+      this.port = port;
+      this.request = request;
+    }
+
+    @Override
+    public SubmitWorkResponseProto call() throws Exception {
+      return getProxy(hostname, port).submitWork(null, request);
+    }
+  }
+
+  public interface ExecuteRequestCallback<T extends Message> {
+    void setResponse(T response);
+    void indicateError(Throwable t);
+  }
+
+  private TezTestServiceProtocolBlockingPB getProxy(String hostname, int port) {
+    String hostId = getHostIdentifier(hostname, port);
+    // One proxy is cached per "hostname:port"; if putIfAbsent loses a race, the proxy already in the map is used.
+    TezTestServiceProtocolBlockingPB proxy = hostProxies.get(hostId);
+    if (proxy == null) {
+      proxy = new TezTestServiceProtocolClientImpl(getConfig(), hostname, port);
+      TezTestServiceProtocolBlockingPB proxyOld = hostProxies.putIfAbsent(hostId, proxy);
+      if (proxyOld != null) {
+        // TODO Shutdown the new proxy.
+        proxy = proxyOld;
+      }
+    }
+    return proxy;
+  }
+
+  private String getHostIdentifier(String hostname, int port) {
+    return hostname + ":" + port;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/aadd0492/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
new file mode 100644
index 0000000..e83165b
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.launcher;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TezTestServiceCommunicator;
+import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
+import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
+import org.apache.tez.dag.app.rm.container.AMContainerEvent;
+import org.apache.tez.dag.app.rm.container.AMContainerEventLaunchFailed;
+import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched;
+import org.apache.tez.dag.app.rm.container.AMContainerEventType;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+import org.apache.tez.service.TezTestServiceConfConstants;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerRequestProto;
+
+public class TezTestServiceContainerLauncher extends AbstractService implements ContainerLauncher {
+
+  // TODO Support interruptibility of tasks which haven't yet been launched.
+
+  // TODO May need multiple connections per target machine, depending upon how synchronization is handled in the RPC layer
+
+  static final Log LOG = LogFactory.getLog(TezTestServiceContainerLauncher.class);
+
+  private final AppContext context;
+  private final String tokenIdentifier;
+  private final TaskAttemptListener tal;
+  private final int servicePort;
+  private final TezTestServiceCommunicator communicator;
+  private final Clock clock;
+
+
+  // Configuration passed in here to set up final parameters
+  public TezTestServiceContainerLauncher(AppContext appContext, Configuration conf,
+                                         TaskAttemptListener tal) {
+    super(TezTestServiceContainerLauncher.class.getName());
+    this.clock = appContext.getClock();
+    int numThreads = conf.getInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS,
+        TezTestServiceConfConstants.TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS_DEFAULT);
+
+    this.servicePort = conf.getInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT, -1);
+    Preconditions.checkArgument(servicePort > 0,
+        TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT + " must be set");
+    this.communicator = new TezTestServiceCommunicator(numThreads);
+    this.context = appContext;
+    this.tokenIdentifier = context.getApplicationID().toString();
+    this.tal = tal;
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) {
+    communicator.init(conf);
+  }
+
+  @Override
+  public void serviceStart() {
+    communicator.start();
+  }
+
+  @Override
+  public void serviceStop() {
+    communicator.stop();
+  }
+
+  @Override
+  public void handle(NMCommunicatorEvent event) {
+    switch (event.getType()) {
+      case CONTAINER_LAUNCH_REQUEST:
+        final NMCommunicatorLaunchRequestEvent launchEvent = (NMCommunicatorLaunchRequestEvent) event;
+        RunContainerRequestProto runRequest = constructRunContainerRequest(launchEvent);
+        communicator.runContainer(runRequest, launchEvent.getNodeId().getHost(),
+            launchEvent.getNodeId().getPort(),
+            new TezTestServiceCommunicator.ExecuteRequestCallback<TezTestServiceProtocolProtos.RunContainerResponseProto>() {
+              @Override
+              public void setResponse(TezTestServiceProtocolProtos.RunContainerResponseProto response) {
+                LOG.info("Container: " + launchEvent.getContainerId() + " launch succeeded on host: " + launchEvent.getNodeId());
+                context.getEventHandler().handle(new AMContainerEventLaunched(launchEvent.getContainerId()));
+                ContainerLaunchedEvent lEvt = new ContainerLaunchedEvent(
+                    launchEvent.getContainerId(), clock.getTime(), context.getApplicationAttemptId());
+                context.getHistoryHandler().handle(new DAGHistoryEvent(
+                    null, lEvt));
+              }
+
+              @Override
+              public void indicateError(Throwable t) {
+                LOG.error("Failed to launch container: " + launchEvent.getContainer() + " on host: " + launchEvent.getNodeId(), t);
+                sendContainerLaunchFailedMsg(launchEvent.getContainerId(), t);
+              }
+            });
+        break;
+      case CONTAINER_STOP_REQUEST:
+        LOG.info("DEBUG: Ignoring STOP_REQUEST for event: " + event);
+        // Nothing is sent to the service; the AM container is only told that the stop was sent (C_NM_STOP_SENT).
+        // TODO Sending this out for an un-launched container is invalid
+        context.getEventHandler().handle(new AMContainerEvent(event.getContainerId(),
+            AMContainerEventType.C_NM_STOP_SENT));
+        break;
+    }
+  }
+
+  /** Builds the RunContainer request (AM address, app/container ids, credentials, user) for a launch event. */
+  private RunContainerRequestProto constructRunContainerRequest(NMCommunicatorLaunchRequestEvent event) {
+    ContainerId containerId = event.getContainer().getId();
+    RunContainerRequestProto.Builder builder = RunContainerRequestProto.newBuilder();
+    builder.setAmHost(tal.getAddress().getHostName()).setAmPort(tal.getAddress().getPort());
+    builder.setAppAttemptNumber(containerId.getApplicationAttemptId().getAttemptId());
+    builder.setApplicationIdString(containerId.getApplicationAttemptId().getApplicationId().toString());
+    builder.setTokenIdentifier(tokenIdentifier).setContainerIdString(containerId.toString());
+    // copyFrom(ByteBuffer) advances the source position; copy from a duplicate so the launch context's buffer is not consumed.
+    builder.setCredentialsBinary(ByteString.copyFrom(event.getContainerLaunchContext().getTokens().duplicate()));
+    // TODO Avoid reading this from the environment
+    builder.setUser(System.getenv(ApplicationConstants.Environment.USER.name()));
+    return builder.build();
+  }
+
+  @SuppressWarnings("unchecked")
+  void sendContainerLaunchFailedMsg(ContainerId containerId, Throwable t) {
+    context.getEventHandler().handle(new AMContainerEventLaunchFailed(containerId, t == null ? "" : t.getMessage()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/aadd0492/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java
new file mode 100644
index 0000000..8c8e486
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.launcher;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
+import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
+import org.apache.tez.dag.app.rm.container.AMContainerEvent;
+import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched;
+import org.apache.tez.dag.app.rm.container.AMContainerEventType;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+
+/**
+ * Container launcher that launches nothing: every launch request is immediately
+ * reported as launched, and every stop request is acknowledged as sent.
+ */
+public class TezTestServiceNoOpContainerLauncher extends AbstractService implements ContainerLauncher {
+
+  static final Log LOG = LogFactory.getLog(TezTestServiceNoOpContainerLauncher.class);
+
+  private final AppContext context;
+  private final Clock clock;
+
+  /**
+   * @param appContext source of the event handler, history handler and clock
+   * @param conf not used by this launcher
+   * @param tal not used by this launcher
+   */
+  public TezTestServiceNoOpContainerLauncher(AppContext appContext, Configuration conf,
+                                         TaskAttemptListener tal) {
+    super(TezTestServiceNoOpContainerLauncher.class.getName());
+    this.context = appContext;
+    this.clock = appContext.getClock();
+  }
+
+  /**
+   * Dispatches launch and stop requests; any other event type is ignored.
+   */
+  @Override
+  public void handle(NMCommunicatorEvent event) {
+    switch (event.getType()) {
+      case CONTAINER_LAUNCH_REQUEST:
+        reportLaunched((NMCommunicatorLaunchRequestEvent) event);
+        break;
+      case CONTAINER_STOP_REQUEST:
+        acknowledgeStop(event);
+        break;
+    }
+  }
+
+  /** Marks the container as launched and records the launch in the history log. */
+  private void reportLaunched(final NMCommunicatorLaunchRequestEvent launchEvent) {
+    LOG.info("No-op launch for container: " + launchEvent.getContainerId() + " succeeded on host: " + launchEvent.getNodeId());
+    context.getEventHandler().handle(new AMContainerEventLaunched(launchEvent.getContainerId()));
+    ContainerLaunchedEvent historyEvent = new ContainerLaunchedEvent(
+        launchEvent.getContainerId(), clock.getTime(), context.getApplicationAttemptId());
+    context.getHistoryHandler().handle(new DAGHistoryEvent(null, historyEvent));
+  }
+
+  /** Nothing is running, so the stop is simply acknowledged back to the container. */
+  private void acknowledgeStop(NMCommunicatorEvent stopEvent) {
+    LOG.info("DEBUG: Ignoring STOP_REQUEST for event: " + stopEvent);
+    context.getEventHandler().handle(
+        new AMContainerEvent(stopEvent.getContainerId(), AMContainerEventType.C_NM_STOP_SENT));
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/aadd0492/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
new file mode 100644
index 0000000..e3c18bf
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.service.TezTestServiceConfConstants;
+
+
+// TODO Registration with RM - so that the AM is considered dead and restarted in the expiry interval - 10 minutes.
+
+public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
+
+  private static final Log LOG = LogFactory.getLog(TezTestServiceTaskSchedulerService.class);
+
+  private final ExecutorService appCallbackExecutor;
+  private final TaskSchedulerAppCallback appClientDelegate;
+  private final AppContext appContext;
+  private final List<String> serviceHosts;
+  private final ContainerFactory containerFactory;
+  private final Random random = new Random();
+  // Currently all services must be running on the same port.
+  private final int containerPort;
+
+  private final String clientHostname;
+  private final int clientPort;
+  private final String trackingUrl;
+  private final AtomicBoolean isStopped = new AtomicBoolean(false);
+  private final ConcurrentMap<Object, ContainerId> runningTasks =
+      new ConcurrentHashMap<Object, ContainerId>();
+
+  private final AMRMClientAsync<AMRMClient.ContainerRequest> amRmClient;
+
+  // Per instance
+  private final int memoryPerInstance;
+  private final int coresPerInstance;
+  private final int executorsPerInstance;
+
+  // Per Executor Thread
+  private final Resource resourcePerContainer;
+
+
+  public TezTestServiceTaskSchedulerService(TaskSchedulerAppCallback appClient,
+                                            AppContext appContext,
+                                            String clientHostname, int clientPort,
+                                            String trackingUrl,
+                                            Configuration conf) {
+    // Accepting configuration here to allow setting up fields as final
+    super(TezTestServiceTaskSchedulerService.class.getName());
+    this.appCallbackExecutor = createAppCallbackExecutorService();
+    this.appClientDelegate = createAppCallbackDelegate(appClient);
+    this.appContext = appContext;
+    this.serviceHosts = new LinkedList<String>();
+    this.containerFactory = new ContainerFactory(appContext);
+
+    this.memoryPerInstance = conf
+        .getInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_MEMORY_PER_INSTANCE_MB, -1);
+    Preconditions.checkArgument(memoryPerInstance > 0,
+        TezTestServiceConfConstants.TEZ_TEST_SERVICE_MEMORY_PER_INSTANCE_MB +
+            " must be configured");
+
+    this.executorsPerInstance = conf.getInt(
+        TezTestServiceConfConstants.TEZ_TEST_SERVICE_NUM_EXECUTORS_PER_INSTANCE,
+        -1);
+    Preconditions.checkArgument(executorsPerInstance > 0,
+        TezTestServiceConfConstants.TEZ_TEST_SERVICE_NUM_EXECUTORS_PER_INSTANCE +
+            " must be configured");
+
+    this.coresPerInstance = conf
+        .getInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_VCPUS_PER_INSTANCE,
+            executorsPerInstance);
+
+    this.containerPort = conf.getInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT, -1);
+    Preconditions.checkArgument(executorsPerInstance > 0,
+        TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT + " must be configured");
+
+    this.clientHostname = clientHostname;
+    this.clientPort = clientPort;
+    this.trackingUrl = trackingUrl;
+
+    int memoryPerContainer = (int) (memoryPerInstance / (float) executorsPerInstance);
+    int coresPerContainer = (int) (coresPerInstance / (float) executorsPerInstance);
+    this.resourcePerContainer = Resource.newInstance(memoryPerContainer, coresPerContainer);
+    this.amRmClient = TezAMRMClientAsync.createAMRMClientAsync(5000, new FakeAmRmCallbackHandler());
+
+    String[] hosts = conf.getTrimmedStrings(TezTestServiceConfConstants.TEZ_TEST_SERVICE_HOSTS);
+    if (hosts == null || hosts.length == 0) {
+      hosts = new String[]{"localhost"};
+    }
+    for (String host : hosts) {
+      serviceHosts.add(host);
+    }
+
+    LOG.info("Running with configuration: " +
+        "memoryPerInstance=" + memoryPerInstance +
+        ", vcoresPerInstance=" + coresPerInstance +
+        ", executorsPerInstance=" + executorsPerInstance +
+        ", resourcePerContainerInferred=" + resourcePerContainer +
+        ", hosts=" + serviceHosts.toString());
+
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) {
+    amRmClient.init(conf);
+  }
+
+  @Override
+  public void serviceStart() {
+    amRmClient.start();
+    RegisterApplicationMasterResponse response;
+    try {
+      amRmClient.registerApplicationMaster(clientHostname, clientPort, trackingUrl);
+    } catch (YarnException e) {
+      throw new TezUncheckedException(e);
+    } catch (IOException e) {
+      throw new TezUncheckedException(e);
+    }
+  }
+
+  @Override
+  public void serviceStop() {
+    if (!this.isStopped.getAndSet(true)) {
+
+      try {
+        TaskSchedulerAppCallback.AppFinalStatus status = appClientDelegate.getFinalAppStatus();
+        amRmClient.unregisterApplicationMaster(status.exitStatus, status.exitMessage,
+            status.postCompletionTrackingUrl);
+      } catch (YarnException e) {
+        throw new TezUncheckedException(e);
+      } catch (IOException e) {
+        throw new TezUncheckedException(e);
+      }
+      appCallbackExecutor.shutdownNow();
+    }
+  }
+
+  @Override
+  public Resource getAvailableResources() {
+    // TODO This needs information about all running executors, and the amount of memory etc available across the cluster.
+    return Resource
+        .newInstance(Ints.checkedCast(serviceHosts.size() * memoryPerInstance),
+            serviceHosts.size() * coresPerInstance);
+  }
+
+  @Override
+  public int getClusterNodeCount() {
+    return serviceHosts.size();
+  }
+
+  @Override
+  public void resetMatchLocalityForAllHeldContainers() {
+  }
+
+  @Override
+  public Resource getTotalResources() {
+    return Resource
+        .newInstance(Ints.checkedCast(serviceHosts.size() * memoryPerInstance),
+            serviceHosts.size() * coresPerInstance);
+  }
+
+  @Override
+  public void blacklistNode(NodeId nodeId) {
+    LOG.info("DEBUG: BlacklistNode not supported");
+  }
+
+  @Override
+  public void unblacklistNode(NodeId nodeId) {
+    LOG.info("DEBUG: unBlacklistNode not supported");
+  }
+
+  @Override
+  public void allocateTask(Object task, Resource capability, String[] hosts, String[] racks,
+                           Priority priority, Object containerSignature, Object clientCookie) {
+    String host = selectHost(hosts);
+    Container container =
+        containerFactory.createContainer(resourcePerContainer, priority, host, containerPort);
+    runningTasks.put(task, container.getId());
+    appClientDelegate.taskAllocated(task, clientCookie, container);
+  }
+
+
+  @Override
+  public void allocateTask(Object task, Resource capability, ContainerId containerId,
+                           Priority priority, Object containerSignature, Object clientCookie) {
+    String host = selectHost(null);
+    Container container =
+        containerFactory.createContainer(resourcePerContainer, priority, host, containerPort);
+    runningTasks.put(task, container.getId());
+    appClientDelegate.taskAllocated(task, clientCookie, container);
+  }
+
+  @Override
+  public boolean deallocateTask(Object task, boolean taskSucceeded) {
+    ContainerId containerId = runningTasks.remove(task);
+    if (containerId == null) {
+      LOG.error("Could not determine ContainerId for task: " + task +
+          " . Could have hit a race condition. Ignoring." +
+          " The query may hang since this \"unknown\" container is now taking up a slot permanently");
+      return false;
+    }
+    appClientDelegate.containerBeingReleased(containerId);
+    return true;
+  }
+
+  @Override
+  public Object deallocateContainer(ContainerId containerId) {
+    LOG.info("DEBUG: Ignoring deallocateContainer for containerId: " + containerId);
+    return null;
+  }
+
+  @Override
+  public void setShouldUnregister() {
+
+  }
+
+  @Override
+  public boolean hasUnregistered() {
+    // Nothing to do. No registration involved.
+    return true;
+  }
+
+  private ExecutorService createAppCallbackExecutorService() {
+    return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+        .setNameFormat("TaskSchedulerAppCaller #%d").setDaemon(true).build());
+  }
+
+  private TaskSchedulerAppCallback createAppCallbackDelegate(
+      TaskSchedulerAppCallback realAppClient) {
+    return new TaskSchedulerAppCallbackWrapper(realAppClient,
+        appCallbackExecutor);
+  }
+
+  private String selectHost(String[] requestedHosts) {
+    String host = null;
+    if (requestedHosts != null && requestedHosts.length > 0) {
+      Arrays.sort(requestedHosts);
+      host = requestedHosts[0];
+      LOG.info("Selected host: " + host + " from requested hosts: " + Arrays.toString(requestedHosts));
+    } else {
+      host = serviceHosts.get(random.nextInt(serviceHosts.size()));
+      LOG.info("Selected random host: " + host + " since the request contained no host information");
+    }
+    return host;
+  }
+
+  static class ContainerFactory {
+    final AppContext appContext;
+    AtomicInteger nextId;
+
+    public ContainerFactory(AppContext appContext) {
+      this.appContext = appContext;
+      this.nextId = new AtomicInteger(2);
+    }
+
+    public Container createContainer(Resource capability, Priority priority, String hostname, int port) {
+      ApplicationAttemptId appAttemptId = appContext.getApplicationAttemptId();
+      ContainerId containerId = ContainerId.newInstance(appAttemptId, nextId.getAndIncrement());
+      NodeId nodeId = NodeId.newInstance(hostname, port);
+      String nodeHttpAddress = "hostname:0";
+
+      Container container = Container.newInstance(containerId,
+          nodeId,
+          nodeHttpAddress,
+          capability,
+          priority,
+          null);
+
+      return container;
+    }
+  }
+
+  private static class FakeAmRmCallbackHandler implements AMRMClientAsync.CallbackHandler {
+
+    @Override
+    public void onContainersCompleted(List<ContainerStatus> statuses) {
+
+    }
+
+    @Override
+    public void onContainersAllocated(List<Container> containers) {
+
+    }
+
+    @Override
+    public void onShutdownRequest() {
+
+    }
+
+    @Override
+    public void onNodesUpdated(List<NodeReport> updatedNodes) {
+
+    }
+
+    @Override
+    public float getProgress() {
+      return 0;
+    }
+
+    @Override
+    public void onError(Throwable e) {
+
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/aadd0492/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
new file mode 100644
index 0000000..78cdcde
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.taskcomm;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import com.google.protobuf.ByteString;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.dag.api.TaskCommunicatorContext;
+import org.apache.tez.dag.app.TezTaskCommunicatorImpl;
+import org.apache.tez.dag.app.TezTestServiceCommunicator;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.SubmitWorkRequestProto;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.SubmitWorkResponseProto;
+import org.apache.tez.util.ProtoConverters;
+
+
+/**
+ * Task communicator for the external-service tests. In addition to the behaviour of
+ * {@link TezTaskCommunicatorImpl}, every registered task attempt is pushed to the
+ * external service as a SubmitWork request.
+ */
+public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl {
+
+  private static final Log LOG = LogFactory.getLog(TezTestServiceTaskCommunicatorImpl.class);
+
+  private final TezTestServiceCommunicator communicator;
+  // Template holding the fields that are identical for every request of this app attempt.
+  private final SubmitWorkRequestProto baseSubmitWorkRequest;
+  // DAG name -> serialized credentials, so they are serialized once per DAG.
+  private final ConcurrentMap<String, ByteBuffer> credentialMap;
+
+  public TezTestServiceTaskCommunicatorImpl(
+      TaskCommunicatorContext taskCommunicatorContext) {
+    super(taskCommunicatorContext);
+    // TODO Maybe make this configurable
+    this.communicator = new TezTestServiceCommunicator(3);
+
+    SubmitWorkRequestProto.Builder baseBuilder = SubmitWorkRequestProto.newBuilder();
+
+    // TODO Avoid reading this from the environment
+    baseBuilder.setUser(System.getenv(ApplicationConstants.Environment.USER.name()));
+    baseBuilder.setApplicationIdString(
+        taskCommunicatorContext.getApplicationAttemptId().getApplicationId().toString());
+    baseBuilder
+        .setAppAttemptNumber(taskCommunicatorContext.getApplicationAttemptId().getAttemptId());
+    baseBuilder.setTokenIdentifier(getTokenIdentifier());
+
+    baseSubmitWorkRequest = baseBuilder.build();
+
+    credentialMap = new ConcurrentHashMap<String, ByteBuffer>();
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    this.communicator.init(conf);
+  }
+
+  @Override
+  public void serviceStart() {
+    super.serviceStart();
+    this.communicator.start();
+  }
+
+  /**
+   * Stops the parent communicator and then the client used to talk to the external
+   * service, so the service started in {@link #serviceStart()} is not left running.
+   */
+  @Override
+  public void serviceStop() {
+    try {
+      super.serviceStop();
+    } finally {
+      this.communicator.stop();
+    }
+  }
+
+
+  @Override
+  public void registerRunningContainer(ContainerId containerId, String hostname, int port) {
+    super.registerRunningContainer(containerId, hostname, port);
+  }
+
+  @Override
+  public void registerContainerEnd(ContainerId containerId) {
+    super.registerContainerEnd(containerId);
+  }
+
+  /**
+   * Registers the attempt with the parent and submits it to the service instance
+   * recorded for the container.
+   *
+   * @throws RuntimeException if the request cannot be built or the container is unknown
+   */
+  @Override
+  public void registerRunningTaskAttempt(final ContainerId containerId, final TaskSpec taskSpec,
+                                         Map<String, LocalResource> additionalResources,
+                                         Credentials credentials,
+                                         boolean credentialsChanged)  {
+    super.registerRunningTaskAttempt(containerId, taskSpec, additionalResources, credentials,
+        credentialsChanged);
+    SubmitWorkRequestProto requestProto = null;
+    try {
+      requestProto = constructSubmitWorkRequest(containerId, taskSpec);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to construct request", e);
+    }
+    ContainerInfo containerInfo = getContainerInfo(containerId);
+    String host;
+    int port;
+    if (containerInfo != null) {
+      synchronized (containerInfo) {
+        host = containerInfo.host;
+        port = containerInfo.port;
+      }
+    } else {
+      // TODO Handle this properly
+      throw new RuntimeException("ContainerInfo not found for container: " + containerId +
+          ", while trying to launch task: " + taskSpec.getTaskAttemptID());
+    }
+    communicator.submitWork(requestProto, host, port,
+        new TezTestServiceCommunicator.ExecuteRequestCallback<SubmitWorkResponseProto>() {
+          @Override
+          public void setResponse(SubmitWorkResponseProto response) {
+            LOG.info("Successfully launched task: " + taskSpec.getTaskAttemptID());
+            getTaskCommunicatorContext()
+                .taskStartedRemotely(taskSpec.getTaskAttemptID(), containerId);
+          }
+
+          @Override
+          public void indicateError(Throwable t) {
+            // TODO Handle this error. This is where an API on the context to indicate failure / rejection comes in.
+            LOG.info("Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " +
+                containerId, t);
+          }
+        });
+  }
+
+  @Override
+  public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID) {
+    super.unregisterRunningTaskAttempt(taskAttemptID);
+    // Nothing else to do for now. The push API in the test does not support termination of a running task
+  }
+
+  /**
+   * Builds the SubmitWork request for a task attempt from the per-attempt template.
+   *
+   * @throws IOException if the credentials cannot be serialized
+   */
+  private SubmitWorkRequestProto constructSubmitWorkRequest(ContainerId containerId,
+                                                            TaskSpec taskSpec) throws
+      IOException {
+    SubmitWorkRequestProto.Builder builder =
+        SubmitWorkRequestProto.newBuilder(baseSubmitWorkRequest);
+    builder.setContainerIdString(containerId.toString());
+    builder.setAmHost(getAddress().getHostName());
+    builder.setAmPort(getAddress().getPort());
+
+    // Credentials can change across DAGs, so the serialized form is cached per DAG name.
+    ByteBuffer credentialsBinary = credentialMap.get(taskSpec.getDAGName());
+    if (credentialsBinary == null) {
+      credentialsBinary = serializeCredentials(getTaskCommunicatorContext().getCredentials());
+      credentialMap.putIfAbsent(taskSpec.getDAGName(), credentialsBinary.duplicate());
+    } else {
+      // Work on a duplicate so reading it does not advance the cached buffer's position.
+      credentialsBinary = credentialsBinary.duplicate();
+    }
+    builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary));
+    builder.setTaskSpec(ProtoConverters.convertTaskSpecToProto(taskSpec));
+    return builder.build();
+  }
+
+  /** Serializes a copy of the credentials into a buffer in token-storage format. */
+  private ByteBuffer serializeCredentials(Credentials credentials) throws IOException {
+    Credentials containerCredentials = new Credentials();
+    containerCredentials.addAll(credentials);
+    DataOutputBuffer containerTokens_dob = new DataOutputBuffer();
+    containerCredentials.writeTokenStorageToStream(containerTokens_dob);
+    ByteBuffer containerCredentialsBuffer = ByteBuffer.wrap(containerTokens_dob.getData(), 0,
+        containerTokens_dob.getLength());
+    return containerCredentialsBuffer;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/aadd0492/tez-ext-service-tests/src/test/java/org/apache/tez/service/ContainerRunner.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/ContainerRunner.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/ContainerRunner.java
new file mode 100644
index 0000000..2bca4ed
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/ContainerRunner.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.service;
+
+import java.io.IOException;
+
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerRequestProto;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.SubmitWorkRequestProto;
+
+/**
+ * Operations the test service exposes for running work: one taking a run-container
+ * request and one taking a submit-work request.
+ */
+public interface ContainerRunner {
+
+  /**
+   * Handles a run-container request.
+   *
+   * @param request the request to process
+   * @throws IOException declared by the contract; the triggering conditions depend on
+   *         the implementation
+   */
+  void queueContainer(RunContainerRequestProto request) throws IOException;
+
+  /**
+   * Handles a submit-work request.
+   *
+   * @param request the request to process
+   * @throws IOException declared by the contract; the triggering conditions depend on
+   *         the implementation
+   */
+  void submitWork(SubmitWorkRequestProto request) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/aadd0492/tez-ext-service-tests/src/test/java/org/apache/tez/service/MiniTezTestServiceCluster.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/MiniTezTestServiceCluster.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/MiniTezTestServiceCluster.java
new file mode 100644
index 0000000..f47bd67
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/MiniTezTestServiceCluster.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.service;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.tez.service.impl.TezTestService;
+
+/**
+ * Single-instance, in-process "cluster" wrapping one {@link TezTestService}.
+ *
+ * The working directory is created under {@code target/} (wiped first if it already
+ * exists), and once started the cluster publishes a configuration holding the host,
+ * RPC port, executor count and memory of the running service.
+ */
+public class MiniTezTestServiceCluster extends AbstractService {
+
+  private static final Log LOG = LogFactory.getLog(MiniTezTestServiceCluster.class);
+
+  private final File testWorkDir;
+  private final long availableMemory;
+  private final int numExecutorsPerService;
+  private final String[] localDirs;
+  private final Configuration clusterSpecificConfiguration = new Configuration(false);
+
+  private TezTestService tezTestService;
+
+  /**
+   * @param clusterName used for the service name and (with any '$' removed) the work dir
+   * @param numExecutorsPerService must be positive
+   * @param availableMemory must be positive
+   * @param numLocalDirs number of local directories to create; must be positive
+   */
+  public static MiniTezTestServiceCluster create(String clusterName, int numExecutorsPerService, long availableMemory, int numLocalDirs) {
+    return new MiniTezTestServiceCluster(clusterName, numExecutorsPerService, availableMemory, numLocalDirs);
+  }
+
+  // TODO Add support for multiple instances
+  private MiniTezTestServiceCluster(String clusterName, int numExecutorsPerService, long availableMemory, int numLocalDirs) {
+    super(clusterName + "_TezTestServerCluster");
+    Preconditions.checkArgument(numExecutorsPerService > 0);
+    Preconditions.checkArgument(availableMemory > 0);
+    Preconditions.checkArgument(numLocalDirs > 0);
+    String clusterNameTrimmed = clusterName.replace("$", "") + "_TezTestServerCluster";
+    File targetWorkDir = new File("target", clusterNameTrimmed);
+    try {
+      FileContext.getLocalFSFileContext().delete(
+          new Path(targetWorkDir.getAbsolutePath()), true);
+    } catch (Exception e) {
+      LOG.warn("Could not cleanup test workDir: " + targetWorkDir, e);
+      throw new RuntimeException("Could not cleanup test workDir: " + targetWorkDir, e);
+    }
+
+    if (Shell.WINDOWS) {
+      // The test working directory can exceed the maximum path length supported
+      // by some Windows APIs and cmd.exe (260 characters).  To work around this,
+      // create a symlink in temporary storage with a much shorter path,
+      // targeting the full path to the test working directory.  Then, use the
+      // symlink as the test working directory.
+      String targetPath = targetWorkDir.getAbsolutePath();
+      File link = new File(System.getProperty("java.io.tmpdir"),
+          String.valueOf(System.currentTimeMillis()));
+      String linkPath = link.getAbsolutePath();
+
+      try {
+        FileContext.getLocalFSFileContext().delete(new Path(linkPath), true);
+      } catch (IOException e) {
+        throw new YarnRuntimeException("could not cleanup symlink: " + linkPath, e);
+      }
+
+      // Guarantee target exists before creating symlink.
+      targetWorkDir.mkdirs();
+
+      Shell.ShellCommandExecutor shexec = new Shell.ShellCommandExecutor(
+          Shell.getSymlinkCommand(targetPath, linkPath));
+      try {
+        shexec.execute();
+      } catch (IOException e) {
+        throw new YarnRuntimeException(String.format(
+            "failed to create symlink from %s to %s, shell output: %s", linkPath,
+            targetPath, shexec.getOutput()), e);
+      }
+
+      this.testWorkDir = link;
+    } else {
+      this.testWorkDir = targetWorkDir;
+    }
+    this.numExecutorsPerService = numExecutorsPerService;
+    this.availableMemory = availableMemory;
+
+    // Setup Local Dirs
+    localDirs = new String[numLocalDirs];
+    for (int i = 0 ; i < numLocalDirs ; i++) {
+      // Suffix with the index so each entry is a distinct directory; a fixed name made
+      // every slot point at the same path.
+      File f = new File(testWorkDir, "localDir" + i);
+      f.mkdirs();
+      LOG.info("Created localDir: " + f.getAbsolutePath());
+      localDirs[i] = f.getAbsolutePath();
+    }
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) {
+    tezTestService = new TezTestService(conf, numExecutorsPerService, availableMemory, localDirs);
+    tezTestService.init(conf);
+
+  }
+
+  /**
+   * Starts the wrapped service and records its address and sizing in the
+   * cluster-specific configuration.
+   */
+  @Override
+  public void serviceStart() {
+    tezTestService.start();
+
+    clusterSpecificConfiguration.set(TezTestServiceConfConstants.TEZ_TEST_SERVICE_HOSTS,
+        getServiceAddress().getHostName());
+    clusterSpecificConfiguration.setInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT,
+        getServiceAddress().getPort());
+
+    clusterSpecificConfiguration.setInt(
+        TezTestServiceConfConstants.TEZ_TEST_SERVICE_NUM_EXECUTORS_PER_INSTANCE,
+        numExecutorsPerService);
+    clusterSpecificConfiguration.setLong(
+        TezTestServiceConfConstants.TEZ_TEST_SERVICE_MEMORY_PER_INSTANCE_MB, availableMemory);
+  }
+
+  @Override
+  public void serviceStop() {
+    // The wrapped service only exists once serviceInit has run.
+    if (tezTestService != null) {
+      tezTestService.stop();
+    }
+  }
+
+  /**
+   * return the address at which the service is listening
+   * @return host:port
+   */
+  public InetSocketAddress getServiceAddress() {
+    Preconditions.checkState(getServiceState() == STATE.STARTED);
+    return tezTestService.getListenerAddress();
+  }
+
+  /**
+   * @return the shuffle port reported by the wrapped service
+   * @throws IllegalStateException if the cluster is not started
+   */
+  public int getShufflePort() {
+    Preconditions.checkState(getServiceState() == STATE.STARTED);
+    return tezTestService.getShufflePort();
+  }
+
+  /**
+   * @return configuration entries describing the running service
+   * @throws IllegalStateException if the cluster is not started
+   */
+  public Configuration getClusterSpecificConfiguration() {
+    Preconditions.checkState(getServiceState() == STATE.STARTED);
+    return clusterSpecificConfiguration;
+  }
+
+  // Mainly for verification
+  public int getNumSubmissions() {
+    return tezTestService.getNumSubmissions();
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/aadd0492/tez-ext-service-tests/src/test/java/org/apache/tez/service/TezTestServiceConfConstants.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/TezTestServiceConfConstants.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/TezTestServiceConfConstants.java
new file mode 100644
index 0000000..bf4a5bd
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/TezTestServiceConfConstants.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.service;
+
+public class TezTestServiceConfConstants { // Configuration keys (and one default) for the Tez test service.
+
+  private static final String TEZ_TEST_SERVICE_PREFIX = "tez.test.service."; // Common prefix of every key below.
+
+  /** Number of executors per service instance - used by the scheduler. */
+  public static final String TEZ_TEST_SERVICE_NUM_EXECUTORS_PER_INSTANCE = TEZ_TEST_SERVICE_PREFIX + "num.executors.per-instance";
+
+  /** Memory available per service instance, in MB - used by the scheduler. */
+  public static final String TEZ_TEST_SERVICE_MEMORY_PER_INSTANCE_MB = TEZ_TEST_SERVICE_PREFIX + "memory.per.instance.mb";
+
+  /** Number of vcpus available per service instance - used by the scheduler. */
+  public static final String TEZ_TEST_SERVICE_VCPUS_PER_INSTANCE = TEZ_TEST_SERVICE_PREFIX + "vcpus.per.instance";
+
+
+  /** Hosts on which the service is running. Currently assumes a single port for all instances. */
+  public static final String TEZ_TEST_SERVICE_HOSTS = TEZ_TEST_SERVICE_PREFIX + "hosts";
+
+  /** Port on which the service(s) listen. Currently a single port for all instances. */
+  public static final String TEZ_TEST_SERVICE_RPC_PORT = TEZ_TEST_SERVICE_PREFIX + "rpc.port";
+
+  /** Number of threads to use in the AM to communicate with the external service. */
+  public static final String TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS = TEZ_TEST_SERVICE_PREFIX + "communicator.num.threads";
+  public static final int TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS_DEFAULT = 2; // Default for the key above.
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/aadd0492/tez-ext-service-tests/src/test/java/org/apache/tez/service/TezTestServiceProtocolBlockingPB.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/TezTestServiceProtocolBlockingPB.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/TezTestServiceProtocolBlockingPB.java
new file mode 100644
index 0000000..1108f72
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/TezTestServiceProtocolBlockingPB.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.service;
+
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos;
+
+@ProtocolInfo(protocolName = "org.apache.tez.service.TezTestServiceProtocolBlockingPB", protocolVersion = 1)
+public interface TezTestServiceProtocolBlockingPB extends TezTestServiceProtocolProtos.TezTestServiceProtocol.BlockingInterface {
+} // Declares no members: attaches a protocol name and version (1) to the BlockingInterface via @ProtocolInfo.
\ No newline at end of file