You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/06 09:41:44 UTC

[42/50] [abbrv] tez git commit: TEZ-2302. Allow TaskCommunicators to subscribe for Vertex updates. (sseth)

TEZ-2302. Allow TaskCommunicators to subscribe for Vertex updates. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3e03847c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3e03847c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3e03847c

Branch: refs/heads/TEZ-2003
Commit: 3e03847cadd5135f05992a86a202e2bff41fe76e
Parents: 2f8a273
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Apr 9 13:33:48 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed May 6 00:18:08 2015 -0700

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                            |   1 +
 .../apache/tez/dag/api/TaskCommunicator.java    |  20 +++
 .../tez/dag/api/TaskCommunicatorContext.java    |  14 ++-
 .../dag/app/TaskAttemptListenerImpTezDag.java   |  52 +++-----
 .../dag/app/TaskCommunicatorContextImpl.java    | 124 +++++++++++++++++++
 .../tez/dag/app/TezTaskCommunicatorImpl.java    |   6 +
 .../java/org/apache/tez/dag/app/dag/DAG.java    |   2 +
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |   5 +
 8 files changed, 188 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/3e03847c/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 9d6b220..ca5225e 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -15,5 +15,6 @@ ALL CHANGES:
   TEZ-2283. Fixes after rebase 04/07.
   TEZ-2284. Separate TaskReporter into an interface.
   TEZ-2285. Allow TaskCommunicators to indicate task/container liveness.
+  TEZ-2302. Allow TaskCommunicators to subscribe for Vertex updates.
 
 INCOMPATIBLE CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/3e03847c/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
