You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/09 03:43:17 UTC

[39/43] tez git commit: TEZ-2361. Propagate dag completion to TaskCommunicator. (sseth)

TEZ-2361. Propagate dag completion to TaskCommunicator. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/01374671
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/01374671
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/01374671

Branch: refs/heads/TEZ-2003
Commit: 0137467140aaee7d3dd2f75264d34544972a45ef
Parents: 44f5e8f
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Apr 23 17:26:25 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri May 8 14:41:55 2015 -0700

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                            |  1 +
 .../apache/tez/dag/api/TaskCommunicator.java    | 12 +++-
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  4 +-
 .../dag/app/TaskAttemptListenerImpTezDag.java   | 17 +++++-
 .../dag/app/TaskCommunicatorContextImpl.java    | 64 +++++++++++++++++---
 .../tez/dag/app/TezTaskCommunicatorImpl.java    |  5 ++
 .../tez/dag/app/launcher/ContainerLauncher.java |  3 -
 .../dag/app/launcher/ContainerLauncherImpl.java | 12 ----
 .../app/launcher/ContainerLauncherRouter.java   | 10 +++
 .../app/launcher/LocalContainerLauncher.java    |  9 ---
 .../apache/tez/dag/app/MockDAGAppMaster.java    | 11 ----
 .../rm/TezTestServiceTaskSchedulerService.java  |  2 +-
 .../TezTestServiceTaskCommunicatorImpl.java     |  2 +-
 .../tez/tests/TestExternalTezServices.java      |  2 +
 14 files changed, 103 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/01374671/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 7c13110..f6bc8e7 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -17,5 +17,6 @@ ALL CHANGES:
   TEZ-2285. Allow TaskCommunicators to indicate task/container liveness.
   TEZ-2302. Allow TaskCommunicators to subscribe for Vertex updates.
   TEZ-2347. Expose additional information in TaskCommunicatorContext.
+  TEZ-2361. Propagate dag completion to TaskCommunicator.
 
 INCOMPATIBLE CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/01374671/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
index a2cd858..cadca0c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
@@ -14,7 +14,6 @@
 
 package org.apache.tez.dag.api;
 
-import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Map;
 
