You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/12/16 03:54:15 UTC
tez git commit: TEZ-1789. Move speculator processing off the central
dispatcher (bikas)
Repository: tez
Updated Branches:
refs/heads/master f3a84cbad -> fbb3c17c7
TEZ-1789. Move speculator processing off the central dispatcher (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/fbb3c17c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/fbb3c17c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/fbb3c17c
Branch: refs/heads/master
Commit: fbb3c17c708cef2a19719e34d7c288cee06f226c
Parents: f3a84cb
Author: Bikas Saha <bi...@apache.org>
Authored: Mon Dec 15 18:54:00 2014 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Mon Dec 15 18:54:00 2014 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/tez/dag/app/DAGAppMaster.java | 30 +++++++++-
.../java/org/apache/tez/dag/app/dag/Vertex.java | 3 +
.../tez/dag/app/dag/event/SpeculatorEvent.java | 36 ++++++++++++
.../SpeculatorEventTaskAttemptStatusUpdate.java | 60 ++++++++++++++++++++
.../dag/app/dag/event/SpeculatorEventType.java | 29 ++++++++++
.../VertexEventTaskAttemptStatusUpdate.java | 60 --------------------
.../tez/dag/app/dag/event/VertexEventType.java | 5 +-
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 10 ++--
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 47 ++++++---------
.../speculation/legacy/LegacySpeculator.java | 15 ++++-
.../tez/dag/app/dag/impl/TestTaskAttempt.java | 10 ++--
tez-ui/src/main/webapp/bower.json | 1 -
13 files changed, 197 insertions(+), 110 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/fbb3c17c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3b92f9b..18e00c3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.6.0: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-1789. Move speculator processing off the central dispatcher.
TEZ-1610. Add additional task counters for fetchers, merger.
TEZ-1775. Allow setting log level per logger.
TEZ-1847. Fix package name for MiniTezClusterWithTimeline.
http://git-wip-us.apache.org/repos/asf/tez/blob/fbb3c17c/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 35483a6..0699529 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -121,6 +121,8 @@ import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventRecoverEvent;
import org.apache.tez.dag.app.dag.event.DAGEventStartDag;
import org.apache.tez.dag.app.dag.event.DAGEventType;
+import org.apache.tez.dag.app.dag.event.SpeculatorEvent;
+import org.apache.tez.dag.app.dag.event.SpeculatorEventType;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEvent;
@@ -149,6 +151,7 @@ import org.apache.tez.dag.history.events.AppLaunchedEvent;
import org.apache.tez.dag.history.events.DAGSubmittedEvent;
import org.apache.tez.dag.history.utils.DAGUtils;
import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.utils.Graph;
import org.apache.tez.dag.utils.RelocalizationUtils;
import org.apache.tez.common.security.JobTokenSecretManager;
@@ -207,6 +210,7 @@ public class DAGAppMaster extends AbstractService {
private AppContext context;
private Configuration amConf;
private Dispatcher dispatcher;
+ private Dispatcher speculatorDispatcher;
private ContainerLauncher containerLauncher;
private ContainerHeartbeatHandler containerHeartbeatHandler;
private TaskHeartbeatHandler taskHeartbeatHandler;
@@ -399,8 +403,12 @@ public class DAGAppMaster extends AbstractService {
dispatcher.register(DAGEventType.class, dagEventDispatcher);
dispatcher.register(VertexEventType.class, vertexEventDispatcher);
dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
- dispatcher.register(TaskAttemptEventType.class,
- new TaskAttemptEventDispatcher());
+ dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher());
+
+ // register other delegating dispatchers
+ this.speculatorDispatcher = createSpeculatorEventDispatcher();
+ addIfService(speculatorDispatcher, true);
+ dispatcher.register(SpeculatorEventType.class, speculatorDispatcher.getEventHandler());
this.taskSchedulerEventHandler = new TaskSchedulerEventHandler(context,
clientRpcServer, dispatcher.getEventHandler(), containerSignatureMatcher);
@@ -1704,6 +1712,24 @@ public class DAGAppMaster extends AbstractService {
((EventHandler<TaskEvent>)task).handle(event);
}
}
+
+ AsyncDispatcher createSpeculatorEventDispatcher() {
+ AsyncDispatcher dispatcher = new AsyncDispatcher();
+ dispatcher.register(SpeculatorEventType.class,
+ new EventHandler<SpeculatorEvent>() {
+ @Override
+ public void handle(SpeculatorEvent event) {
+ DAG dag = context.getCurrentDAG();
+ TezVertexID vertexId = event.getVertexId();
+ Vertex v = dag.getVertex(vertexId);
+ Preconditions.checkState(v != null,
+ "Unknown vertex: " + vertexId + " for DAG: " + dag.getID());
+ v.handleSpeculatorEvent(event);
+ }
+ }
+ );
+ return dispatcher;
+ }
private class TaskAttemptEventDispatcher
implements EventHandler<TaskAttemptEvent> {
http://git-wip-us.apache.org/repos/asf/tez/blob/fbb3c17c/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index 7487fd9..74b4080 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -42,6 +42,7 @@ import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
import org.apache.tez.dag.api.client.ProgressBuilder;
import org.apache.tez.dag.api.client.VertexStatusBuilder;
import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.dag.event.SpeculatorEvent;
import org.apache.tez.dag.app.dag.impl.AMUserCodeException;
import org.apache.tez.dag.app.dag.impl.Edge;
import org.apache.tez.dag.history.HistoryEvent;
@@ -126,6 +127,8 @@ public interface Vertex extends Comparable<Vertex> {
void scheduleTasks(List<TaskWithLocationHint> tasks);
void scheduleSpeculativeTask(TezTaskID taskId);
Resource getTaskResource();
+
+ void handleSpeculatorEvent(SpeculatorEvent event);
ProcessorDescriptor getProcessorDescriptor();
public DAG getDAG();
http://git-wip-us.apache.org/repos/asf/tez/blob/fbb3c17c/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/SpeculatorEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/SpeculatorEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/SpeculatorEvent.java
new file mode 100644
index 0000000..16fab8e
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/SpeculatorEvent.java
@@ -0,0 +1,36 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tez.dag.records.TezVertexID;
+
+public class SpeculatorEvent extends AbstractEvent<SpeculatorEventType> {
+ private final TezVertexID vertexId;
+
+ public SpeculatorEvent(SpeculatorEventType type, TezVertexID vertexId) {
+ super(type);
+ this.vertexId = vertexId;
+ }
+
+ public TezVertexID getVertexId() {
+ return vertexId;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/fbb3c17c/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/SpeculatorEventTaskAttemptStatusUpdate.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/SpeculatorEventTaskAttemptStatusUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/SpeculatorEventTaskAttemptStatusUpdate.java
new file mode 100644
index 0000000..d5745c4
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/SpeculatorEventTaskAttemptStatusUpdate.java
@@ -0,0 +1,60 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+public class SpeculatorEventTaskAttemptStatusUpdate extends SpeculatorEvent {
+ final TezTaskAttemptID id;
+ final TaskAttemptState state;
+ final long timestamp;
+ final boolean justStarted;
+
+ public SpeculatorEventTaskAttemptStatusUpdate(TezTaskAttemptID taId, TaskAttemptState state,
+ long timestamp) {
+ this(taId, state, timestamp, false);
+ }
+
+ public SpeculatorEventTaskAttemptStatusUpdate(TezTaskAttemptID taId, TaskAttemptState state,
+ long timestamp, boolean justStarted) {
+ super(SpeculatorEventType.S_TASK_ATTEMPT_STATUS_UPDATE, taId.getTaskID().getVertexID());
+ this.id = taId;
+ this.state = state;
+ this.timestamp = timestamp;
+ this.justStarted = justStarted;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public TezTaskAttemptID getAttemptId() {
+ return id;
+ }
+
+ public boolean hasJustStarted() {
+ return justStarted;
+ }
+
+ public TaskAttemptState getTaskAttemptState() {
+ return state;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/fbb3c17c/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/SpeculatorEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/SpeculatorEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/SpeculatorEventType.java
new file mode 100644
index 0000000..f3b5c10
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/SpeculatorEventType.java
@@ -0,0 +1,29 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+/**
+ * Event types handled by Speculator.
+ */
+public enum SpeculatorEventType {
+
+ //Producer:TaskAttempt
+ S_TASK_ATTEMPT_STATUS_UPDATE,
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/fbb3c17c/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventTaskAttemptStatusUpdate.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventTaskAttemptStatusUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventTaskAttemptStatusUpdate.java
deleted file mode 100644
index 696680d..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventTaskAttemptStatusUpdate.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.dag.event;
-
-import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-
-public class VertexEventTaskAttemptStatusUpdate extends VertexEvent {
- final TezTaskAttemptID id;
- final TaskAttemptState state;
- final long timestamp;
- final boolean justStarted;
-
- public VertexEventTaskAttemptStatusUpdate(TezTaskAttemptID taId, TaskAttemptState state,
- long timestamp) {
- this(taId, state, timestamp, false);
- }
-
- public VertexEventTaskAttemptStatusUpdate(TezTaskAttemptID taId, TaskAttemptState state,
- long timestamp, boolean justStarted) {
- super(taId.getTaskID().getVertexID(), VertexEventType.V_TASK_ATTEMPT_STATUS_UPDATE);
- this.id = taId;
- this.state = state;
- this.timestamp = timestamp;
- this.justStarted = justStarted;
- }
-
- public long getTimestamp() {
- return timestamp;
- }
-
- public TezTaskAttemptID getAttemptId() {
- return id;
- }
-
- public boolean hasJustStarted() {
- return justStarted;
- }
-
- public TaskAttemptState getTaskAttemptState() {
- return state;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/fbb3c17c/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
index 5565f93..1d0222e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
@@ -19,7 +19,7 @@
package org.apache.tez.dag.app.dag.event;
/**
- * Event types handled by Task.
+ * Event types handled by Vertex.
*/
public enum VertexEventType {
@@ -40,9 +40,6 @@ public enum VertexEventType {
V_TASK_RESCHEDULED,
V_TASK_ATTEMPT_COMPLETED,
- //Producer:TaskAttempt
- V_TASK_ATTEMPT_STATUS_UPDATE,
-
//Producer:Any component
V_INTERNAL_ERROR,
V_MANAGER_USER_CODE_ERROR,
http://git-wip-us.apache.org/repos/asf/tez/blob/fbb3c17c/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 007774f..1e6ed22 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -85,7 +85,7 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
-import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptStatusUpdate;
+import org.apache.tez.dag.app.dag.event.SpeculatorEventTaskAttemptStatusUpdate;
import org.apache.tez.dag.app.rm.AMSchedulerEventTAEnded;
import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest;
import org.apache.tez.dag.history.DAGHistoryEvent;
@@ -1169,7 +1169,7 @@ public class TaskAttemptImpl implements TaskAttempt,
TaskEventType.T_ATTEMPT_LAUNCHED));
if (ta.isSpeculationEnabled()) {
- ta.sendEvent(new VertexEventTaskAttemptStatusUpdate(ta.attemptId, TaskAttemptState.RUNNING,
+ ta.sendEvent(new SpeculatorEventTaskAttemptStatusUpdate(ta.attemptId, TaskAttemptState.RUNNING,
ta.launchTime, true));
}
@@ -1262,7 +1262,7 @@ public class TaskAttemptImpl implements TaskAttempt,
ta.updateProgressSplits();
if (ta.isSpeculationEnabled()) {
- ta.sendEvent(new VertexEventTaskAttemptStatusUpdate(ta.attemptId, ta.getState(),
+ ta.sendEvent(new SpeculatorEventTaskAttemptStatusUpdate(ta.attemptId, ta.getState(),
ta.clock.getTime()));
}
}
@@ -1294,7 +1294,7 @@ public class TaskAttemptImpl implements TaskAttempt,
ta.reportedStatus.progress = 1.0f;
if (ta.isSpeculationEnabled()) {
- ta.sendEvent(new VertexEventTaskAttemptStatusUpdate(ta.attemptId, TaskAttemptState.SUCCEEDED,
+ ta.sendEvent(new SpeculatorEventTaskAttemptStatusUpdate(ta.attemptId, TaskAttemptState.SUCCEEDED,
ta.clock.getTime()));
}
@@ -1318,7 +1318,7 @@ public class TaskAttemptImpl implements TaskAttempt,
ta.taskHeartbeatHandler.unregister(ta.attemptId);
ta.reportedStatus.state = helper.getTaskAttemptState(); // FAILED or KILLED
if (ta.isSpeculationEnabled()) {
- ta.sendEvent(new VertexEventTaskAttemptStatusUpdate(ta.attemptId, helper.getTaskAttemptState(),
+ ta.sendEvent(new SpeculatorEventTaskAttemptStatusUpdate(ta.attemptId, helper.getTaskAttemptState(),
ta.clock.getTime()));
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/fbb3c17c/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 57f128b..13993cc 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -104,6 +104,7 @@ import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
import org.apache.tez.dag.app.dag.event.DAGEventVertexReRunning;
+import org.apache.tez.dag.app.dag.event.SpeculatorEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
@@ -124,7 +125,6 @@ import org.apache.tez.dag.app.dag.event.VertexEventSourceTaskAttemptCompleted;
import org.apache.tez.dag.app.dag.event.VertexEventSourceVertexRecovered;
import org.apache.tez.dag.app.dag.event.VertexEventSourceVertexStarted;
import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptCompleted;
-import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptStatusUpdate;
import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
import org.apache.tez.dag.app.dag.event.VertexEventTermination;
@@ -243,8 +243,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
private static final TaskAttemptCompletedEventTransition
TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION =
new TaskAttemptCompletedEventTransition();
- private static final TaskAttempStatusUpdateEventTransition
- TASK_ATTEMPT_STATUS_UPDATE_EVENT_TRANSITION = new TaskAttempStatusUpdateEventTransition();
private static final SourceTaskAttemptCompletedEventTransition
SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION =
new SourceTaskAttemptCompletedEventTransition();
@@ -472,10 +470,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
EnumSet.of(VertexState.RUNNING, VertexState.TERMINATING),
VertexEventType.V_ROUTE_EVENT,
ROUTE_EVENT_TRANSITION)
- .addTransition(
- VertexState.RUNNING,
- VertexState.RUNNING, VertexEventType.V_TASK_ATTEMPT_STATUS_UPDATE,
- TASK_ATTEMPT_STATUS_UPDATE_EVENT_TRANSITION)
// Transitions from TERMINATING state.
.addTransition
@@ -493,7 +487,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexEventType.V_MANAGER_USER_CODE_ERROR,
VertexEventType.V_ROOT_INPUT_FAILED,
VertexEventType.V_SOURCE_VERTEX_STARTED,
- VertexEventType.V_TASK_ATTEMPT_STATUS_UPDATE,
VertexEventType.V_ROOT_INPUT_INITIALIZED,
VertexEventType.V_NULL_EDGE_INITIALIZED,
VertexEventType.V_ROUTE_EVENT,
@@ -527,7 +520,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
EnumSet.of(VertexEventType.V_TERMINATE,
VertexEventType.V_ROOT_INPUT_FAILED,
VertexEventType.V_TASK_ATTEMPT_COMPLETED,
- VertexEventType.V_TASK_ATTEMPT_STATUS_UPDATE,
// after we are done reruns of source tasks should not affect
// us. These reruns may be triggered by other consumer vertices.
// We should have been in RUNNING state if we had triggered the
@@ -553,7 +545,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexEventType.V_START,
VertexEventType.V_ROUTE_EVENT,
VertexEventType.V_TASK_ATTEMPT_COMPLETED,
- VertexEventType.V_TASK_ATTEMPT_STATUS_UPDATE,
VertexEventType.V_TASK_COMPLETED,
VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
VertexEventType.V_ROOT_INPUT_INITIALIZED,
@@ -578,7 +569,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexEventType.V_ROUTE_EVENT,
VertexEventType.V_TASK_RESCHEDULED,
VertexEventType.V_TASK_ATTEMPT_COMPLETED,
- VertexEventType.V_TASK_ATTEMPT_STATUS_UPDATE,
VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
VertexEventType.V_TASK_COMPLETED,
@@ -598,7 +588,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexEventType.V_ROUTE_EVENT,
VertexEventType.V_TERMINATE,
VertexEventType.V_MANAGER_USER_CODE_ERROR,
- VertexEventType.V_TASK_ATTEMPT_STATUS_UPDATE,
VertexEventType.V_TASK_COMPLETED,
VertexEventType.V_TASK_ATTEMPT_COMPLETED,
VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
@@ -1257,8 +1246,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
@Override
public void scheduleSpeculativeTask(TezTaskID taskId) {
- Preconditions.checkState(taskId.getId() < numTasks);
- eventHandler.handle(new TaskEvent(taskId, TaskEventType.T_ADD_SPEC_ATTEMPT));
+ readLock.lock();
+ try {
+ Preconditions.checkState(taskId.getId() < numTasks);
+ eventHandler.handle(new TaskEvent(taskId, TaskEventType.T_ADD_SPEC_ATTEMPT));
+ } finally {
+ readLock.unlock();
+ }
}
@Override
@@ -3598,23 +3592,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
}
- private static class TaskAttempStatusUpdateEventTransition implements
- SingleArcTransition<VertexImpl, VertexEvent> {
- @Override
- public void transition(VertexImpl vertex, VertexEvent event) {
- VertexEventTaskAttemptStatusUpdate updateEvent =
- ((VertexEventTaskAttemptStatusUpdate) event);
- if (vertex.isSpeculationEnabled()) {
- if (updateEvent.hasJustStarted()) {
- vertex.speculator.notifyAttemptStarted(updateEvent.getAttemptId(),
- updateEvent.getTimestamp());
- } else {
- vertex.speculator.notifyAttemptStatusUpdate(updateEvent.getAttemptId(),
- updateEvent.getTaskAttemptState(), updateEvent.getTimestamp());
- }
- }
- }
- }
private static class TaskCompletedTransition implements
MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
@@ -4091,6 +4068,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
this.rootInputSpecs.put(input.getName(), DEFAULT_ROOT_INPUT_SPECS);
}
}
+
+ // not taking a lock by design. Speculator callbacks to the vertex will take locks if needed
+ @Override
+ public void handleSpeculatorEvent(SpeculatorEvent event) {
+ if (isSpeculationEnabled()) {
+ speculator.handle(event);
+ }
+ }
@Nullable
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/fbb3c17c/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java
index 8f76e05..2cc3165 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java
@@ -34,6 +34,8 @@ import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.event.SpeculatorEvent;
+import org.apache.tez.dag.app.dag.event.SpeculatorEventTaskAttemptStatusUpdate;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
@@ -180,6 +182,16 @@ public class LegacySpeculator {
}
}
}
+
+ public void handle(SpeculatorEvent event) {
+ SpeculatorEventTaskAttemptStatusUpdate updateEvent = ((SpeculatorEventTaskAttemptStatusUpdate) event);
+ if (updateEvent.hasJustStarted()) {
+ notifyAttemptStarted(updateEvent.getAttemptId(), updateEvent.getTimestamp());
+ } else {
+ notifyAttemptStatusUpdate(updateEvent.getAttemptId(), updateEvent.getTaskAttemptState(),
+ updateEvent.getTimestamp());
+ }
+ }
/* ************************************************************* */
@@ -296,8 +308,7 @@ public class LegacySpeculator {
//Add attempt to a given Task.
protected void addSpeculativeAttempt(TezTaskID taskID) {
- LOG.info
- ("DefaultSpeculator.addSpeculativeAttempt -- we are speculating " + taskID);
+ LOG.info("DefaultSpeculator.addSpeculativeAttempt -- we are speculating " + taskID);
vertex.scheduleSpeculativeTask(taskID);
mayHaveSpeculated.add(taskID);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/fbb3c17c/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 29469b1..07e54fe 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -82,7 +82,7 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
import org.apache.tez.dag.app.dag.event.TaskEventType;
-import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptStatusUpdate;
+import org.apache.tez.dag.app.dag.event.SpeculatorEventTaskAttemptStatusUpdate;
import org.apache.tez.dag.app.rm.AMSchedulerEventTAEnded;
import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest;
import org.apache.tez.dag.app.rm.container.AMContainerMap;
@@ -612,7 +612,7 @@ public class TestTaskAttempt {
verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture());
verifyEventType(
arg.getAllValues().subList(0,
- expectedEventsAtRunning), VertexEventTaskAttemptStatusUpdate.class, 1);
+ expectedEventsAtRunning), SpeculatorEventTaskAttemptStatusUpdate.class, 1);
taImpl.handle(new TaskAttemptEventStatusUpdate(taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f)));
@@ -651,7 +651,7 @@ public class TestTaskAttempt {
expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1);
verifyEventType(
arg.getAllValues().subList(expectedEventsAtRunning,
- expectedEvenstAfterTerminating), VertexEventTaskAttemptStatusUpdate.class, 2);
+ expectedEvenstAfterTerminating), SpeculatorEventTaskAttemptStatusUpdate.class, 2);
}
@Test//(timeout = 5000)
@@ -712,7 +712,7 @@ public class TestTaskAttempt {
verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture());
verifyEventType(
arg.getAllValues().subList(0,
- expectedEventsAtRunning), VertexEventTaskAttemptStatusUpdate.class, 1);
+ expectedEventsAtRunning), SpeculatorEventTaskAttemptStatusUpdate.class, 1);
taImpl.handle(new TaskAttemptEventStatusUpdate(taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f)));
@@ -740,7 +740,7 @@ public class TestTaskAttempt {
expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1);
verifyEventType(
arg.getAllValues().subList(expectedEventsAtRunning,
- expectedEvenstAfterTerminating), VertexEventTaskAttemptStatusUpdate.class, 2);
+ expectedEvenstAfterTerminating), SpeculatorEventTaskAttemptStatusUpdate.class, 2);
}
@Test(timeout = 5000)
http://git-wip-us.apache.org/repos/asf/tez/blob/fbb3c17c/tez-ui/src/main/webapp/bower.json
----------------------------------------------------------------------
diff --git a/tez-ui/src/main/webapp/bower.json b/tez-ui/src/main/webapp/bower.json
index 18cae96..9c19c78 100644
--- a/tez-ui/src/main/webapp/bower.json
+++ b/tez-ui/src/main/webapp/bower.json
@@ -24,7 +24,6 @@
"resolutions": {
"handlebars": "~1.3.0",
"jquery-ui": ">=1.11",
- "jquery-mousewheel": "~3.1.12",
"antiscroll": "fa3f81d3c0",
"ember": "1.7.0"
}