index 945091e..a2cd858 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 
@@ -54,4 +55,23 @@ public abstract class TaskCommunicator extends AbstractService {
   public abstract InetSocketAddress getAddress();
 
   // TODO Eventually. Add methods here to support preemption of tasks.
+
+  /**
+   * Receive notifications on vertex state changes.
+   * <p>
+   * State changes will be received based on the registration via {@link
+   * org.apache.tez.dag.api.TaskCommunicatorContext#registerForVertexStateUpdates(String,
+   * java.util.Set)}. Notifications will be received for all registered state changes, not just
+   * for the latest state update, in the order in which the state changes occurred.</p>
+   *
+   * Extensive processing should not be performed via this method call. Instead this should just be
+   * used as a notification mechanism.
+   * <br>This method may be invoked concurrently with other invocations into the TaskCommunicator and
+   * multi-threading/concurrency implications must be considered.
+   * @param stateUpdate an event indicating the name of the vertex, and its updated state.
+   *                    Additional information may be available for specific events. Look at the
+   *                    type hierarchy for {@link org.apache.tez.dag.api.event.VertexStateUpdate}
+   * @throws Exception
+   */
+  public abstract void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/3e03847c/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
index 0c3bac3..19caed9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
@@ -16,10 +16,12 @@ package org.apache.tez.dag.api;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
+import java.util.Set;
 
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.dag.api.event.VertexState;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 
 
@@ -48,7 +50,7 @@ public interface TaskCommunicatorContext {
   void containerAlive(ContainerId containerId);
 
   // TODO TEZ-2003 Move to vertex, taskIndex, version. Rename to taskAttempt*
-  void taskStartedRemotely(TezTaskAttemptID taskAttemptID, ContainerId containerId);
+  void taskStartedRemotely(TezTaskAttemptID taskAttemptId, ContainerId containerId);
 
   // TODO TEZ-2003 Move to vertex, taskIndex, version. Rename to taskAttempt*
   void taskKilled(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason, @Nullable String diagnostics);
@@ -56,6 +58,16 @@ public interface TaskCommunicatorContext {
   // TODO TEZ-2003 Move to vertex, taskIndex, version. Rename to taskAttempt*
   void taskFailed(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason, @Nullable String diagnostics);
 
+  /**
+   * Register to get notifications on updates to the specified vertex. Notifications will be sent
+   * via {@link org.apache.tez.dag.api.TaskCommunicator#onVertexStateUpdated(org.apache.tez.dag.api.event.VertexStateUpdate)}.
+   *
+   * This method can only be invoked once. Duplicate invocations will result in an error.
+   *
+   * @param vertexName the vertex name for which notifications are required.
+   * @param stateSet   the set of states for which notifications are required. null implies all
+   */
+  void registerForVertexStateUpdates(String vertexName, @Nullable Set<VertexState> stateSet);
   // TODO TEZ-2003 API. Should a method exist for task succeeded.
 
   // TODO Eventually Add methods to report availability stats to the scheduler.

http://git-wip-us.apache.org/repos/asf/tez/blob/3e03847c/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index a6ccbfa..a7bbba9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -17,6 +17,7 @@
 
 package org.apache.tez.dag.app;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
@@ -26,6 +27,7 @@ import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -35,13 +37,13 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
 import org.apache.tez.runtime.api.impl.EventType;
+import com.google.common.base.Preconditions;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.TezUtilsInternal;
@@ -67,14 +69,12 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.apache.tez.common.security.JobTokenSecretManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 @SuppressWarnings("unchecked")
 @InterfaceAudience.Private
 public class TaskAttemptListenerImpTezDag extends AbstractService implements
-    TaskAttemptListener, TaskCommunicatorContext {
+    TaskAttemptListener {
 
   private static final Logger LOG = LoggerFactory
       .getLogger(TaskAttemptListenerImpTezDag.class);
@@ -124,7 +124,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     }
     this.taskCommunicators = new TaskCommunicator[taskCommunicatorClassIdentifiers.length];
     for (int i = 0 ; i < taskCommunicatorClassIdentifiers.length ; i++) {
-      taskCommunicators[i] = createTaskCommunicator(taskCommunicatorClassIdentifiers[i]);
+      taskCommunicators[i] = createTaskCommunicator(taskCommunicatorClassIdentifiers[i], i);
     }
     // TODO TEZ-2118 Start using taskCommunicator indices properly
   }
@@ -145,13 +145,13 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     }
   }
 
-  private TaskCommunicator createTaskCommunicator(String taskCommClassIdentifier) {
+  private TaskCommunicator createTaskCommunicator(String taskCommClassIdentifier, int taskCommIndex) {
     if (taskCommClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) {
       LOG.info("Using Default Task Communicator");
-      return new TezTaskCommunicatorImpl(this);
+      return new TezTaskCommunicatorImpl(new TaskCommunicatorContextImpl(context, this, taskCommIndex));
     } else if (taskCommClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) {
       LOG.info("Using Default Local Task Communicator");
-      return new TezLocalTaskCommunicatorImpl(this);
+      return new TezLocalTaskCommunicatorImpl(new TaskCommunicatorContextImpl(context, this, taskCommIndex));
     } else {
       LOG.info("Using TaskCommunicator: " + taskCommClassIdentifier);
       Class<? extends TaskCommunicator> taskCommClazz = (Class<? extends TaskCommunicator>) ReflectionUtils
@@ -159,7 +159,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
       try {
         Constructor<? extends TaskCommunicator> ctor = taskCommClazz.getConstructor(TaskCommunicatorContext.class);
         ctor.setAccessible(true);
-        return ctor.newInstance(this);
+        return ctor.newInstance(new TaskCommunicatorContextImpl(context, this, taskCommIndex));
       } catch (NoSuchMethodException e) {
         throw new TezUncheckedException(e);
       } catch (InvocationTargetException e) {
@@ -171,18 +171,6 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
       }
     }
   }
-
-  @Override
-  public ApplicationAttemptId getApplicationAttemptId() {
-    return context.getApplicationAttemptId();
-  }
-
-  @Override
-  public Credentials getCredentials() {
-    return context.getAppCredentials();
-  }
-
-  @Override
   public TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request)
       throws IOException, TezException {
     ContainerId containerId = ConverterUtils.toContainerId(request
@@ -252,30 +240,20 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     }
     return new TaskHeartbeatResponse(false, outEvents);
   }
-
-  @Override
-  public boolean isKnownContainer(ContainerId containerId) {
-    return context.getAllContainers().get(containerId) != null;
-  }
-
-  @Override
   public void taskAlive(TezTaskAttemptID taskAttemptId) {
     taskHeartbeatHandler.pinged(taskAttemptId);
   }
 
-  @Override
   public void containerAlive(ContainerId containerId) {
     pingContainerHeartbeatHandler(containerId);
   }
 
-  @Override
   public void taskStartedRemotely(TezTaskAttemptID taskAttemptID, ContainerId containerId) {
     context.getEventHandler()
         .handle(new TaskAttemptEventStartedRemotely(taskAttemptID, containerId, null));
     pingContainerHeartbeatHandler(containerId);
   }
 
-  @Override
   public void taskKilled(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason,
                          String diagnostics) {
     // Regular flow via TaskAttempt will take care of un-registering from the heartbeat handler,
@@ -288,7 +266,6 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
         taskAttemptEndReason)));
   }
 
-  @Override
   public void taskFailed(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason,
                          String diagnostics) {
     // Regular flow via TaskAttempt will take care of un-registering from the heartbeat handler,
@@ -301,6 +278,11 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
         taskAttemptEndReason)));
   }
 
+  public void vertexStateUpdateNotificationReceived(VertexStateUpdate event, int taskCommIndex) throws
+      Exception {
+    taskCommunicators[taskCommIndex].onVertexStateUpdated(event);
+  }
+
 
   /**
    * Child checking whether it can commit.
@@ -310,7 +292,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
    * {@link Task#canCommit(TezTaskAttemptID)} This is * a legacy from the
    * centralized commit protocol handling by the JobTracker.
    */