@@ -74,4 +73,15 @@ public abstract class TaskCommunicator extends AbstractService {
    * @throws Exception
    */
   public abstract void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception;
+
+  /**
+   * Indicates the current running dag is complete. The TaskCommunicatorContext can be used to
+   * query information about the current dag during the duration of the dagComplete invocation.
+   *
+   * After this, the contents returned from querying the context may change at any point - due to
+   * the next dag being submitted.
+   */
+  // TODO TEZ-2003 This is extremely difficult to use. Add the dagStarted notification, and potentially
+  // throw exceptions between a dagComplete and dagStart invocation.
+  public abstract void dagComplete(String dagName);
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/01374671/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 1ea369e..568e929 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -759,7 +759,7 @@ public class DAGAppMaster extends AbstractService {
       DAGAppMasterEventDagCleanup cleanupEvent = (DAGAppMasterEventDagCleanup) event;
       LOG.info("Cleaning up DAG: name=" + cleanupEvent.getDag().getName() + ", with id=" +
           cleanupEvent.getDag().getID());
-      containerLauncher.dagComplete(cleanupEvent.getDag());
+      containerLauncherRouter.dagComplete(cleanupEvent.getDag());
       taskAttemptListener.dagComplete(cleanupEvent.getDag());
       nodes.dagComplete(cleanupEvent.getDag());
       containers.dagComplete(cleanupEvent.getDag());
@@ -773,7 +773,7 @@ public class DAGAppMaster extends AbstractService {
     case NEW_DAG_SUBMITTED:
       // Inform sub-components that a new DAG has been submitted.
       taskSchedulerEventHandler.dagSubmitted();
-      containerLauncher.dagSubmitted();
+      containerLauncherRouter.dagSubmitted();
       taskAttemptListener.dagSubmitted();
       break;
     default:

http://git-wip-us.apache.org/repos/asf/tez/blob/01374671/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index aaf9cca..03b5602 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -81,6 +81,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
 
   private final AppContext context;
   private final TaskCommunicator[] taskCommunicators;
+  private final TaskCommunicatorContext[] taskCommunicatorContexts;
 
   protected final TaskHeartbeatHandler taskHeartbeatHandler;
   protected final ContainerHeartbeatHandler containerHeartbeatHandler;
@@ -123,7 +124,9 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
       }
     }
     this.taskCommunicators = new TaskCommunicator[taskCommunicatorClassIdentifiers.length];
+    this.taskCommunicatorContexts = new TaskCommunicatorContext[taskCommunicatorClassIdentifiers.length];
     for (int i = 0 ; i < taskCommunicatorClassIdentifiers.length ; i++) {
+      taskCommunicatorContexts[i] = new TaskCommunicatorContextImpl(context, this, i);
       taskCommunicators[i] = createTaskCommunicator(taskCommunicatorClassIdentifiers[i], i);
     }
     // TODO TEZ-2118 Start using taskCommunicator indices properly
@@ -148,10 +151,10 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
   private TaskCommunicator createTaskCommunicator(String taskCommClassIdentifier, int taskCommIndex) {
     if (taskCommClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) {
       LOG.info("Using Default Task Communicator");
-      return new TezTaskCommunicatorImpl(new TaskCommunicatorContextImpl(context, this, taskCommIndex));
+      return new TezTaskCommunicatorImpl(taskCommunicatorContexts[taskCommIndex]);
     } else if (taskCommClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) {
       LOG.info("Using Default Local Task Communicator");
-      return new TezLocalTaskCommunicatorImpl(new TaskCommunicatorContextImpl(context, this, taskCommIndex));
+      return new TezLocalTaskCommunicatorImpl(taskCommunicatorContexts[taskCommIndex]);
     } else {
       LOG.info("Using TaskCommunicator: " + taskCommClassIdentifier);
       Class<? extends TaskCommunicator> taskCommClazz = (Class<? extends TaskCommunicator>) ReflectionUtils
@@ -159,7 +162,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
       try {
         Constructor<? extends TaskCommunicator> ctor = taskCommClazz.getConstructor(TaskCommunicatorContext.class);
         ctor.setAccessible(true);
-        return ctor.newInstance(new TaskCommunicatorContextImpl(context, this, taskCommIndex));
+        return ctor.newInstance(taskCommunicatorContexts[taskCommIndex]);
       } catch (NoSuchMethodException e) {
         throw new TezUncheckedException(e);
       } catch (InvocationTargetException e) {
@@ -318,6 +321,14 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     // This becomes more relevant when task kills without container kills are allowed.
 
     // TODO TEZ-2336. Send a signal to containers indicating DAG completion.
+
+    // Inform all communicators of the dagCompletion.
+    for (int i = 0 ; i < taskCommunicators.length ; i++) {
+      ((TaskCommunicatorContextImpl)taskCommunicatorContexts[i]).dagCompleteStart(dag);
+      taskCommunicators[i].dagComplete(dag.getName());
+      ((TaskCommunicatorContextImpl)taskCommunicatorContexts[i]).dagCompleteEnd();
+    }
+
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/01374671/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
index 4cb0c93..790066f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
@@ -17,6 +17,11 @@ package org.apache.tez.dag.app;
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
@@ -33,6 +38,7 @@ import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.event.VertexState;
 import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.VertexStateUpdateListener;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -44,6 +50,10 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
   private final AppContext context;
   private final TaskAttemptListenerImpTezDag taskAttemptListener;
   private final int taskCommunicatorIndex;
+  private final ReentrantReadWriteLock.ReadLock dagChangedReadLock;
+  private final ReentrantReadWriteLock.WriteLock dagChangedWriteLock;
+
+  private DAG dag;
 
   public TaskCommunicatorContextImpl(AppContext appContext,
                                      TaskAttemptListenerImpTezDag taskAttemptListener,
@@ -51,6 +61,10 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
     this.context = appContext;
     this.taskAttemptListener = taskAttemptListener;
     this.taskCommunicatorIndex = taskCommunicatorIndex;
+
+    ReentrantReadWriteLock dagChangedLock = new ReentrantReadWriteLock();
+    dagChangedReadLock = dagChangedLock.readLock();
+    dagChangedWriteLock = dagChangedLock.writeLock();
   }
 
   @Override
@@ -111,18 +125,19 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
   public void registerForVertexStateUpdates(String vertexName,
                                             @Nullable Set<VertexState> stateSet) {
     Preconditions.checkNotNull(vertexName, "VertexName cannot be null: " + vertexName);
-    context.getCurrentDAG().getStateChangeNotifier().registerForVertexUpdates(vertexName, stateSet, this);
+    getDag().getStateChangeNotifier().registerForVertexUpdates(vertexName, stateSet,
+        this);
   }
 
   @Override
   public String getCurretnDagName() {
-    return context.getCurrentDAG().getName();
+    return getDag().getName();
   }
 
   @Override
   public Iterable<String> getInputVertexNames(String vertexName) {
     Preconditions.checkNotNull(vertexName, "VertexName cannot be null: " + vertexName);
-    Vertex vertex = context.getCurrentDAG().getVertex(vertexName);
+    Vertex vertex = getDag().getVertex(vertexName);
     Set<Vertex> sources = vertex.getInputVertices().keySet();
     return Iterables.transform(sources, new Function<Vertex, String>() {
       @Override
@@ -135,31 +150,32 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
   @Override
   public int getVertexTotalTaskCount(String vertexName) {
     Preconditions.checkArgument(vertexName != null, "VertexName must be specified");
-    return context.getCurrentDAG().getVertex(vertexName).getTotalTasks();
+    return getDag().getVertex(vertexName).getTotalTasks();
   }
 
   @Override
   public int getVertexCompletedTaskCount(String vertexName) {
     Preconditions.checkArgument(vertexName != null, "VertexName must be specified");
-    return context.getCurrentDAG().getVertex(vertexName).getCompletedTasks();
+    return getDag().getVertex(vertexName).getCompletedTasks();
   }
 
   @Override
   public int getVertexRunningTaskCount(String vertexName) {
     Preconditions.checkArgument(vertexName != null, "VertexName must be specified");
-    return context.getCurrentDAG().getVertex(vertexName).getRunningTasks();
+    return getDag().getVertex(vertexName).getRunningTasks();
   }
 
   @Override
   public long getFirstAttemptStartTime(String vertexName, int taskIndex) {
     Preconditions.checkArgument(vertexName != null, "VertexName must be specified");
     Preconditions.checkArgument(taskIndex >=0, "TaskIndex must be > 0");
-    return context.getCurrentDAG().getVertex(vertexName).getTask(taskIndex).getFirstAttemptStartTime();
+    return getDag().getVertex(vertexName).getTask(
+        taskIndex).getFirstAttemptStartTime();
   }
 
   @Override
   public long getDagStartTime() {
-    return context.getCurrentDAG().getStartTime();
+    return getDag().getStartTime();
   }
 
   @Override
@@ -171,4 +187,36 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
       throw new TezUncheckedException(e);
     }
   }
+
+  private DAG getDag() {
+    dagChangedReadLock.lock();
+    try {
+      if (dag != null) {
+        return dag;
+      } else {
+        return context.getCurrentDAG();
+      }
+    } finally {
+      dagChangedReadLock.unlock();
+    }
+  }
+
+  @InterfaceAudience.Private
+  public void dagCompleteStart(DAG dag) {
+    dagChangedWriteLock.lock();
+    try {
+      this.dag = dag;
+    } finally {
+      dagChangedWriteLock.unlock();
+    }
+  }
+
+  public void dagCompleteEnd() {
+    dagChangedWriteLock.lock();
+    try {
+      this.dag = null;
+    } finally {
+      dagChangedWriteLock.unlock();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/01374671/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
index 1417a3b..825a4d2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
@@ -258,6 +258,11 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
     // Empty. Not registering, or expecting any updates.
   }
 
+  @Override
+  public void dagComplete(String dagName) {
+    // Nothing to do at the moment. Some of the TODOs from TaskAttemptListener apply here.
+  }
+
   protected String getTokenIdentifier() {
     return tokenIdentifier;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/01374671/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java
index 8a8498f..ea07a1d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java
@@ -26,7 +26,4 @@ import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
 public interface ContainerLauncher
     extends EventHandler<NMCommunicatorEvent> {
 
-    void dagComplete(DAG dag);
-
-    void dagSubmitted();
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/01374671/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
index 94889a1..a1eb2a7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
@@ -30,7 +30,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.tez.dag.app.dag.DAG;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -111,17 +110,6 @@ public class ContainerLauncherImpl extends AbstractService implements
     }
   }
 
-  @Override
-  public void dagComplete(DAG dag) {
-    // Nothing required at the moment. Containers are shared across DAGs
-  }
-
-  @Override
-  public void dagSubmitted() {
-    // Nothing to do right now. Indicates that a new DAG has been submitted and
-    // the context has updated information.
-  }
-
   private static enum ContainerState {
     PREP, FAILED, RUNNING, DONE, KILLED_BEFORE_LAUNCH
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/01374671/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
index dd3571e..db145f4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
@@ -27,6 +27,7 @@ import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -128,6 +129,15 @@ public class ContainerLauncherRouter extends AbstractService
     }
   }
 
+  public void dagComplete(DAG dag) {
+    // Nothing required at the moment. Containers are shared across DAGs
+  }
+
+  public void dagSubmitted() {
+    // Nothing to do right now. Indicates that a new DAG has been submitted and
+    // the context has updated information.
+  }
+
 
   @Override
   public void handle(NMCommunicatorEvent event) {

http://git-wip-us.apache.org/repos/asf/tez/blob/01374671/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index 18b2e35..305f8b3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -44,7 +44,6 @@ import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
-import org.apache.tez.dag.app.dag.DAG;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -169,14 +168,6 @@ public class LocalContainerLauncher extends AbstractService implements
     callbackExecutor.shutdownNow();
   }
 
-  @Override
-  public void dagComplete(DAG dag) {
-  }
-
-  @Override
-  public void dagSubmitted() {
-  }
-
   // Thread to monitor the queue of incoming NMCommunicator events
   private class TezSubTaskRunner implements Runnable {
     @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/01374671/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index 051497b..845c440 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -34,10 +34,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.tez.dag.app.dag.DAG;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.service.AbstractService;
@@ -54,7 +52,6 @@ import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.TaskHeartbeatRequest;
 import org.apache.tez.dag.api.TaskHeartbeatResponse;
 import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TaskCommunicator;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.app.launcher.ContainerLauncher;
 import org.apache.tez.dag.app.launcher.ContainerLauncherRouter;
@@ -150,14 +147,6 @@ public class MockDAGAppMaster extends DAGAppMaster {
       this.goFlag = goFlag;
     }
 
-    @Override
-    public void dagComplete(DAG dag) {
-    }
-
-    @Override
-    public void dagSubmitted() {
-    }
-
     public class ContainerData {
       ContainerId cId;
       TezTaskAttemptID taId;

http://git-wip-us.apache.org/repos/asf/tez/blob/01374671/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
index 50dfb24..073cb50 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
@@ -158,7 +158,7 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
   }
 
   @Override
-  public void resetMatchLocalityForAllHeldContainers() {
+  public void dagComplete() {
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/01374671/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
index ef983c2..cf28b11 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
@@ -150,7 +150,7 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl
               t = se.getCause();
             }
             if (t instanceof RemoteException) {
-              RemoteException re = (RemoteException)t;
+              RemoteException re = (RemoteException) t;
               String message = re.toString();
               if (message.contains(RejectedExecutionException.class.getName())) {
                 getTaskCommunicatorContext().taskKilled(taskSpec.getTaskAttemptID(),

http://git-wip-us.apache.org/repos/asf/tez/blob/01374671/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
index 4d0a610..45c70f1 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
@@ -39,6 +39,7 @@ import org.apache.tez.dag.app.taskcomm.TezTestServiceTaskCommunicatorImpl;
 import org.apache.tez.examples.HashJoinExample;
 import org.apache.tez.examples.JoinDataGen;
 import org.apache.tez.examples.JoinValidateConfigured;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.processor.SleepProcessor;
 import org.apache.tez.service.MiniTezTestServiceCluster;
 import org.apache.tez.service.impl.ContainerRunnerImpl;
@@ -124,6 +125,7 @@ public class TestExternalTezServices {
     remoteFs.mkdirs(stagingDirPath);
     // This is currently configured to push tasks into the Service, and then use the standard RPC
     confForJobs.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString());
+    confForJobs.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, false);
 
     confForJobs.setStrings(TezConfiguration.TEZ_AM_TASK_SCHEDULERS,
         TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT,