You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2013/07/23 01:17:29 UTC
git commit: TEZ-310. Speedup startup by starting services in parallel
(bikas)
Updated Branches:
refs/heads/master f5ed2e258 -> c19c62081
TEZ-310. Speedup startup by starting services in parallel (bikas)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/c19c6208
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/c19c6208
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/c19c6208
Branch: refs/heads/master
Commit: c19c62081d054e169697a6c18a8266c235127415
Parents: f5ed2e2
Author: Bikas Saha <bi...@apache.org>
Authored: Mon Jul 22 16:05:22 2013 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Mon Jul 22 16:05:22 2013 -0700
----------------------------------------------------------------------
.../java/org/apache/tez/dag/app/AppContext.java | 5 +-
.../org/apache/tez/dag/app/DAGAppMaster.java | 262 +++++++++++++++----
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 18 +-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 11 -
.../dag/app/rm/TaskSchedulerEventHandler.java | 5 +-
.../tez/dag/app/dag/impl/TestDAGImpl.java | 8 +-
6 files changed, 220 insertions(+), 89 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c19c6208/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
index 50580db..8f18993 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler;
import org.apache.tez.dag.app.rm.container.AMContainerMap;
@@ -41,6 +42,8 @@ public interface AppContext {
DAGAppMaster getAppMaster();
+ TezConfiguration getConf();
+
ApplicationId getApplicationID();
TezDAGID getDAGID();
@@ -70,5 +73,5 @@ public interface AppContext {
AMNodeMap getAllNodes();
- TaskSchedulerEventHandler getTaskScheduler();
+ TaskSchedulerEventHandler getTaskScheduler();
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c19c6208/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index b04701a..28790fb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -22,11 +22,14 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -34,15 +37,17 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.Service;
+import org.apache.hadoop.service.ServiceOperations;
+import org.apache.hadoop.service.ServiceStateChangeListener;
+import org.apache.hadoop.service.ServiceStateException;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -122,12 +127,12 @@ import org.apache.tez.engine.common.security.JobTokenSecretManager;
*/
@SuppressWarnings("rawtypes")
-public class DAGAppMaster extends CompositeService {
+public class DAGAppMaster extends AbstractService {
private static final Log LOG = LogFactory.getLog(DAGAppMaster.class);
/**
- * Priority of the MRAppMaster shutdown hook.
+ * Priority of the DAGAppMaster shutdown hook.
*/
public static final int SHUTDOWN_HOOK_PRIORITY = 30;
@@ -145,7 +150,7 @@ public class DAGAppMaster extends CompositeService {
private AMContainerMap containers;
private AMNodeMap nodes;
// TODO Metrics
- //protected final MRAppMetrics metrics;
+ //protected final DAGAppMetrics metrics;
// TODO Recovery
//private Map<TezTaskID, TaskInfo> completedTasksFromPreviousRun;
private AppContext context;
@@ -179,6 +184,11 @@ public class DAGAppMaster extends CompositeService {
private Credentials fsTokens = new Credentials(); // Filled during init
private UserGroupInformation currentUser; // Will be setup during init
+ // must be LinkedHashMap to preserve order of service addition
+ Map<Service, ServiceWithDependency> services =
+ new LinkedHashMap<Service, ServiceWithDependency>();
+
+
public DAGAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
long appSubmitTime, DAGPlan dagPB) {
@@ -201,8 +211,8 @@ public class DAGAppMaster extends CompositeService {
this.nmHttpPort = nmHttpPort;
this.state = DAGAppMasterState.NEW;
// TODO Metrics
- //this.metrics = MRAppMetrics.create();
- LOG.info("Created MRAppMaster for application " + applicationAttemptId);
+ //this.metrics = DAGAppMetrics.create();
+ LOG.info("Created DAGAppMaster for application " + applicationAttemptId);
}
@Override
@@ -227,38 +237,35 @@ public class DAGAppMaster extends CompositeService {
clientHandler = new DAGClientHandler();
- // TODO Committer.
- // committer = createOutputCommitter(conf);
-
dispatcher = createDispatcher();
- addIfService(dispatcher);
+ addIfService(dispatcher, false);
clientRpcServer = new DAGClientServer(clientHandler);
- addIfService(clientRpcServer);
+ addIfService(clientRpcServer, true);
taskHeartbeatHandler = createTaskHeartbeatHandler(context, conf);
- addIfService(taskHeartbeatHandler);
+ addIfService(taskHeartbeatHandler, true);
containerHeartbeatHandler = createContainerHeartbeatHandler(context, conf);
- addIfService(containerHeartbeatHandler);
+ addIfService(containerHeartbeatHandler, true);
//service to handle requests to TaskUmbilicalProtocol
taskAttemptListener = createTaskAttemptListener(context,
taskHeartbeatHandler, containerHeartbeatHandler);
- addIfService(taskAttemptListener);
+ addIfService(taskAttemptListener, true);
containers = new AMContainerMap(containerHeartbeatHandler,
taskAttemptListener, context);
- addIfService(containers);
+ addIfService(containers, true);
dispatcher.register(AMContainerEventType.class, containers);
nodes = new AMNodeMap(dispatcher.getEventHandler(), context);
- addIfService(nodes);
+ addIfService(nodes, true);
dispatcher.register(AMNodeEventType.class, nodes);
//service to do the task cleanup
taskCleaner = createTaskCleaner(context);
- addIfService(taskCleaner);
+ addIfService(taskCleaner, true);
this.dagEventDispatcher = new DagEventDispatcher();
this.vertexEventDispatcher = new VertexEventDispatcher();
@@ -273,22 +280,24 @@ public class DAGAppMaster extends CompositeService {
dispatcher.register(TaskCleaner.EventType.class, taskCleaner);
taskSchedulerEventHandler = new TaskSchedulerEventHandler(context,
- clientRpcServer);
- addIfService(taskSchedulerEventHandler);
+ clientRpcServer, dispatcher.getEventHandler());
+ addIfService(taskSchedulerEventHandler, true);
dispatcher.register(AMSchedulerEventType.class,
taskSchedulerEventHandler);
+ addIfServiceDependency(taskSchedulerEventHandler, clientRpcServer);
// TODO XXX: Rename to NMComm
// corresponding service to launch allocated containers via NodeManager
// containerLauncher = createNMCommunicator(context);
containerLauncher = createContainerLauncher(context);
- addIfService(containerLauncher);
+ addIfService(containerLauncher, true);
dispatcher.register(NMCommunicatorEventType.class, containerLauncher);
historyEventHandler = new HistoryEventHandler(context);
- addIfService(historyEventHandler);
+ addIfService(historyEventHandler, true);
dispatcher.register(HistoryEventType.class, historyEventHandler);
+ initServices(conf);
super.serviceInit(conf);
} // end of init()
@@ -297,16 +306,6 @@ public class DAGAppMaster extends CompositeService {
}
/**
- * Create the default file System for this job.
- * @param conf the conf object
- * @return the default filesystem for this job
- * @throws IOException
- */
- protected FileSystem getFileSystem(Configuration conf) throws IOException {
- return FileSystem.get(conf);
- }
-
- /**
* Exit call. Just in a function call to enable testing.
*/
protected void sysexit() {
@@ -525,15 +524,9 @@ public class DAGAppMaster extends CompositeService {
// create single job
DAG newDag =
- new DAGImpl(dagId, appAttemptID, conf, dagPB, dispatcher.getEventHandler(),
+ new DAGImpl(dagId, conf, dagPB, dispatcher.getEventHandler(),
taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
- // TODO Recovery
- //completedTasksFromPreviousRun,
- // TODO Metrics
- //metrics,
- //committer, newApiCommitter,
- currentUser.getShortUserName(), appSubmitTime,
- //amInfos,
+ currentUser.getShortUserName(),
taskHeartbeatHandler, context);
((RunningAppContext) context).setDAG(newDag);
@@ -578,9 +571,24 @@ public class DAGAppMaster extends CompositeService {
}
}
- protected void addIfService(Object object) {
+ protected void addIfService(Object object, boolean addDispatcher) {
if (object instanceof Service) {
- addService((Service) object);
+ Service service = (Service) object;
+ ServiceWithDependency sd = new ServiceWithDependency(service);
+ services.put(service, sd);
+ if(addDispatcher) {
+ addIfServiceDependency(service, dispatcher);
+ }
+ }
+ }
+
+ protected void addIfServiceDependency(Object object, Object dependency) {
+ if (object instanceof Service && dependency instanceof Service) {
+ Service service = (Service) object;
+ Service dependencyService = (Service) dependency;
+ ServiceWithDependency sd = services.get(service);
+ sd.dependencies.add(dependencyService);
+ dependencyService.registerServiceListener(sd);
}
}
@@ -760,6 +768,11 @@ public class DAGAppMaster extends CompositeService {
public DAGAppMaster getAppMaster() {
return DAGAppMaster.this;
}
+
+ @Override
+ public TezConfiguration getConf() {
+ return conf;
+ }
@Override
public ApplicationAttemptId getApplicationAttemptId() {
@@ -834,7 +847,7 @@ public class DAGAppMaster extends CompositeService {
}
return taskSchedulerEventHandler.getApplicationAcls();
}
-
+
@Override
public TezDAGID getDAGID() {
try {
@@ -857,21 +870,155 @@ public class DAGAppMaster extends CompositeService {
}
+ private class ServiceWithDependency implements ServiceStateChangeListener {
+ ServiceWithDependency(Service service) {
+ this.service = service;
+ }
+ Service service;
+ List<Service> dependencies = new ArrayList<Service>();
+ AtomicInteger dependenciesStarted = new AtomicInteger(0);
+ boolean canStart = false;
+
+ @Override
+ public void stateChanged(Service dependency) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Service dependency: " + dependency.getName() + " notify" +
+ " for service: " + service.getName());
+ }
+ if(dependency.isInState(Service.STATE.STARTED)) {
+ if(dependenciesStarted.incrementAndGet() == dependencies.size()) {
+ synchronized(this) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Service: " + service.getName() + " notified to start");
+ }
+ canStart = true;
+ this.notifyAll();
+ }
+ }
+ }
+ }
+
+ void start() throws InterruptedException {
+ if(dependencies.size() > 0) {
+ synchronized(this) {
+ while(!canStart) {
+ this.wait(1000*60*3L);
+ }
+ }
+ }
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Service: " + service.getName() + " trying to start");
+ }
+ for(Service dependency : dependencies) {
+ if(!dependency.isInState(Service.STATE.STARTED)){
+ LOG.info("Service: " + service.getName() + " not started because "
+ + " service: " + dependency.getName() +
+ " is in state: " + dependency.getServiceState());
+ return;
+ }
+ }
+ service.start();
+ }
+ }
+
+ private class ServiceThread extends Thread {
+ final ServiceWithDependency serviceWithDependency;
+ Throwable error = null;
+ public ServiceThread(ServiceWithDependency serviceWithDependency) {
+ this.serviceWithDependency = serviceWithDependency;
+ this.setName("ServiceThread:" + serviceWithDependency.service.getName());
+ }
+
+ public void run() {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Starting thread " + serviceWithDependency.service.getName());
+ }
+ long start = System.currentTimeMillis();
+ try {
+ serviceWithDependency.start();
+ } catch (Throwable t) {
+ error = t;
+ } finally {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Service: " + serviceWithDependency.service.getName() +
+ " started in " + (System.currentTimeMillis() - start) + "ms");
+ }
+ }
+ }
+ }
+
+ void startServices(){
+ try {
+ Throwable firstError = null;
+ List<ServiceThread> threads = new ArrayList<ServiceThread>();
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Begin parallel start");
+ }
+ for(ServiceWithDependency sd : services.values()) {
+ // start the service. If this fails that service
+ // will be stopped and an exception raised
+ ServiceThread st = new ServiceThread(sd);
+ threads.add(st);
+ }
+ for(ServiceThread st : threads) {
+ st.start();
+ }
+ for(ServiceThread st : threads) {
+ st.join();
+ if(st.error != null && firstError == null) {
+ firstError = st.error;
+ }
+ }
+
+ if(firstError != null) {
+ throw ServiceStateException.convert(firstError);
+ }
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("End parallel start");
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ void initServices(TezConfiguration conf) {
+ for (ServiceWithDependency sd : services.values()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Initing service : " + sd.service);
+ }
+ sd.service.init(conf);
+ }
+ }
+
+ void stopServices() {
+ // stop in reverse order of start
+ List<Service> serviceList = new ArrayList<Service>(services.size());
+ for (ServiceWithDependency sd : services.values()) {
+ serviceList.add(sd.service);
+ }
+ Exception firstException = null;
+ for (int i = services.size() - 1; i >= 0; i--) {
+ Service service = serviceList.get(i);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Stopping service : " + service);
+ }
+ Exception ex = ServiceOperations.stopQuietly(LOG, service);
+ if (ex != null && firstException == null) {
+ firstException = ex;
+ }
+ }
+ //after stopping all services, rethrow the first exception raised
+ if (firstException != null) {
+ throw ServiceStateException.convert(firstException);
+ }
+ }
+
@SuppressWarnings("unchecked")
@Override
public void serviceStart() throws Exception {
this.state = DAGAppMasterState.RUNNING;
- // TODO Recovery
- // Pull completedTasks etc from recovery
- /*
- if (inRecovery) {
- completedTasksFromPreviousRun = recoveryServ.getCompletedTasks();
- amInfos = recoveryServ.getAMInfos();
- }
- */
-
// /////////////////// Create the job itself.
dag = createDAG(dagPlan);
@@ -879,7 +1026,7 @@ public class DAGAppMaster extends CompositeService {
// metrics system init is really init & start.
// It's more test friendly to put it here.
- DefaultMetricsSystem.initialize("MRAppMaster");
+ DefaultMetricsSystem.initialize("DAGAppMaster");
// create a job event for job intialization
DAGEvent initDagEvent = new DAGEvent(dag.getID(), DAGEventType.DAG_INIT);
@@ -889,6 +1036,7 @@ public class DAGAppMaster extends CompositeService {
dagEventDispatcher.handle(initDagEvent);
//start all the components
+ startServices();
super.serviceStart();
this.dagsStartTime = clock.getTime();
@@ -900,6 +1048,12 @@ public class DAGAppMaster extends CompositeService {
// All components have started, start the job.
startDags();
}
+
+ @Override
+ public void serviceStop() throws Exception {
+ stopServices();
+ super.serviceStop();
+ }
/**
* This can be overridden to instantiate multiple jobs and create a
@@ -1045,7 +1199,7 @@ public class DAGAppMaster extends CompositeService {
jobUserName);
} catch (Throwable t) {
- LOG.fatal("Error starting MRAppMaster", t);
+ LOG.fatal("Error starting DAGAppMaster", t);
System.exit(1);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c19c6208/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index be25c66..27c0180 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
@@ -80,8 +79,6 @@ import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
import org.apache.tez.dag.app.dag.event.DAGFinishEvent;
-import org.apache.tez.dag.app.dag.event.TaskEvent;
-import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEvent;
import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.dag.app.dag.event.VertexEventTermination;
@@ -111,7 +108,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
.getProperty("line.separator");
//final fields
- private final ApplicationAttemptId applicationAttemptId;
private final TezDAGID dagId;
private final Clock clock;
private final ApplicationACLsManager aclsManager;
@@ -133,7 +129,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
//private final MRAppMetrics metrics;
private final String userName;
private final String queueName;
- private final long appSubmitTime;
private final AppContext appContext;
volatile Map<TezVertexID, Vertex> vertices = new HashMap<TezVertexID, Vertex>();
@@ -314,36 +309,25 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
private long startTime;
private long finishTime;
- public DAGImpl(TezDAGID dagId, ApplicationAttemptId applicationAttemptId,
+ public DAGImpl(TezDAGID dagId,
TezConfiguration conf,
DAGPlan jobPlan,
EventHandler eventHandler,
TaskAttemptListener taskAttemptListener,
JobTokenSecretManager jobTokenSecretManager,
Credentials fsTokenCredentials, Clock clock,
- // TODO Metrics
- //MRAppMetrics metrics,
String appUserName,
- long appSubmitTime,
- // TODO Recovery
- //List<AMInfo> amInfos,
TaskHeartbeatHandler thh,
AppContext appContext) {
- this.applicationAttemptId = applicationAttemptId;
this.dagId = dagId;
this.jobPlan = jobPlan;
this.conf = conf;
this.dagName = (jobPlan.getName() != null) ? jobPlan.getName() : "<missing app name>";
this.userName = appUserName;
- // TODO Metrics
- //this.metrics = metrics;
this.clock = clock;
- // TODO Recovery
- //this.amInfos = amInfos;
this.appContext = appContext;
this.queueName = conf.get(MRJobConfig.QUEUE_NAME, "default");
- this.appSubmitTime = appSubmitTime;
this.taskAttemptListener = taskAttemptListener;
this.taskHeartbeatHandler = thh;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c19c6208/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 637e150..3ad1e26 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -37,7 +37,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.MRVertexOutputCommitter;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
@@ -741,16 +740,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
this.eventHandler.handle(new DAGHistoryEvent(getDAGId(), finishEvt));
}
- /**
- * Create the default file System for this job.
- * @param conf the conf object
- * @return the default filesystem for this job
- * @throws IOException
- */
- protected FileSystem getFileSystem(Configuration conf) throws IOException {
- return FileSystem.get(conf);
- }
-
static VertexState checkVertexForCompletion(VertexImpl vertex) {
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c19c6208/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 14baecd..8ee2ff9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -81,11 +81,12 @@ public class TaskSchedulerEventHandler extends AbstractService
BlockingQueue<AMSchedulerEvent> eventQueue
= new LinkedBlockingQueue<AMSchedulerEvent>();
+ @SuppressWarnings("rawtypes")
public TaskSchedulerEventHandler(AppContext appContext,
- DAGClientServer clientService) {
+ DAGClientServer clientService, EventHandler eventHandler) {
super(TaskSchedulerEventHandler.class.getName());
this.appContext = appContext;
- this.eventHandler = appContext.getEventHandler();
+ this.eventHandler = eventHandler;
this.clientService = clientService;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c19c6208/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index a2585c6..142ceb9 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -486,16 +486,16 @@ public class TestDAGImpl {
appContext = mock(AppContext.class);
doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
doReturn(dagId).when(appContext).getDAGID();
- dag = new DAGImpl(dagId, appAttemptId, conf, dagPlan,
+ dag = new DAGImpl(dagId, conf, dagPlan,
dispatcher.getEventHandler(), taskAttemptListener,
- jobTokenSecretManager, fsTokens, clock, "user", 10000, thh, appContext);
+ jobTokenSecretManager, fsTokens, clock, "user", thh, appContext);
doReturn(dag).when(appContext).getDAG();
mrrAppContext = mock(AppContext.class);
mrrDagId = new TezDAGID(appAttemptId.getApplicationId(), 2);
mrrDagPlan = createTestMRRDAGPlan();
- mrrDag = new DAGImpl(mrrDagId, appAttemptId, conf, mrrDagPlan,
+ mrrDag = new DAGImpl(mrrDagId, conf, mrrDagPlan,
dispatcher.getEventHandler(), taskAttemptListener,
- jobTokenSecretManager, fsTokens, clock, "user", 10000, thh,
+ jobTokenSecretManager, fsTokens, clock, "user", thh,
mrrAppContext);
doReturn(mrrDag).when(mrrAppContext).getDAG();
doReturn(appAttemptId).when(mrrAppContext).getApplicationAttemptId();