-  @Override
+//  @Override
   public boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException {
     LOG.info("Commit go/no-go request from " + taskAttemptId.toString());
     // An attempt is asking if it can commit its output. This can be decided

http://git-wip-us.apache.org/repos/asf/tez/blob/3e03847c/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
new file mode 100644
index 0000000..3714c3c
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.dag.api.TaskAttemptEndReason;
+import org.apache.tez.dag.api.TaskCommunicatorContext;
+import org.apache.tez.dag.api.TaskHeartbeatRequest;
+import org.apache.tez.dag.api.TaskHeartbeatResponse;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.app.dag.VertexStateUpdateListener;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+@InterfaceAudience.Private
+public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, VertexStateUpdateListener {
+
+
+  private final AppContext context;
+  private final TaskAttemptListenerImpTezDag taskAttemptListener;
+  private final int taskCommunicatorIndex;
+
+  public TaskCommunicatorContextImpl(AppContext appContext,
+                                     TaskAttemptListenerImpTezDag taskAttemptListener,
+                                     int taskCommunicatorIndex) {
+    this.context = appContext;
+    this.taskAttemptListener = taskAttemptListener;
+    this.taskCommunicatorIndex = taskCommunicatorIndex;
+  }
+
+  @Override
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return context.getApplicationAttemptId();
+  }
+
+  @Override
+  public Credentials getCredentials() {
+    return context.getAppCredentials();
+  }
+
+  @Override
+  public boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException {
+    return taskAttemptListener.canCommit(taskAttemptId);
+  }
+
+  @Override
+  public TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request) throws IOException,
+      TezException {
+    return taskAttemptListener.heartbeat(request);
+  }
+
+  @Override
+  public boolean isKnownContainer(ContainerId containerId) {
+    return context.getAllContainers().get(containerId) != null;
+  }
+
+  @Override
+  public void taskAlive(TezTaskAttemptID taskAttemptId) {
+    taskAttemptListener.taskAlive(taskAttemptId);
+  }
+
+  @Override
+  public void containerAlive(ContainerId containerId) {
+    taskAttemptListener.containerAlive(containerId);
+  }
+
+  @Override
+  public void taskStartedRemotely(TezTaskAttemptID taskAttemptId, ContainerId containerId) {
+    taskAttemptListener.taskStartedRemotely(taskAttemptId, containerId);
+  }
+
+  @Override
+  public void taskKilled(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason,
+                         @Nullable String diagnostics) {
+    taskAttemptListener.taskKilled(taskAttemptId, taskAttemptEndReason, diagnostics);
+  }
+
+  @Override
+  public void taskFailed(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason,
+                         @Nullable String diagnostics) {
+    taskAttemptListener.taskFailed(taskAttemptId, taskAttemptEndReason, diagnostics);
+
+  }
+
+  @Override
+  public void registerForVertexStateUpdates(String vertexName,
+                                            @Nullable Set<VertexState> stateSet) {
+    Preconditions.checkNotNull(vertexName, "VertexName cannot be null: " + vertexName);
+    context.getCurrentDAG().getStateChangeNotifier().registerForVertexUpdates(vertexName, stateSet, this);
+  }
+
+
+  @Override
+  public void onStateUpdated(VertexStateUpdate event) {
+    try {
+      taskAttemptListener.vertexStateUpdateNotificationReceived(event, taskCommunicatorIndex);
+    } catch (Exception e) {
+      // TODO TEZ-2003 This needs to be propagated to the DAG as a user error.
+      throw new TezUncheckedException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/3e03847c/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
index ef4f764..1417a3b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
@@ -48,6 +48,7 @@ import org.apache.tez.dag.api.TaskHeartbeatResponse;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.dag.app.security.authorize.TezAMPolicyProvider;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.api.impl.TaskSpec;
@@ -252,6 +253,11 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
     return address;
   }
 
+  @Override
+  public void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception {
+    // Intentionally empty: not registering for, or expecting, any vertex updates.
+  }
+
   protected String getTokenIdentifier() {
     return tokenIdentifier;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/3e03847c/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
index 4c3426a..6d6872b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
@@ -94,4 +94,6 @@ public interface DAG {
 
   Map<String, TezVertexID> getVertexNameIDMapping();
 
+  StateChangeNotifier getStateChangeNotifier();
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/3e03847c/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index f769565..998108b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -685,6 +685,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   }
 
   @Override
+  public StateChangeNotifier getStateChangeNotifier() {
+    return entityUpdateTracker;
+  }
+
+  @Override
   public TezCounters getAllCounters() {
 
     readLock.lock();