You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/12/16 03:54:15 UTC

tez git commit: TEZ-1789. Move speculator processing off the central dispatcher (bikas)

Repository: tez
Updated Branches:
  refs/heads/master f3a84cbad -> fbb3c17c7


TEZ-1789. Move speculator processing off the central dispatcher (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/fbb3c17c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/fbb3c17c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/fbb3c17c

Branch: refs/heads/master
Commit: fbb3c17c708cef2a19719e34d7c288cee06f226c
Parents: f3a84cb
Author: Bikas Saha <bi...@apache.org>
Authored: Mon Dec 15 18:54:00 2014 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Mon Dec 15 18:54:00 2014 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/tez/dag/app/DAGAppMaster.java    | 30 +++++++++-
 .../java/org/apache/tez/dag/app/dag/Vertex.java |  3 +
 .../tez/dag/app/dag/event/SpeculatorEvent.java  | 36 ++++++++++++
 .../SpeculatorEventTaskAttemptStatusUpdate.java | 60 ++++++++++++++++++++
 .../dag/app/dag/event/SpeculatorEventType.java  | 29 ++++++++++
 .../VertexEventTaskAttemptStatusUpdate.java     | 60 --------------------
 .../tez/dag/app/dag/event/VertexEventType.java  |  5 +-
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   | 10 ++--
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 47 ++++++---------
 .../speculation/legacy/LegacySpeculator.java    | 15 ++++-
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   | 10 ++--
 tez-ui/src/main/webapp/bower.json               |  1 -
 13 files changed, 197 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/fbb3c17c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3b92f9b..18e00c3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.6.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-1789. Move speculator processing off the central dispatcher.
   TEZ-1610. Add additional task counters for fetchers, merger.
   TEZ-1775. Allow setting log level per logger.
   TEZ-1847. Fix package name for MiniTezClusterWithTimeline.

http://git-wip-us.apache.org/repos/asf/tez/blob/fbb3c17c/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 35483a6..0699529 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -121,6 +121,8 @@ import org.apache.tez.dag.app.dag.event.DAGEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventRecoverEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventStartDag;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
+import org.apache.tez.dag.app.dag.event.SpeculatorEvent;
+import org.apache.tez.dag.app.dag.event.SpeculatorEventType;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
@@ -149,6 +151,7 @@ import org.apache.tez.dag.history.events.AppLaunchedEvent;
 import org.apache.tez.dag.history.events.DAGSubmittedEvent;
 import org.apache.tez.dag.history.utils.DAGUtils;
 import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.utils.Graph;
 import org.apache.tez.dag.utils.RelocalizationUtils;
 import org.apache.tez.common.security.JobTokenSecretManager;
@@ -207,6 +210,7 @@ public class DAGAppMaster extends AbstractService {
   private AppContext context;
   private Configuration amConf;
   private Dispatcher dispatcher;
+  private Dispatcher speculatorDispatcher;
   private ContainerLauncher containerLauncher;
   private ContainerHeartbeatHandler containerHeartbeatHandler;
   private TaskHeartbeatHandler taskHeartbeatHandler;
@@ -399,8 +403,12 @@ public class DAGAppMaster extends AbstractService {
     dispatcher.register(DAGEventType.class, dagEventDispatcher);
     dispatcher.register(VertexEventType.class, vertexEventDispatcher);
     dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
-    dispatcher.register(TaskAttemptEventType.class,
-        new TaskAttemptEventDispatcher());
+    dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher());
+    
+    // register other delegating dispatchers
+    this.speculatorDispatcher = createSpeculatorEventDispatcher();
+    addIfService(speculatorDispatcher, true);
+    dispatcher.register(SpeculatorEventType.class, speculatorDispatcher.getEventHandler());
 
     this.taskSchedulerEventHandler = new TaskSchedulerEventHandler(context,
         clientRpcServer, dispatcher.getEventHandler(), containerSignatureMatcher);
@@ -1704,6 +1712,24 @@ public class DAGAppMaster extends AbstractService {
       ((EventHandler<TaskEvent>)task).handle(event);
     }
   }
+  
+  AsyncDispatcher createSpeculatorEventDispatcher() {
+    AsyncDispatcher dispatcher = new AsyncDispatcher();
+    dispatcher.register(SpeculatorEventType.class, 
+        new EventHandler<SpeculatorEvent>() {
+          @Override
+          public void handle(SpeculatorEvent event) {
+            DAG dag = context.getCurrentDAG();
+            TezVertexID vertexId = event.getVertexId();
+            Vertex v = dag.getVertex(vertexId);
+            Preconditions.checkState(v != null,
+                "Unknown vertex: " + vertexId + " for DAG: " + dag.getID());
+            v.handleSpeculatorEvent(event);
+          }
+        }
+      );
+    return dispatcher;
+  }
 
   private class TaskAttemptEventDispatcher
           implements EventHandler<TaskAttemptEvent> {

http://git-wip-us.apache.org/repos/asf/tez/blob/fbb3c17c/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index 7487fd9..74b4080 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -42,6 +42,7 @@ import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.api.client.ProgressBuilder;
 import org.apache.tez.dag.api.client.VertexStatusBuilder;
 import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.dag.event.SpeculatorEvent;
 import org.apache.tez.dag.app.dag.impl.AMUserCodeException;
 import org.apache.tez.dag.app.dag.impl.Edge;
 import org.apache.tez.dag.history.HistoryEvent;
@@ -126,6 +127,8 @@ public interface Vertex extends Comparable<Vertex> {
   void scheduleTasks(List<TaskWithLocationHint> tasks);
   void scheduleSpeculativeTask(TezTaskID taskId);
   Resource getTaskResource();
+  
+  void handleSpeculatorEvent(SpeculatorEvent event);
 
   ProcessorDescriptor getProcessorDescriptor();
   public DAG getDAG();

http://git-wip-us.apache.org/repos/asf/tez/blob/fbb3c17c/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/SpeculatorEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/SpeculatorEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/SpeculatorEvent.java
new file mode 100644
index 0000000..16fab8e
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/SpeculatorEvent.java
@@ -0,0 +1,36 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tez.dag.records.TezVertexID;
+
+public class SpeculatorEvent extends AbstractEvent<SpeculatorEventType> {
+  private final TezVertexID vertexId;
+  
+  public SpeculatorEvent(SpeculatorEventType type, TezVertexID vertexId) {
+    super(type);
+    this.vertexId = vertexId;
+  }
+  
+  public TezVertexID getVertexId() {
+    return vertexId;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/fbb3c17c/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/SpeculatorEventTaskAttemptStatusUpdate.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/SpeculatorEventTaskAttemptStatusUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/SpeculatorEventTaskAttemptStatusUpdate.java
new file mode 100644
index 0000000..d5745c4
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/SpeculatorEventTaskAttemptStatusUpdate.java
@@ -0,0 +1,60 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+public class SpeculatorEventTaskAttemptStatusUpdate extends SpeculatorEvent {
+  final TezTaskAttemptID id;
+  final TaskAttemptState state;
+  final long timestamp;
+  final boolean justStarted;
+  
+  public SpeculatorEventTaskAttemptStatusUpdate(TezTaskAttemptID taId, TaskAttemptState state,
+      long timestamp) {
+    this(taId, state, timestamp, false);
+  }
+  
+  public SpeculatorEventTaskAttemptStatusUpdate(TezTaskAttemptID taId, TaskAttemptState state,
+      long timestamp, boolean justStarted) {
+    super(SpeculatorEventType.S_TASK_ATTEMPT_STATUS_UPDATE, taId.getTaskID().getVertexID());
+    this.id = taId;
+    this.state = state;
+    this.timestamp = timestamp;
+    this.justStarted = justStarted;
+  }
+
+  public long getTimestamp() {
+    return timestamp;
+  }
+  
+  public TezTaskAttemptID getAttemptId() {
+    return id;
+  }
+  
+  public boolean hasJustStarted() {
+    return justStarted;
+  }
+  
+  public TaskAttemptState getTaskAttemptState() {
+    return state;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/fbb3c17c/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/SpeculatorEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/SpeculatorEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/SpeculatorEventType.java
new file mode 100644
index 0000000..f3b5c10
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/SpeculatorEventType.java
@@ -0,0 +1,29 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+/**
+ * Event types handled by Speculator.
+ */
+public enum SpeculatorEventType {
+
+  //Producer:TaskAttempt
+  S_TASK_ATTEMPT_STATUS_UPDATE,
+  
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/fbb3c17c/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventTaskAttemptStatusUpdate.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventTaskAttemptStatusUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventTaskAttemptStatusUpdate.java
deleted file mode 100644
index 696680d..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventTaskAttemptStatusUpdate.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.dag.event;
-
-import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-
-public class VertexEventTaskAttemptStatusUpdate extends VertexEvent {
-  final TezTaskAttemptID id;
-  final TaskAttemptState state;
-  final long timestamp;
-  final boolean justStarted;
-  
-  public VertexEventTaskAttemptStatusUpdate(TezTaskAttemptID taId, TaskAttemptState state,
-      long timestamp) {
-    this(taId, state, timestamp, false);
-  }
-  
-  public VertexEventTaskAttemptStatusUpdate(TezTaskAttemptID taId, TaskAttemptState state,
-      long timestamp, boolean justStarted) {
-    super(taId.getTaskID().getVertexID(), VertexEventType.V_TASK_ATTEMPT_STATUS_UPDATE);
-    this.id = taId;
-    this.state = state;
-    this.timestamp = timestamp;
-    this.justStarted = justStarted;
-  }
-  
-  public long getTimestamp() {
-    return timestamp;
-  }
-  
-  public TezTaskAttemptID getAttemptId() {
-    return id;
-  }
-  
-  public boolean hasJustStarted() {
-    return justStarted;
-  }
-  
-  public TaskAttemptState getTaskAttemptState() {
-    return state;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/fbb3c17c/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
index 5565f93..1d0222e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
@@ -19,7 +19,7 @@
 package org.apache.tez.dag.app.dag.event;
 
 /**
- * Event types handled by Task.
+ * Event types handled by Vertex.
  */
 public enum VertexEventType {
 
@@ -40,9 +40,6 @@ public enum VertexEventType {
   V_TASK_RESCHEDULED,
   V_TASK_ATTEMPT_COMPLETED,
   
-  //Producer:TaskAttempt
-  V_TASK_ATTEMPT_STATUS_UPDATE,
-  
   //Producer:Any component
   V_INTERNAL_ERROR,
   V_MANAGER_USER_CODE_ERROR,

http://git-wip-us.apache.org/repos/asf/tez/blob/fbb3c17c/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 007774f..1e6ed22 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -85,7 +85,7 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
 import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
-import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptStatusUpdate;
+import org.apache.tez.dag.app.dag.event.SpeculatorEventTaskAttemptStatusUpdate;
 import org.apache.tez.dag.app.rm.AMSchedulerEventTAEnded;
 import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest;
 import org.apache.tez.dag.history.DAGHistoryEvent;
@@ -1169,7 +1169,7 @@ public class TaskAttemptImpl implements TaskAttempt,
           TaskEventType.T_ATTEMPT_LAUNCHED));
       
       if (ta.isSpeculationEnabled()) {
-        ta.sendEvent(new VertexEventTaskAttemptStatusUpdate(ta.attemptId, TaskAttemptState.RUNNING,
+        ta.sendEvent(new SpeculatorEventTaskAttemptStatusUpdate(ta.attemptId, TaskAttemptState.RUNNING,
             ta.launchTime, true));
       }
 
@@ -1262,7 +1262,7 @@ public class TaskAttemptImpl implements TaskAttempt,
       ta.updateProgressSplits();
 
       if (ta.isSpeculationEnabled()) {
-        ta.sendEvent(new VertexEventTaskAttemptStatusUpdate(ta.attemptId, ta.getState(),
+        ta.sendEvent(new SpeculatorEventTaskAttemptStatusUpdate(ta.attemptId, ta.getState(),
             ta.clock.getTime()));
       }
     }
@@ -1294,7 +1294,7 @@ public class TaskAttemptImpl implements TaskAttempt,
       ta.reportedStatus.progress = 1.0f;
       
       if (ta.isSpeculationEnabled()) {
-        ta.sendEvent(new VertexEventTaskAttemptStatusUpdate(ta.attemptId, TaskAttemptState.SUCCEEDED,
+        ta.sendEvent(new SpeculatorEventTaskAttemptStatusUpdate(ta.attemptId, TaskAttemptState.SUCCEEDED,
             ta.clock.getTime()));
       }
 
@@ -1318,7 +1318,7 @@ public class TaskAttemptImpl implements TaskAttempt,
       ta.taskHeartbeatHandler.unregister(ta.attemptId);
       ta.reportedStatus.state = helper.getTaskAttemptState(); // FAILED or KILLED
       if (ta.isSpeculationEnabled()) {
-        ta.sendEvent(new VertexEventTaskAttemptStatusUpdate(ta.attemptId, helper.getTaskAttemptState(),
+        ta.sendEvent(new SpeculatorEventTaskAttemptStatusUpdate(ta.attemptId, helper.getTaskAttemptState(),
             ta.clock.getTime()));
       }
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/fbb3c17c/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 57f128b..13993cc 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -104,6 +104,7 @@ import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
 import org.apache.tez.dag.app.dag.event.DAGEventVertexReRunning;
+import org.apache.tez.dag.app.dag.event.SpeculatorEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
@@ -124,7 +125,6 @@ import org.apache.tez.dag.app.dag.event.VertexEventSourceTaskAttemptCompleted;
 import org.apache.tez.dag.app.dag.event.VertexEventSourceVertexRecovered;
 import org.apache.tez.dag.app.dag.event.VertexEventSourceVertexStarted;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptCompleted;
-import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptStatusUpdate;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
 import org.apache.tez.dag.app.dag.event.VertexEventTermination;
@@ -243,8 +243,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private static final TaskAttemptCompletedEventTransition
       TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION =
           new TaskAttemptCompletedEventTransition();
-  private static final TaskAttempStatusUpdateEventTransition
-      TASK_ATTEMPT_STATUS_UPDATE_EVENT_TRANSITION = new TaskAttempStatusUpdateEventTransition();
   private static final SourceTaskAttemptCompletedEventTransition
       SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION =
           new SourceTaskAttemptCompletedEventTransition();
@@ -472,10 +470,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               EnumSet.of(VertexState.RUNNING, VertexState.TERMINATING),
               VertexEventType.V_ROUTE_EVENT,
               ROUTE_EVENT_TRANSITION)
-          .addTransition(
-              VertexState.RUNNING,
-              VertexState.RUNNING, VertexEventType.V_TASK_ATTEMPT_STATUS_UPDATE,
-              TASK_ATTEMPT_STATUS_UPDATE_EVENT_TRANSITION)
 
           // Transitions from TERMINATING state.
           .addTransition
@@ -493,7 +487,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   VertexEventType.V_MANAGER_USER_CODE_ERROR,
                   VertexEventType.V_ROOT_INPUT_FAILED,
                   VertexEventType.V_SOURCE_VERTEX_STARTED,
-                  VertexEventType.V_TASK_ATTEMPT_STATUS_UPDATE,
                   VertexEventType.V_ROOT_INPUT_INITIALIZED,
                   VertexEventType.V_NULL_EDGE_INITIALIZED,
                   VertexEventType.V_ROUTE_EVENT,
@@ -527,7 +520,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               EnumSet.of(VertexEventType.V_TERMINATE,
                   VertexEventType.V_ROOT_INPUT_FAILED,
                   VertexEventType.V_TASK_ATTEMPT_COMPLETED,
-                  VertexEventType.V_TASK_ATTEMPT_STATUS_UPDATE,
                   // after we are done reruns of source tasks should not affect
                   // us. These reruns may be triggered by other consumer vertices.
                   // We should have been in RUNNING state if we had triggered the
@@ -553,7 +545,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   VertexEventType.V_START,
                   VertexEventType.V_ROUTE_EVENT,
                   VertexEventType.V_TASK_ATTEMPT_COMPLETED,
-                  VertexEventType.V_TASK_ATTEMPT_STATUS_UPDATE,
                   VertexEventType.V_TASK_COMPLETED,
                   VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
                   VertexEventType.V_ROOT_INPUT_INITIALIZED,
@@ -578,7 +569,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   VertexEventType.V_ROUTE_EVENT,
                   VertexEventType.V_TASK_RESCHEDULED,
                   VertexEventType.V_TASK_ATTEMPT_COMPLETED,
-                  VertexEventType.V_TASK_ATTEMPT_STATUS_UPDATE,
                   VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
                   VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
                   VertexEventType.V_TASK_COMPLETED,
@@ -598,7 +588,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   VertexEventType.V_ROUTE_EVENT,
                   VertexEventType.V_TERMINATE,
                   VertexEventType.V_MANAGER_USER_CODE_ERROR,
-                  VertexEventType.V_TASK_ATTEMPT_STATUS_UPDATE,
                   VertexEventType.V_TASK_COMPLETED,
                   VertexEventType.V_TASK_ATTEMPT_COMPLETED,
                   VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
@@ -1257,8 +1246,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   @Override
   public void scheduleSpeculativeTask(TezTaskID taskId) {
-    Preconditions.checkState(taskId.getId() < numTasks);
-    eventHandler.handle(new TaskEvent(taskId, TaskEventType.T_ADD_SPEC_ATTEMPT));
+    readLock.lock();
+    try {
+      Preconditions.checkState(taskId.getId() < numTasks);
+      eventHandler.handle(new TaskEvent(taskId, TaskEventType.T_ADD_SPEC_ATTEMPT));
+    } finally {
+      readLock.unlock();
+    }
   }
   
   @Override
@@ -3598,23 +3592,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     }
   }
 
-  private static class TaskAttempStatusUpdateEventTransition implements
-      SingleArcTransition<VertexImpl, VertexEvent> {
-    @Override
-    public void transition(VertexImpl vertex, VertexEvent event) {
-      VertexEventTaskAttemptStatusUpdate updateEvent =
-        ((VertexEventTaskAttemptStatusUpdate) event);
-      if (vertex.isSpeculationEnabled()) {
-        if (updateEvent.hasJustStarted()) {
-          vertex.speculator.notifyAttemptStarted(updateEvent.getAttemptId(),
-              updateEvent.getTimestamp());
-        } else {
-          vertex.speculator.notifyAttemptStatusUpdate(updateEvent.getAttemptId(),
-              updateEvent.getTaskAttemptState(), updateEvent.getTimestamp());
-        }
-      }
-    }
-  }
   private static class TaskCompletedTransition implements
       MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
 
@@ -4091,6 +4068,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       this.rootInputSpecs.put(input.getName(), DEFAULT_ROOT_INPUT_SPECS);
     }
   }
+  
+  // not taking a lock by design. Speculator callbacks to the vertex will take locks if needed
+  @Override
+  public void handleSpeculatorEvent(SpeculatorEvent event) {
+    if (isSpeculationEnabled()) {
+      speculator.handle(event);
+    }
+  }
 
   @Nullable
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/fbb3c17c/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java
index 8f76e05..2cc3165 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java
@@ -34,6 +34,8 @@ import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.event.SpeculatorEvent;
+import org.apache.tez.dag.app.dag.event.SpeculatorEventTaskAttemptStatusUpdate;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 
@@ -180,6 +182,16 @@ public class LegacySpeculator {
       }
     }
   }
+  
+  public void handle(SpeculatorEvent event) {
+    SpeculatorEventTaskAttemptStatusUpdate updateEvent = ((SpeculatorEventTaskAttemptStatusUpdate) event);
+    if (updateEvent.hasJustStarted()) {
+      notifyAttemptStarted(updateEvent.getAttemptId(), updateEvent.getTimestamp());
+    } else {
+      notifyAttemptStatusUpdate(updateEvent.getAttemptId(), updateEvent.getTaskAttemptState(),
+          updateEvent.getTimestamp());
+    }
+  }
 
 /*   *************************************************************    */
 
@@ -296,8 +308,7 @@ public class LegacySpeculator {
 
   //Add attempt to a given Task.
   protected void addSpeculativeAttempt(TezTaskID taskID) {
-    LOG.info
-        ("DefaultSpeculator.addSpeculativeAttempt -- we are speculating " + taskID);
+    LOG.info("DefaultSpeculator.addSpeculativeAttempt -- we are speculating " + taskID);
     vertex.scheduleSpeculativeTask(taskID);
     mayHaveSpeculated.add(taskID);
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/fbb3c17c/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 29469b1..07e54fe 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -82,7 +82,7 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
 import org.apache.tez.dag.app.dag.event.TaskEventType;
-import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptStatusUpdate;
+import org.apache.tez.dag.app.dag.event.SpeculatorEventTaskAttemptStatusUpdate;
 import org.apache.tez.dag.app.rm.AMSchedulerEventTAEnded;
 import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest;
 import org.apache.tez.dag.app.rm.container.AMContainerMap;
@@ -612,7 +612,7 @@ public class TestTaskAttempt {
     verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture());
     verifyEventType(
         arg.getAllValues().subList(0,
-            expectedEventsAtRunning), VertexEventTaskAttemptStatusUpdate.class, 1);
+            expectedEventsAtRunning), SpeculatorEventTaskAttemptStatusUpdate.class, 1);
     
     taImpl.handle(new TaskAttemptEventStatusUpdate(taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f)));
     
@@ -651,7 +651,7 @@ public class TestTaskAttempt {
             expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1);
     verifyEventType(
         arg.getAllValues().subList(expectedEventsAtRunning,
-            expectedEvenstAfterTerminating), VertexEventTaskAttemptStatusUpdate.class, 2);
+            expectedEvenstAfterTerminating), SpeculatorEventTaskAttemptStatusUpdate.class, 2);
   }
   
   @Test//(timeout = 5000)
@@ -712,7 +712,7 @@ public class TestTaskAttempt {
     verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture());
     verifyEventType(
         arg.getAllValues().subList(0,
-            expectedEventsAtRunning), VertexEventTaskAttemptStatusUpdate.class, 1);
+            expectedEventsAtRunning), SpeculatorEventTaskAttemptStatusUpdate.class, 1);
     
     taImpl.handle(new TaskAttemptEventStatusUpdate(taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f)));
     
@@ -740,7 +740,7 @@ public class TestTaskAttempt {
             expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1);
     verifyEventType(
         arg.getAllValues().subList(expectedEventsAtRunning,
-            expectedEvenstAfterTerminating), VertexEventTaskAttemptStatusUpdate.class, 2);
+            expectedEvenstAfterTerminating), SpeculatorEventTaskAttemptStatusUpdate.class, 2);
   }
   
   @Test(timeout = 5000)

http://git-wip-us.apache.org/repos/asf/tez/blob/fbb3c17c/tez-ui/src/main/webapp/bower.json
----------------------------------------------------------------------
diff --git a/tez-ui/src/main/webapp/bower.json b/tez-ui/src/main/webapp/bower.json
index 18cae96..9c19c78 100644
--- a/tez-ui/src/main/webapp/bower.json
+++ b/tez-ui/src/main/webapp/bower.json
@@ -24,7 +24,6 @@
   "resolutions": {
     "handlebars": "~1.3.0",
     "jquery-ui": ">=1.11",
-    "jquery-mousewheel": "~3.1.12",
     "antiscroll": "fa3f81d3c0",
     "ember": "1.7.0"
   }