You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/06 09:41:50 UTC
[48/50] [abbrv] tez git commit: TEZ-2361. Propagate dag completion to
TaskCommunicator. (sseth)
TEZ-2361. Propagate dag completion to TaskCommunicator. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e5886f8e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e5886f8e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e5886f8e
Branch: refs/heads/TEZ-2003
Commit: e5886f8e329a31e700f794cfc7c669866eaafc27
Parents: 960d92b
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Apr 23 17:26:25 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed May 6 00:18:08 2015 -0700
----------------------------------------------------------------------
TEZ-2003-CHANGES.txt | 1 +
.../apache/tez/dag/api/TaskCommunicator.java | 12 +++-
.../org/apache/tez/dag/app/DAGAppMaster.java | 4 +-
.../dag/app/TaskAttemptListenerImpTezDag.java | 17 +++++-
.../dag/app/TaskCommunicatorContextImpl.java | 64 +++++++++++++++++---
.../tez/dag/app/TezTaskCommunicatorImpl.java | 5 ++
.../tez/dag/app/launcher/ContainerLauncher.java | 3 -
.../dag/app/launcher/ContainerLauncherImpl.java | 12 ----
.../app/launcher/ContainerLauncherRouter.java | 10 +++
.../app/launcher/LocalContainerLauncher.java | 9 ---
.../apache/tez/dag/app/MockDAGAppMaster.java | 11 ----
.../rm/TezTestServiceTaskSchedulerService.java | 2 +-
.../TezTestServiceTaskCommunicatorImpl.java | 2 +-
.../tez/tests/TestExternalTezServices.java | 2 +
14 files changed, 103 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/e5886f8e/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 7c13110..f6bc8e7 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -17,5 +17,6 @@ ALL CHANGES:
TEZ-2285. Allow TaskCommunicators to indicate task/container liveness.
TEZ-2302. Allow TaskCommunicators to subscribe for Vertex updates.
TEZ-2347. Expose additional information in TaskCommunicatorContext.
+ TEZ-2361. Propagate dag completion to TaskCommunicator.
INCOMPATIBLE CHANGES:
http://git-wip-us.apache.org/repos/asf/tez/blob/e5886f8e/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
index a2cd858..cadca0c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
@@ -14,7 +14,6 @@
package org.apache.tez.dag.api;
-import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map;
@@ -74,4 +73,15 @@ public abstract class TaskCommunicator extends AbstractService {
* @throws Exception
*/
public abstract void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception;
+
+ /**
+ * Indicates the current running dag is complete. The TaskCommunicatorContext can be used to
+ * query information about the current dag during the duration of the dagComplete invocation.
+ *
+ * After this, the contents returned from querying the context may change at any point - due to
+ * the next dag being submitted.
+ */
+ // TODO TEZ-2003 This is extremely difficult to use. Add the dagStarted notification, and potentially
+ // throw exceptions between a dagComplete and dagStart invocation.
+ public abstract void dagComplete(String dagName);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/e5886f8e/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 1ea369e..568e929 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -759,7 +759,7 @@ public class DAGAppMaster extends AbstractService {
DAGAppMasterEventDagCleanup cleanupEvent = (DAGAppMasterEventDagCleanup) event;
LOG.info("Cleaning up DAG: name=" + cleanupEvent.getDag().getName() + ", with id=" +
cleanupEvent.getDag().getID());
- containerLauncher.dagComplete(cleanupEvent.getDag());
+ containerLauncherRouter.dagComplete(cleanupEvent.getDag());
taskAttemptListener.dagComplete(cleanupEvent.getDag());
nodes.dagComplete(cleanupEvent.getDag());
containers.dagComplete(cleanupEvent.getDag());
@@ -773,7 +773,7 @@ public class DAGAppMaster extends AbstractService {
case NEW_DAG_SUBMITTED:
// Inform sub-components that a new DAG has been submitted.
taskSchedulerEventHandler.dagSubmitted();
- containerLauncher.dagSubmitted();
+ containerLauncherRouter.dagSubmitted();
taskAttemptListener.dagSubmitted();
break;
default:
http://git-wip-us.apache.org/repos/asf/tez/blob/e5886f8e/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index a7bbba9..61bd4ca 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -81,6 +81,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
private final AppContext context;
private final TaskCommunicator[] taskCommunicators;
+ private final TaskCommunicatorContext[] taskCommunicatorContexts;
protected final TaskHeartbeatHandler taskHeartbeatHandler;
protected final ContainerHeartbeatHandler containerHeartbeatHandler;
@@ -123,7 +124,9 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
}
}
this.taskCommunicators = new TaskCommunicator[taskCommunicatorClassIdentifiers.length];
+ this.taskCommunicatorContexts = new TaskCommunicatorContext[taskCommunicatorClassIdentifiers.length];
for (int i = 0 ; i < taskCommunicatorClassIdentifiers.length ; i++) {
+ taskCommunicatorContexts[i] = new TaskCommunicatorContextImpl(context, this, i);
taskCommunicators[i] = createTaskCommunicator(taskCommunicatorClassIdentifiers[i], i);
}
// TODO TEZ-2118 Start using taskCommunicator indices properly
@@ -148,10 +151,10 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
private TaskCommunicator createTaskCommunicator(String taskCommClassIdentifier, int taskCommIndex) {
if (taskCommClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) {
LOG.info("Using Default Task Communicator");
- return new TezTaskCommunicatorImpl(new TaskCommunicatorContextImpl(context, this, taskCommIndex));
+ return new TezTaskCommunicatorImpl(taskCommunicatorContexts[taskCommIndex]);
} else if (taskCommClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) {
LOG.info("Using Default Local Task Communicator");
- return new TezLocalTaskCommunicatorImpl(new TaskCommunicatorContextImpl(context, this, taskCommIndex));
+ return new TezLocalTaskCommunicatorImpl(taskCommunicatorContexts[taskCommIndex]);
} else {
LOG.info("Using TaskCommunicator: " + taskCommClassIdentifier);
Class<? extends TaskCommunicator> taskCommClazz = (Class<? extends TaskCommunicator>) ReflectionUtils
@@ -159,7 +162,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
try {
Constructor<? extends TaskCommunicator> ctor = taskCommClazz.getConstructor(TaskCommunicatorContext.class);
ctor.setAccessible(true);
- return ctor.newInstance(new TaskCommunicatorContextImpl(context, this, taskCommIndex));
+ return ctor.newInstance(taskCommunicatorContexts[taskCommIndex]);
} catch (NoSuchMethodException e) {
throw new TezUncheckedException(e);
} catch (InvocationTargetException e) {
@@ -318,6 +321,14 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
// This becomes more relevant when task kills without container kills are allowed.
// TODO TEZ-2336. Send a signal to containers indicating DAG completion.
+
+ // Inform all communicators of the dagCompletion.
+ for (int i = 0 ; i < taskCommunicators.length ; i++) {
+ ((TaskCommunicatorContextImpl)taskCommunicatorContexts[i]).dagCompleteStart(dag);
+ taskCommunicators[i].dagComplete(dag.getName());
+ ((TaskCommunicatorContextImpl)taskCommunicatorContexts[i]).dagCompleteEnd();
+ }
+
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/e5886f8e/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
index 4cb0c93..790066f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
@@ -17,6 +17,11 @@ package org.apache.tez.dag.app;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
@@ -33,6 +38,7 @@ import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.VertexStateUpdateListener;
import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -44,6 +50,10 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
private final AppContext context;
private final TaskAttemptListenerImpTezDag taskAttemptListener;
private final int taskCommunicatorIndex;
+ private final ReentrantReadWriteLock.ReadLock dagChangedReadLock;
+ private final ReentrantReadWriteLock.WriteLock dagChangedWriteLock;
+
+ private DAG dag;
public TaskCommunicatorContextImpl(AppContext appContext,
TaskAttemptListenerImpTezDag taskAttemptListener,
@@ -51,6 +61,10 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
this.context = appContext;
this.taskAttemptListener = taskAttemptListener;
this.taskCommunicatorIndex = taskCommunicatorIndex;
+
+ ReentrantReadWriteLock dagChangedLock = new ReentrantReadWriteLock();
+ dagChangedReadLock = dagChangedLock.readLock();
+ dagChangedWriteLock = dagChangedLock.writeLock();
}
@Override
@@ -111,18 +125,19 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
public void registerForVertexStateUpdates(String vertexName,
@Nullable Set<VertexState> stateSet) {
Preconditions.checkNotNull(vertexName, "VertexName cannot be null: " + vertexName);
- context.getCurrentDAG().getStateChangeNotifier().registerForVertexUpdates(vertexName, stateSet, this);
+ getDag().getStateChangeNotifier().registerForVertexUpdates(vertexName, stateSet,
+ this);
}
@Override
public String getCurretnDagName() {
- return context.getCurrentDAG().getName();
+ return getDag().getName();
}
@Override
public Iterable<String> getInputVertexNames(String vertexName) {
Preconditions.checkNotNull(vertexName, "VertexName cannot be null: " + vertexName);
- Vertex vertex = context.getCurrentDAG().getVertex(vertexName);
+ Vertex vertex = getDag().getVertex(vertexName);
Set<Vertex> sources = vertex.getInputVertices().keySet();
return Iterables.transform(sources, new Function<Vertex, String>() {
@Override
@@ -135,31 +150,32 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
@Override
public int getVertexTotalTaskCount(String vertexName) {
Preconditions.checkArgument(vertexName != null, "VertexName must be specified");
- return context.getCurrentDAG().getVertex(vertexName).getTotalTasks();
+ return getDag().getVertex(vertexName).getTotalTasks();
}
@Override
public int getVertexCompletedTaskCount(String vertexName) {
Preconditions.checkArgument(vertexName != null, "VertexName must be specified");
- return context.getCurrentDAG().getVertex(vertexName).getCompletedTasks();
+ return getDag().getVertex(vertexName).getCompletedTasks();
}
@Override
public int getVertexRunningTaskCount(String vertexName) {
Preconditions.checkArgument(vertexName != null, "VertexName must be specified");
- return context.getCurrentDAG().getVertex(vertexName).getRunningTasks();
+ return getDag().getVertex(vertexName).getRunningTasks();
}
@Override
public long getFirstAttemptStartTime(String vertexName, int taskIndex) {
Preconditions.checkArgument(vertexName != null, "VertexName must be specified");
Preconditions.checkArgument(taskIndex >=0, "TaskIndex must be > 0");
- return context.getCurrentDAG().getVertex(vertexName).getTask(taskIndex).getFirstAttemptStartTime();
+ return getDag().getVertex(vertexName).getTask(
+ taskIndex).getFirstAttemptStartTime();
}
@Override
public long getDagStartTime() {
- return context.getCurrentDAG().getStartTime();
+ return getDag().getStartTime();
}
@Override
@@ -171,4 +187,36 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
throw new TezUncheckedException(e);
}
}
+
+ private DAG getDag() {
+ dagChangedReadLock.lock();
+ try {
+ if (dag != null) {
+ return dag;
+ } else {
+ return context.getCurrentDAG();
+ }
+ } finally {
+ dagChangedReadLock.unlock();
+ }
+ }
+
+ @InterfaceAudience.Private
+ public void dagCompleteStart(DAG dag) {
+ dagChangedWriteLock.lock();
+ try {
+ this.dag = dag;
+ } finally {
+ dagChangedWriteLock.unlock();
+ }
+ }
+
+ public void dagCompleteEnd() {
+ dagChangedWriteLock.lock();
+ try {
+ this.dag = null;
+ } finally {
+ dagChangedWriteLock.unlock();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/e5886f8e/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
index 1417a3b..825a4d2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
@@ -258,6 +258,11 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
// Empty. Not registering, or expecting any updates.
}
+ @Override
+ public void dagComplete(String dagName) {
+ // Nothing to do at the moment. Some of the TODOs from TaskAttemptListener apply here.
+ }
+
protected String getTokenIdentifier() {
return tokenIdentifier;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/e5886f8e/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java
index 8a8498f..ea07a1d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java
@@ -26,7 +26,4 @@ import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
public interface ContainerLauncher
extends EventHandler<NMCommunicatorEvent> {
- void dagComplete(DAG dag);
-
- void dagSubmitted();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/e5886f8e/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
index 94889a1..a1eb2a7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
@@ -30,7 +30,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.tez.dag.app.dag.DAG;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -111,17 +110,6 @@ public class ContainerLauncherImpl extends AbstractService implements
}
}
- @Override
- public void dagComplete(DAG dag) {
- // Nothing required at the moment. Containers are shared across DAGs
- }
-
- @Override
- public void dagSubmitted() {
- // Nothing to do right now. Indicates that a new DAG has been submitted and
- // the context has updated information.
- }
-
private static enum ContainerState {
PREP, FAILED, RUNNING, DONE, KILLED_BEFORE_LAUNCH
}
http://git-wip-us.apache.org/repos/asf/tez/blob/e5886f8e/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
index dd3571e..db145f4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
@@ -27,6 +27,7 @@ import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -128,6 +129,15 @@ public class ContainerLauncherRouter extends AbstractService
}
}
+ public void dagComplete(DAG dag) {
+ // Nothing required at the moment. Containers are shared across DAGs
+ }
+
+ public void dagSubmitted() {
+ // Nothing to do right now. Indicates that a new DAG has been submitted and
+ // the context has updated information.
+ }
+
@Override
public void handle(NMCommunicatorEvent event) {
http://git-wip-us.apache.org/repos/asf/tez/blob/e5886f8e/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index 18b2e35..305f8b3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -44,7 +44,6 @@ import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.tez.dag.app.dag.DAG;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -169,14 +168,6 @@ public class LocalContainerLauncher extends AbstractService implements
callbackExecutor.shutdownNow();
}
- @Override
- public void dagComplete(DAG dag) {
- }
-
- @Override
- public void dagSubmitted() {
- }
-
// Thread to monitor the queue of incoming NMCommunicator events
private class TezSubTaskRunner implements Runnable {
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/e5886f8e/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index d40b78d..2a38096 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -34,10 +34,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.tez.dag.app.dag.DAG;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.service.AbstractService;
@@ -54,7 +52,6 @@ import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.TaskHeartbeatRequest;
import org.apache.tez.dag.api.TaskHeartbeatResponse;
import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TaskCommunicator;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.app.launcher.ContainerLauncher;
import org.apache.tez.dag.app.launcher.ContainerLauncherRouter;
@@ -146,14 +143,6 @@ public class MockDAGAppMaster extends DAGAppMaster {
this.goFlag = goFlag;
}
- @Override
- public void dagComplete(DAG dag) {
- }
-
- @Override
- public void dagSubmitted() {
- }
-
public class ContainerData {
ContainerId cId;
TezTaskAttemptID taId;
http://git-wip-us.apache.org/repos/asf/tez/blob/e5886f8e/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
index 50dfb24..073cb50 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
@@ -158,7 +158,7 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
}
@Override
- public void resetMatchLocalityForAllHeldContainers() {
+ public void dagComplete() {
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/e5886f8e/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
index ef983c2..cf28b11 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
@@ -150,7 +150,7 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl
t = se.getCause();
}
if (t instanceof RemoteException) {
- RemoteException re = (RemoteException)t;
+ RemoteException re = (RemoteException) t;
String message = re.toString();
if (message.contains(RejectedExecutionException.class.getName())) {
getTaskCommunicatorContext().taskKilled(taskSpec.getTaskAttemptID(),
http://git-wip-us.apache.org/repos/asf/tez/blob/e5886f8e/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
index 4d0a610..45c70f1 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
@@ -39,6 +39,7 @@ import org.apache.tez.dag.app.taskcomm.TezTestServiceTaskCommunicatorImpl;
import org.apache.tez.examples.HashJoinExample;
import org.apache.tez.examples.JoinDataGen;
import org.apache.tez.examples.JoinValidateConfigured;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.processor.SleepProcessor;
import org.apache.tez.service.MiniTezTestServiceCluster;
import org.apache.tez.service.impl.ContainerRunnerImpl;
@@ -124,6 +125,7 @@ public class TestExternalTezServices {
remoteFs.mkdirs(stagingDirPath);
// This is currently configured to push tasks into the Service, and then use the standard RPC
confForJobs.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString());
+ confForJobs.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, false);
confForJobs.setStrings(TezConfiguration.TEZ_AM_TASK_SCHEDULERS,
TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT,