You are viewing a plain text version of this content. The canonical link for it is not reproduced in this plain text version.
Posted to commits@tez.apache.org by bi...@apache.org on 2013/07/23 01:17:29 UTC

git commit: TEZ-310. Speedup startup by starting services in parallel (bikas)

Updated Branches:
  refs/heads/master f5ed2e258 -> c19c62081


TEZ-310. Speedup startup by starting services in parallel (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/c19c6208
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/c19c6208
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/c19c6208

Branch: refs/heads/master
Commit: c19c62081d054e169697a6c18a8266c235127415
Parents: f5ed2e2
Author: Bikas Saha <bi...@apache.org>
Authored: Mon Jul 22 16:05:22 2013 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Mon Jul 22 16:05:22 2013 -0700

----------------------------------------------------------------------
 .../java/org/apache/tez/dag/app/AppContext.java |   5 +-
 .../org/apache/tez/dag/app/DAGAppMaster.java    | 262 +++++++++++++++----
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |  18 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  11 -
 .../dag/app/rm/TaskSchedulerEventHandler.java   |   5 +-
 .../tez/dag/app/dag/impl/TestDAGImpl.java       |   8 +-
 6 files changed, 220 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c19c6208/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
index 50580db..8f18993 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.Clock;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler;
 import org.apache.tez.dag.app.rm.container.AMContainerMap;
@@ -41,6 +42,8 @@ public interface AppContext {
   
   DAGAppMaster getAppMaster();
   
+  TezConfiguration getConf();
+  
   ApplicationId getApplicationID();
 
   TezDAGID getDAGID();
@@ -70,5 +73,5 @@ public interface AppContext {
   
   AMNodeMap getAllNodes();
   
-  TaskSchedulerEventHandler getTaskScheduler();
+  TaskSchedulerEventHandler getTaskScheduler();  
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c19c6208/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index b04701a..28790fb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -22,11 +22,14 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -34,15 +37,17 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.service.Service;
+import org.apache.hadoop.service.ServiceOperations;
+import org.apache.hadoop.service.ServiceStateChangeListener;
+import org.apache.hadoop.service.ServiceStateException;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -122,12 +127,12 @@ import org.apache.tez.engine.common.security.JobTokenSecretManager;
  */
 
 @SuppressWarnings("rawtypes")
-public class DAGAppMaster extends CompositeService {
+public class DAGAppMaster extends AbstractService {
 
   private static final Log LOG = LogFactory.getLog(DAGAppMaster.class);
 
   /**
-   * Priority of the MRAppMaster shutdown hook.
+   * Priority of the DAGAppMaster shutdown hook.
    */
   public static final int SHUTDOWN_HOOK_PRIORITY = 30;
 
@@ -145,7 +150,7 @@ public class DAGAppMaster extends CompositeService {
   private AMContainerMap containers;
   private AMNodeMap nodes;
   // TODO Metrics
-  //protected final MRAppMetrics metrics;
+  //protected final DAGAppMetrics metrics;
   // TODO Recovery
   //private Map<TezTaskID, TaskInfo> completedTasksFromPreviousRun;
   private AppContext context;
@@ -179,6 +184,11 @@ public class DAGAppMaster extends CompositeService {
   private Credentials fsTokens = new Credentials(); // Filled during init
   private UserGroupInformation currentUser; // Will be setup during init
 
+  // must be LinkedHashMap to preserve order of service addition
+  Map<Service, ServiceWithDependency> services = 
+      new LinkedHashMap<Service, ServiceWithDependency>();
+
+
   public DAGAppMaster(ApplicationAttemptId applicationAttemptId,
       ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
       long appSubmitTime, DAGPlan dagPB) {
@@ -201,8 +211,8 @@ public class DAGAppMaster extends CompositeService {
     this.nmHttpPort = nmHttpPort;
     this.state = DAGAppMasterState.NEW;
     // TODO Metrics
-    //this.metrics = MRAppMetrics.create();
-    LOG.info("Created MRAppMaster for application " + applicationAttemptId);
+    //this.metrics = DAGAppMetrics.create();
+    LOG.info("Created DAGAppMaster for application " + applicationAttemptId);
   }
 
   @Override
@@ -227,38 +237,35 @@ public class DAGAppMaster extends CompositeService {
 
     clientHandler = new DAGClientHandler();
 
-    // TODO Committer.
-    //    committer = createOutputCommitter(conf);
-
     dispatcher = createDispatcher();
-    addIfService(dispatcher);
+    addIfService(dispatcher, false);
 
     clientRpcServer = new DAGClientServer(clientHandler);
-    addIfService(clientRpcServer);
+    addIfService(clientRpcServer, true);
 
     taskHeartbeatHandler = createTaskHeartbeatHandler(context, conf);
-    addIfService(taskHeartbeatHandler);
+    addIfService(taskHeartbeatHandler, true);
 
     containerHeartbeatHandler = createContainerHeartbeatHandler(context, conf);
-    addIfService(containerHeartbeatHandler);
+    addIfService(containerHeartbeatHandler, true);
 
     //service to handle requests to TaskUmbilicalProtocol
     taskAttemptListener = createTaskAttemptListener(context,
         taskHeartbeatHandler, containerHeartbeatHandler);
-    addIfService(taskAttemptListener);
+    addIfService(taskAttemptListener, true);
 
     containers = new AMContainerMap(containerHeartbeatHandler,
         taskAttemptListener, context);
-    addIfService(containers);
+    addIfService(containers, true);
     dispatcher.register(AMContainerEventType.class, containers);
 
     nodes = new AMNodeMap(dispatcher.getEventHandler(), context);
-    addIfService(nodes);
+    addIfService(nodes, true);
     dispatcher.register(AMNodeEventType.class, nodes);
 
     //service to do the task cleanup
     taskCleaner = createTaskCleaner(context);
-    addIfService(taskCleaner);
+    addIfService(taskCleaner, true);
 
     this.dagEventDispatcher = new DagEventDispatcher();
     this.vertexEventDispatcher = new VertexEventDispatcher();
@@ -273,22 +280,24 @@ public class DAGAppMaster extends CompositeService {
     dispatcher.register(TaskCleaner.EventType.class, taskCleaner);
 
     taskSchedulerEventHandler = new TaskSchedulerEventHandler(context,
-        clientRpcServer);
-    addIfService(taskSchedulerEventHandler);
+        clientRpcServer, dispatcher.getEventHandler());
+    addIfService(taskSchedulerEventHandler, true);
     dispatcher.register(AMSchedulerEventType.class,
         taskSchedulerEventHandler);
+    addIfServiceDependency(taskSchedulerEventHandler, clientRpcServer);
 
     //    TODO XXX: Rename to NMComm
     //    corresponding service to launch allocated containers via NodeManager
     //    containerLauncher = createNMCommunicator(context);
     containerLauncher = createContainerLauncher(context);
-    addIfService(containerLauncher);
+    addIfService(containerLauncher, true);
     dispatcher.register(NMCommunicatorEventType.class, containerLauncher);
 
     historyEventHandler = new HistoryEventHandler(context);
-    addIfService(historyEventHandler);
+    addIfService(historyEventHandler, true);
     dispatcher.register(HistoryEventType.class, historyEventHandler);
 
+    initServices(conf);
     super.serviceInit(conf);
   } // end of init()
 
@@ -297,16 +306,6 @@ public class DAGAppMaster extends CompositeService {
   }
 
   /**
-   * Create the default file System for this job.
-   * @param conf the conf object
-   * @return the default filesystem for this job
-   * @throws IOException
-   */
-  protected FileSystem getFileSystem(Configuration conf) throws IOException {
-    return FileSystem.get(conf);
-  }
-
-  /**
    * Exit call. Just in a function call to enable testing.
    */
   protected void sysexit() {
@@ -525,15 +524,9 @@ public class DAGAppMaster extends CompositeService {
 
     // create single job
     DAG newDag =
-        new DAGImpl(dagId, appAttemptID, conf, dagPB, dispatcher.getEventHandler(),
+        new DAGImpl(dagId, conf, dagPB, dispatcher.getEventHandler(),
             taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
-            // TODO Recovery
-            //completedTasksFromPreviousRun,
-            // TODO Metrics
-            //metrics,
-            //committer, newApiCommitter,
-            currentUser.getShortUserName(), appSubmitTime,
-            //amInfos,
+            currentUser.getShortUserName(),
             taskHeartbeatHandler, context);
     ((RunningAppContext) context).setDAG(newDag);
 
@@ -578,9 +571,24 @@ public class DAGAppMaster extends CompositeService {
     }
   }
 
-  protected void addIfService(Object object) {
+  protected void addIfService(Object object, boolean addDispatcher) {
     if (object instanceof Service) {
-      addService((Service) object);
+      Service service = (Service) object; 
+      ServiceWithDependency sd = new ServiceWithDependency(service); 
+      services.put(service, sd);
+      if(addDispatcher) {
+        addIfServiceDependency(service, dispatcher);
+      }
+    }
+  }
+  
+  protected void addIfServiceDependency(Object object, Object dependency) {
+    if (object instanceof Service && dependency instanceof Service) {
+      Service service = (Service) object;
+      Service dependencyService = (Service) dependency;
+      ServiceWithDependency sd = services.get(service); 
+      sd.dependencies.add(dependencyService);
+      dependencyService.registerServiceListener(sd);
     }
   }
 
@@ -760,6 +768,11 @@ public class DAGAppMaster extends CompositeService {
     public DAGAppMaster getAppMaster() {
       return DAGAppMaster.this;
     }
+    
+    @Override
+    public TezConfiguration getConf() {
+      return conf;
+    }
 
     @Override
     public ApplicationAttemptId getApplicationAttemptId() {
@@ -834,7 +847,7 @@ public class DAGAppMaster extends CompositeService {
       }
       return taskSchedulerEventHandler.getApplicationAcls();
     }
-
+    
     @Override
     public TezDAGID getDAGID() {
       try {
@@ -857,21 +870,155 @@ public class DAGAppMaster extends CompositeService {
     
   }
 
+  private class ServiceWithDependency implements ServiceStateChangeListener {
+    ServiceWithDependency(Service service) {
+      this.service = service;
+    }
+    Service service;
+    List<Service> dependencies = new ArrayList<Service>();
+    AtomicInteger dependenciesStarted = new AtomicInteger(0);
+    boolean canStart = false;
+
+    @Override
+    public void stateChanged(Service dependency) {
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Service dependency: " + dependency.getName() + " notify" + 
+                  " for service: " + service.getName());
+      }
+      if(dependency.isInState(Service.STATE.STARTED)) {
+        if(dependenciesStarted.incrementAndGet() == dependencies.size()) {
+          synchronized(this) {
+            if(LOG.isDebugEnabled()) {
+              LOG.debug("Service: " + service.getName() + " notified to start");
+            }
+            canStart = true;
+            this.notifyAll();
+          }
+        }
+      }
+    }
+    
+    void start() throws InterruptedException {
+      if(dependencies.size() > 0) {
+        synchronized(this) {
+          while(!canStart) {
+            this.wait(1000*60*3L);
+          }
+        }
+      }
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Service: " + service.getName() + " trying to start");
+      }
+      for(Service dependency : dependencies) {
+        if(!dependency.isInState(Service.STATE.STARTED)){
+          LOG.info("Service: " + service.getName() + " not started because " 
+                   + " service: " + dependency.getName() + 
+                   " is in state: " + dependency.getServiceState());
+          return;
+        }
+      }
+      service.start();
+    }
+  }
+  
+  private class ServiceThread extends Thread {
+    final ServiceWithDependency serviceWithDependency;
+    Throwable error = null;
+    public ServiceThread(ServiceWithDependency serviceWithDependency) {
+      this.serviceWithDependency = serviceWithDependency;
+      this.setName("ServiceThread:" + serviceWithDependency.service.getName());
+    }
+    
+    public void run() {
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Starting thread " + serviceWithDependency.service.getName()); 
+      }
+      long start = System.currentTimeMillis();
+      try {
+        serviceWithDependency.start();
+      } catch (Throwable t) {
+        error = t;
+      } finally {
+        if(LOG.isDebugEnabled()) {
+          LOG.debug("Service: " + serviceWithDependency.service.getName() + 
+              " started in " + (System.currentTimeMillis() - start) + "ms");
+        }
+      }
+    }
+  }
+
+  void startServices(){
+    try {
+      Throwable firstError = null;
+      List<ServiceThread> threads = new ArrayList<ServiceThread>();
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Begin parallel start");
+      }
+      for(ServiceWithDependency sd : services.values()) {
+        // start the service. If this fails that service
+        // will be stopped and an exception raised
+        ServiceThread st = new ServiceThread(sd);
+        threads.add(st);
+      }
+      for(ServiceThread st : threads) {
+        st.start();
+      }
+      for(ServiceThread st : threads) {
+        st.join();
+        if(st.error != null && firstError == null) {
+            firstError = st.error;
+        }
+      }
+      
+      if(firstError != null) {
+        throw ServiceStateException.convert(firstError);
+      }
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("End parallel start");
+      }
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+  
+  void initServices(TezConfiguration conf) {
+    for (ServiceWithDependency sd : services.values()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Initing service : " + sd.service);
+      }
+      sd.service.init(conf);
+    }
+  }
+  
+  void stopServices() {
+    // stop in reverse order of start
+    List<Service> serviceList = new ArrayList<Service>(services.size());
+    for (ServiceWithDependency sd : services.values()) {
+      serviceList.add(sd.service);
+    }
+    Exception firstException = null;
+    for (int i = services.size() - 1; i >= 0; i--) {
+      Service service = serviceList.get(i);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Stopping service : " + service);
+      }
+      Exception ex = ServiceOperations.stopQuietly(LOG, service);
+      if (ex != null && firstException == null) {
+        firstException = ex;
+      }
+    }
+    //after stopping all services, rethrow the first exception raised
+    if (firstException != null) {
+      throw ServiceStateException.convert(firstException);
+    }
+  }
+  
   @SuppressWarnings("unchecked")
   @Override
   public void serviceStart() throws Exception {
 
     this.state = DAGAppMasterState.RUNNING;
 
-    // TODO Recovery
-    // Pull completedTasks etc from recovery
-    /*
-    if (inRecovery) {
-      completedTasksFromPreviousRun = recoveryServ.getCompletedTasks();
-      amInfos = recoveryServ.getAMInfos();
-    }
-    */
-
     // /////////////////// Create the job itself.
     dag = createDAG(dagPlan);
 
@@ -879,7 +1026,7 @@ public class DAGAppMaster extends CompositeService {
 
     // metrics system init is really init & start.
     // It's more test friendly to put it here.
-    DefaultMetricsSystem.initialize("MRAppMaster");
+    DefaultMetricsSystem.initialize("DAGAppMaster");
 
     // create a job event for job intialization
     DAGEvent initDagEvent = new DAGEvent(dag.getID(), DAGEventType.DAG_INIT);
@@ -889,6 +1036,7 @@ public class DAGAppMaster extends CompositeService {
     dagEventDispatcher.handle(initDagEvent);
 
     //start all the components
+    startServices();
     super.serviceStart();
 
     this.dagsStartTime = clock.getTime();
@@ -900,6 +1048,12 @@ public class DAGAppMaster extends CompositeService {
     // All components have started, start the job.
     startDags();
   }
+  
+  @Override
+  public void serviceStop() throws Exception {
+    stopServices();
+    super.serviceStop();
+  }
 
   /**
    * This can be overridden to instantiate multiple jobs and create a
@@ -1045,7 +1199,7 @@ public class DAGAppMaster extends CompositeService {
           jobUserName);
 
     } catch (Throwable t) {
-      LOG.fatal("Error starting MRAppMaster", t);
+      LOG.fatal("Error starting DAGAppMaster", t);
       System.exit(1);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c19c6208/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index be25c66..27c0180 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
@@ -80,8 +79,6 @@ import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
 import org.apache.tez.dag.app.dag.event.DAGFinishEvent;
-import org.apache.tez.dag.app.dag.event.TaskEvent;
-import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.app.dag.event.VertexEvent;
 import org.apache.tez.dag.app.dag.event.VertexEventType;
 import org.apache.tez.dag.app.dag.event.VertexEventTermination;
@@ -111,7 +108,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       .getProperty("line.separator");
 
   //final fields
-  private final ApplicationAttemptId applicationAttemptId;
   private final TezDAGID dagId;
   private final Clock clock;
   private final ApplicationACLsManager aclsManager;
@@ -133,7 +129,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   //private final MRAppMetrics metrics;
   private final String userName;
   private final String queueName;
-  private final long appSubmitTime;
   private final AppContext appContext;
 
   volatile Map<TezVertexID, Vertex> vertices = new HashMap<TezVertexID, Vertex>();
@@ -314,36 +309,25 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   private long startTime;
   private long finishTime;
 
-  public DAGImpl(TezDAGID dagId, ApplicationAttemptId applicationAttemptId,
+  public DAGImpl(TezDAGID dagId,
       TezConfiguration conf,
       DAGPlan jobPlan,
       EventHandler eventHandler,
       TaskAttemptListener taskAttemptListener,
       JobTokenSecretManager jobTokenSecretManager,
       Credentials fsTokenCredentials, Clock clock,
-      // TODO Metrics
-      //MRAppMetrics metrics,
       String appUserName,
-      long appSubmitTime,
-      // TODO Recovery
-      //List<AMInfo> amInfos,
       TaskHeartbeatHandler thh,
       AppContext appContext) {
-    this.applicationAttemptId = applicationAttemptId;
     this.dagId = dagId;
     this.jobPlan = jobPlan;
     this.conf = conf;
     this.dagName = (jobPlan.getName() != null) ? jobPlan.getName() : "<missing app name>";
 
     this.userName = appUserName;
-    // TODO Metrics
-    //this.metrics = metrics;
     this.clock = clock;
-    // TODO Recovery
-    //this.amInfos = amInfos;
     this.appContext = appContext;
     this.queueName = conf.get(MRJobConfig.QUEUE_NAME, "default");
-    this.appSubmitTime = appSubmitTime;
 
     this.taskAttemptListener = taskAttemptListener;
     this.taskHeartbeatHandler = thh;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c19c6208/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 637e150..3ad1e26 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -37,7 +37,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.mapred.MRVertexOutputCommitter;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
@@ -741,16 +740,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     this.eventHandler.handle(new DAGHistoryEvent(getDAGId(), finishEvt));
   }
 
-  /**
-   * Create the default file System for this job.
-   * @param conf the conf object
-   * @return the default filesystem for this job
-   * @throws IOException
-   */
-  protected FileSystem getFileSystem(Configuration conf) throws IOException {
-    return FileSystem.get(conf);
-  }
-
   static VertexState checkVertexForCompletion(VertexImpl vertex) {
 
     if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c19c6208/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 14baecd..8ee2ff9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -81,11 +81,12 @@ public class TaskSchedulerEventHandler extends AbstractService
   BlockingQueue<AMSchedulerEvent> eventQueue
                               = new LinkedBlockingQueue<AMSchedulerEvent>();
 
+  @SuppressWarnings("rawtypes")
   public TaskSchedulerEventHandler(AppContext appContext,
-      DAGClientServer clientService) {
+      DAGClientServer clientService, EventHandler eventHandler) {
     super(TaskSchedulerEventHandler.class.getName());
     this.appContext = appContext;
-    this.eventHandler = appContext.getEventHandler();
+    this.eventHandler = eventHandler;
     this.clientService = clientService;
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c19c6208/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index a2585c6..142ceb9 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -486,16 +486,16 @@ public class TestDAGImpl {
     appContext = mock(AppContext.class);
     doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
     doReturn(dagId).when(appContext).getDAGID();
-    dag = new DAGImpl(dagId, appAttemptId, conf, dagPlan,
+    dag = new DAGImpl(dagId, conf, dagPlan,
         dispatcher.getEventHandler(),  taskAttemptListener,
-        jobTokenSecretManager, fsTokens, clock, "user", 10000, thh, appContext);
+        jobTokenSecretManager, fsTokens, clock, "user", thh, appContext);
     doReturn(dag).when(appContext).getDAG();
     mrrAppContext = mock(AppContext.class);
     mrrDagId = new TezDAGID(appAttemptId.getApplicationId(), 2);
     mrrDagPlan = createTestMRRDAGPlan();
-    mrrDag = new DAGImpl(mrrDagId, appAttemptId, conf, mrrDagPlan,
+    mrrDag = new DAGImpl(mrrDagId, conf, mrrDagPlan,
         dispatcher.getEventHandler(),  taskAttemptListener,
-        jobTokenSecretManager, fsTokens, clock, "user", 10000, thh,
+        jobTokenSecretManager, fsTokens, clock, "user", thh,
         mrrAppContext);
     doReturn(mrrDag).when(mrrAppContext).getDAG();
     doReturn(appAttemptId).when(mrrAppContext).getApplicationAttemptId();