You are viewing a plain-text version of this content. (The canonical link to it is not included in this plain-text copy.)
Posted to commits@tez.apache.org by zj...@apache.org on 2015/05/11 07:52:18 UTC
tez git commit: TEZ-2421. Deadlock in AM because attempt and vertex
locking each other out (zjffdu)
Repository: tez
Updated Branches:
refs/heads/master ce69aa1e2 -> ed7f1abbc
TEZ-2421. Deadlock in AM because attempt and vertex locking each other out (zjffdu)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ed7f1abb
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ed7f1abb
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ed7f1abb
Branch: refs/heads/master
Commit: ed7f1abbce54093f56f33c35c8ac92d9e433760f
Parents: ce69aa1
Author: Jeff Zhang <zj...@apache.org>
Authored: Mon May 11 13:51:59 2015 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Mon May 11 13:51:59 2015 +0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../java/org/apache/tez/dag/app/dag/Task.java | 7 +-
.../app/dag/event/TaskEventScheduleTask.java | 42 ++++
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 25 ++-
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 29 ++-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 206 +++++++++++--------
.../tez/dag/app/dag/impl/TestDAGImpl.java | 14 +-
.../tez/dag/app/dag/impl/TestTaskAttempt.java | 79 +++----
.../tez/dag/app/dag/impl/TestTaskImpl.java | 26 +--
.../tez/dag/app/dag/impl/TestVertexImpl.java | 27 +++
.../apache/tez/runtime/api/impl/TaskSpec.java | 29 +++
11 files changed, 327 insertions(+), 158 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/ed7f1abb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index efb19b2..b85a8fa 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -20,6 +20,7 @@ INCOMPATIBLE CHANGES
Default max limit increased. Should not affect existing users.
ALL CHANGES:
+ TEZ-2421. Deadlock in AM because attempt and vertex locking each other out
TEZ-2426. Ensure the eventRouter thread completes before switching to a new task and thread safety fixes in IPOContexts.
TEZ-2412. Should kill vertex in DAGImpl#VertexRerunWhileCommitting
TEZ-2410. VertexGroupCommitFinishedEvent & VertexCommitStartedEvent is not logged correctly
http://git-wip-us.apache.org/repos/asf/tez/blob/ed7f1abb/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
index b798fce..177ee8a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
@@ -22,11 +22,13 @@ import java.util.List;
import java.util.Map;
import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.oldrecords.TaskReport;
import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TezEvent;
/**
@@ -65,5 +67,8 @@ public interface Task {
TaskState restoreFromEvent(HistoryEvent historyEvent);
public void registerTezEvent(TezEvent tezEvent);
-
+
+ public TaskSpec getBaseTaskSpec();
+
+ public TaskLocationHint getTaskLocationHint();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/ed7f1abb/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventScheduleTask.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventScheduleTask.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventScheduleTask.java
new file mode 100644
index 0000000..696602a
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventScheduleTask.java
@@ -0,0 +1,42 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.api.TaskLocationHint;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+
+public class TaskEventScheduleTask extends TaskEvent {
+ private final TaskSpec baseTaskSpec;
+ private final TaskLocationHint locationHint;
+
+ public TaskEventScheduleTask(TezTaskID taskId, TaskSpec baseTaskSpec, TaskLocationHint locationHint) {
+ super(taskId, TaskEventType.T_SCHEDULE);
+ this.baseTaskSpec = baseTaskSpec;
+ this.locationHint = locationHint;
+ }
+
+ public TaskSpec getBaseTaskSpec() {
+ return baseTaskSpec;
+ }
+
+ public TaskLocationHint getTaskLocationHint() {
+ return locationHint;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/ed7f1abb/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index b1c0acc..036022e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -53,7 +53,6 @@ import org.apache.hadoop.yarn.util.RackResolver;
import org.apache.hadoop.yarn.util.Records;
import org.apache.tez.common.counters.DAGCounter;
import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.TaskLocationHint;
@@ -456,14 +455,19 @@ public class TaskAttemptImpl implements TaskAttempt,
}
TaskSpec createRemoteTaskSpec() throws AMUserCodeException {
- Vertex vertex = getVertex();
- ProcessorDescriptor procDesc = vertex.getProcessorDescriptor();
- int taskId = getTaskID().getId();
+ TaskSpec baseTaskSpec = task.getBaseTaskSpec();
+ if (baseTaskSpec == null) {
+ // Recovery does not follow the normal transitions, so TaskEventScheduleTask
+ // is not honored on the recovery code path. Work around that here until recovery
+ // is fixed by asking the vertex to build the base TaskSpec directly (note that
+ // createRemoteTaskSpec still takes vertex read locks via the get*SpecList calls). Since
+ // recovery runs entirely on the central dispatcher, this is deadlock-free for now. TEZ-1019 should remove the need for this.
+ baseTaskSpec = ((VertexImpl) vertex).createRemoteTaskSpec(getID().getTaskID().getId());
+ }
return new TaskSpec(getID(),
- vertex.getDAG().getName(),
- vertex.getName(), vertex.getTotalTasks(), procDesc,
- vertex.getInputSpecList(taskId), vertex.getOutputSpecList(taskId),
- vertex.getGroupInputSpecList(taskId));
+ baseTaskSpec.getDAGName(), baseTaskSpec.getVertexName(),
+ baseTaskSpec.getVertexParallelism(), baseTaskSpec.getProcessorDescriptor(),
+ baseTaskSpec.getInputs(), baseTaskSpec.getOutputs(), baseTaskSpec.getGroupInputs());
}
@Override
@@ -935,9 +939,8 @@ public class TaskAttemptImpl implements TaskAttempt,
// sendEvent(new TaskCleanupEvent(this.attemptId, this.committer, taContext));
}
- @VisibleForTesting
- protected TaskLocationHint getTaskLocationHint() {
- return getVertex().getTaskLocationHint(getTaskID());
+ private TaskLocationHint getTaskLocationHint() {
+ return task.getTaskLocationHint();
}
protected String[] resolveHosts(String[] src) {
http://git-wip-us.apache.org/repos/asf/tez/blob/ed7f1abb/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 2e884e7..de5ab2a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -37,7 +37,6 @@ import com.google.common.collect.Maps;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -49,6 +48,7 @@ import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
@@ -73,6 +73,7 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEvent;
import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask;
+import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask;
import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
import org.apache.tez.dag.app.dag.event.TaskEventTermination;
import org.apache.tez.dag.app.dag.event.TaskEventType;
@@ -92,6 +93,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.OutputCommitter;
+import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TaskStatistics;
import org.apache.tez.runtime.api.impl.TezEvent;
@@ -128,6 +130,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
//private final MRAppMetrics metrics;
protected final AppContext appContext;
private final Resource taskResource;
+ private TaskSpec baseTaskSpec;
+ private TaskLocationHint locationHint;
private final ContainerContext containerContext;
@VisibleForTesting
long scheduledTime;
@@ -516,6 +520,26 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
readLock.unlock();
}
}
+
+ @Override
+ public TaskSpec getBaseTaskSpec() {
+ readLock.lock();
+ try {
+ return baseTaskSpec;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public TaskLocationHint getTaskLocationHint() {
+ readLock.lock();
+ try {
+ return locationHint;
+ } finally {
+ readLock.unlock();
+ }
+ }
@Override
public List<String> getDiagnostics() {
@@ -1021,6 +1045,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
@Override
public void transition(TaskImpl task, TaskEvent event) {
+ TaskEventScheduleTask scheduleEvent = (TaskEventScheduleTask) event;
+ task.locationHint = scheduleEvent.getTaskLocationHint();
+ task.baseTaskSpec = scheduleEvent.getBaseTaskSpec();
task.addAndScheduleAttempt();
task.scheduledTime = task.clock.getTime();
task.logJobHistoryTaskStartedEvent();
http://git-wip-us.apache.org/repos/asf/tez/blob/ed7f1abb/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 6b208b0..80a0358 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -114,6 +114,7 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEvent;
import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask;
+import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask;
import org.apache.tez.dag.app.dag.event.TaskEventTermination;
import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEvent;
@@ -171,6 +172,7 @@ import org.apache.tez.runtime.api.impl.EventType;
import org.apache.tez.runtime.api.impl.GroupInputSpec;
import org.apache.tez.runtime.api.impl.InputSpec;
import org.apache.tez.runtime.api.impl.OutputSpec;
+import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TaskStatistics;
import org.apache.tez.runtime.api.impl.TezEvent;
@@ -1417,66 +1419,96 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
}
}
- void setupEdgeRouting() throws AMUserCodeException {
+ boolean setupEdgeRouting() throws AMUserCodeException {
+ boolean doOnDemand = useOnDemandRouting;
for (Edge e : sourceVertices.values()) {
boolean edgeDoingOnDemand = e.routingToBegin();
- if (useOnDemandRouting && !edgeDoingOnDemand) {
- useOnDemandRouting = false;
+ if (doOnDemand && !edgeDoingOnDemand) {
+ doOnDemand = false;
LOG.info("Not using ondemand routing because of edge between " + e.getSourceVertexName()
+ " and " + getLogIdentifier());
}
}
+ return doOnDemand;
}
private void unsetTasksNotYetScheduled() throws AMUserCodeException {
if (tasksNotYetScheduled) {
- setupEdgeRouting();
- tasksNotYetScheduled = false;
- // only now can we be sure of the edge manager type. so until now
- // we will accumulate pending tasks in case legacy routing gets used.
- // this is only needed to support mixed mode routing. Else for
- // on demand routing events can be directly added to taskEvents when
- // they arrive in handleRoutedEvents instead of first caching them in
- // pendingTaskEvents. When legacy routing is removed then pendingTaskEvents
- // can be removed.
- if (!pendingTaskEvents.isEmpty()) {
- LOG.info("Routing pending task events for vertex: " + logIdentifier);
- try {
- handleRoutedTezEvents(pendingTaskEvents, false, true);
- } catch (AMUserCodeException e) {
- String msg = "Exception in " + e.getSource() + ", vertex=" + logIdentifier;
- LOG.error(msg, e);
- addDiagnostic(msg + ", " + e.getMessage() + ", "
- + ExceptionUtils.getStackTrace(e.getCause()));
- eventHandler.handle(new VertexEventTermination(vertexId,
- VertexTerminationCause.AM_USERCODE_FAILURE));
- return;
+ boolean doOnDemand = setupEdgeRouting();
+ // change state under lock
+ writeLock.lock();
+ try {
+ useOnDemandRouting = doOnDemand;
+ tasksNotYetScheduled = false;
+ // only now can we be sure of the edge manager type. so until now
+ // we will accumulate pending tasks in case legacy routing gets used.
+ // this is only needed to support mixed mode routing. Else for
+ // on demand routing events can be directly added to taskEvents when
+ // they arrive in handleRoutedEvents instead of first caching them in
+ // pendingTaskEvents. When legacy routing is removed then pendingTaskEvents
+ // can be removed.
+ if (!pendingTaskEvents.isEmpty()) {
+ LOG.info("Routing pending task events for vertex: " + logIdentifier);
+ try {
+ handleRoutedTezEvents(pendingTaskEvents, false, true);
+ } catch (AMUserCodeException e) {
+ String msg = "Exception in " + e.getSource() + ", vertex=" + logIdentifier;
+ LOG.error(msg, e);
+ addDiagnostic(msg + ", " + e.getMessage() + ", "
+ + ExceptionUtils.getStackTrace(e.getCause()));
+ eventHandler.handle(new VertexEventTermination(vertexId,
+ VertexTerminationCause.AM_USERCODE_FAILURE));
+ return;
+ }
+ pendingTaskEvents.clear();
}
- pendingTaskEvents.clear();
+ } finally {
+ writeLock.unlock();
}
}
}
+ TaskSpec createRemoteTaskSpec(int taskIndex) throws AMUserCodeException {
+ return TaskSpec.createBaseTaskSpec(getDAG().getName(),
+ getName(), getTotalTasks(), getProcessorDescriptor(),
+ getInputSpecList(taskIndex), getOutputSpecList(taskIndex),
+ getGroupInputSpecList(taskIndex));
+ }
+
@Override
public void scheduleTasks(List<TaskWithLocationHint> tasksToSchedule) {
- writeLock.lock();
try {
unsetTasksNotYetScheduled();
- for (TaskWithLocationHint task : tasksToSchedule) {
- if (numTasks <= task.getTaskIndex().intValue()) {
- throw new TezUncheckedException(
- "Invalid taskId: " + task.getTaskIndex() + " for vertex: " + logIdentifier);
- }
- TaskLocationHint locationHint = task.getTaskLocationHint();
- if (locationHint != null) {
- if (taskLocationHints == null) {
- taskLocationHints = new TaskLocationHint[numTasks];
+ // update state under write lock
+ writeLock.lock();
+ try {
+ for (TaskWithLocationHint task : tasksToSchedule) {
+ if (numTasks <= task.getTaskIndex().intValue()) {
+ throw new TezUncheckedException(
+ "Invalid taskId: " + task.getTaskIndex() + " for vertex: " + logIdentifier);
}
- taskLocationHints[task.getTaskIndex().intValue()] = locationHint;
+ TaskLocationHint locationHint = task.getTaskLocationHint();
+ if (locationHint != null) {
+ if (taskLocationHints == null) {
+ taskLocationHints = new TaskLocationHint[numTasks];
+ }
+ taskLocationHints[task.getTaskIndex().intValue()] = locationHint;
+ }
+ }
+ } finally {
+ writeLock.unlock();
+ }
+
+ readLock.lock();
+ try {
+ for (TaskWithLocationHint task : tasksToSchedule) {
+ TezTaskID taskId = TezTaskID.getInstance(vertexId, task.getTaskIndex().intValue());
+ TaskSpec baseTaskSpec = createRemoteTaskSpec(taskId.getId());
+ eventHandler.handle(new TaskEventScheduleTask(taskId, baseTaskSpec,
+ getTaskLocationHint(taskId)));
}
- eventHandler.handle(new TaskEvent(
- TezTaskID.getInstance(vertexId, task.getTaskIndex().intValue()),
- TaskEventType.T_SCHEDULE));
+ } finally {
+ readLock.unlock();
}
} catch (AMUserCodeException e) {
String msg = "Exception in " + e.getSource() + ", vertex=" + getLogIdentifier();
@@ -1485,8 +1517,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
eventHandler.handle(new VertexEventManagerUserCodeError(getVertexId(), e));
// throw an unchecked exception to stop the vertex manager that invoked this.
throw new TezUncheckedException(e);
- } finally {
- writeLock.unlock();
}
}
@@ -4632,50 +4662,58 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
return taskLocationHints;
}
- // TODO Eventually remove synchronization.
@Override
- public synchronized List<InputSpec> getInputSpecList(int taskIndex) throws AMUserCodeException {
- List<InputSpec> inputSpecList = new ArrayList<InputSpec>(this.getInputVerticesCount()
- + (rootInputDescriptors == null ? 0 : rootInputDescriptors.size()));
- if (rootInputDescriptors != null) {
- for (Entry<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
- rootInputDescriptorEntry : rootInputDescriptors.entrySet()) {
- inputSpecList.add(new InputSpec(rootInputDescriptorEntry.getKey(),
- rootInputDescriptorEntry.getValue().getIODescriptor(), rootInputSpecs.get(
- rootInputDescriptorEntry.getKey()).getNumPhysicalInputsForWorkUnit(taskIndex)));
+ public List<InputSpec> getInputSpecList(int taskIndex) throws AMUserCodeException {
+ readLock.lock();
+ try {
+ List<InputSpec> inputSpecList = new ArrayList<InputSpec>(this.getInputVerticesCount()
+ + (rootInputDescriptors == null ? 0 : rootInputDescriptors.size()));
+ if (rootInputDescriptors != null) {
+ for (Entry<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
+ rootInputDescriptorEntry : rootInputDescriptors.entrySet()) {
+ inputSpecList.add(new InputSpec(rootInputDescriptorEntry.getKey(),
+ rootInputDescriptorEntry.getValue().getIODescriptor(), rootInputSpecs.get(
+ rootInputDescriptorEntry.getKey()).getNumPhysicalInputsForWorkUnit(taskIndex)));
+ }
}
+ for(Vertex vertex : getInputVertices().keySet()) {
+ /**
+ * It is possible that setParallelism is in the middle of processing in target vertex with
+ * its write lock. So we need to get inputspec by acquiring read lock in target vertex to
+ * get consistent view.
+ * Refer TEZ-2251
+ */
+ InputSpec inputSpec = ((VertexImpl) vertex).getDestinationSpecFor(this, taskIndex);
+ // TODO DAGAM This should be based on the edge type.
+ inputSpecList.add(inputSpec);
+ }
+ return inputSpecList;
+ } finally {
+ readLock.unlock();
}
- for(Vertex vertex : getInputVertices().keySet()) {
- /**
- * It is possible that setParallelism is in the middle of processing in target vertex with
- * its write lock. So we need to get inputspec by acquiring read lock in target vertex to
- * get consistent view.
- * Refer TEZ-2251
- */
- InputSpec inputSpec = ((VertexImpl) vertex).getDestinationSpecFor(this, taskIndex);
- // TODO DAGAM This should be based on the edge type.
- inputSpecList.add(inputSpec);
- }
- return inputSpecList;
}
- // TODO Eventually remove synchronization.
@Override
- public synchronized List<OutputSpec> getOutputSpecList(int taskIndex) throws AMUserCodeException {
- List<OutputSpec> outputSpecList = new ArrayList<OutputSpec>(this.getOutputVerticesCount()
- + this.additionalOutputSpecs.size());
- outputSpecList.addAll(additionalOutputSpecs);
- for(Vertex vertex : targetVertices.keySet()) {
- /**
- * It is possible that setParallelism (which could change numTasks) is in the middle of
- * processing in target vertex with its write lock. So we need to get outputspec by
- * acquiring read lock in target vertex to get consistent view.
- * Refer TEZ-2251
- */
- OutputSpec outputSpec = ((VertexImpl) vertex).getSourceSpecFor(this, taskIndex);
- outputSpecList.add(outputSpec);
- }
- return outputSpecList;
+ public List<OutputSpec> getOutputSpecList(int taskIndex) throws AMUserCodeException {
+ readLock.lock();
+ try {
+ List<OutputSpec> outputSpecList = new ArrayList<OutputSpec>(this.getOutputVerticesCount()
+ + this.additionalOutputSpecs.size());
+ outputSpecList.addAll(additionalOutputSpecs);
+ for(Vertex vertex : targetVertices.keySet()) {
+ /**
+ * It is possible that setParallelism (which could change numTasks) is in the middle of
+ * processing in target vertex with its write lock. So we need to get outputspec by
+ * acquiring read lock in target vertex to get consistent view.
+ * Refer TEZ-2251
+ */
+ OutputSpec outputSpec = ((VertexImpl) vertex).getSourceSpecFor(this, taskIndex);
+ outputSpecList.add(outputSpec);
+ }
+ return outputSpecList;
+ } finally {
+ readLock.unlock();
+ }
}
private OutputSpec getSourceSpecFor(VertexImpl vertex, int taskIndex) throws
@@ -4703,10 +4741,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
}
- //TODO Eventually remove synchronization.
@Override
- public synchronized List<GroupInputSpec> getGroupInputSpecList(int taskIndex) {
- return groupInputSpecList;
+ public List<GroupInputSpec> getGroupInputSpecList(int taskIndex) {
+ readLock.lock();
+ try {
+ return groupInputSpecList;
+ } finally {
+ readLock.unlock();
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/ed7f1abb/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index d2aa2d0..fff95b5 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -93,7 +93,6 @@ import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.DAGTerminationCause;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttempt;
-import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
import org.apache.tez.dag.app.dag.TestStateChangeNotifier.StateChangeNotifierForTest;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.VertexState;
@@ -975,12 +974,7 @@ public class TestDAGImpl {
dispatcher.await();
VertexImpl v2 = (VertexImpl)dagWithCustomEdge.getVertex("vertex2");
- LOG.info(String.valueOf(v2.getTasks().size()));
- Task t1= v2.getTask(0);
- TaskAttemptImpl ta1= (TaskAttemptImpl)t1.getAttempt(TezTaskAttemptID.getInstance(t1.getTaskId(), 0));
-
- Assert.assertEquals(TaskAttemptStateInternal.FAILED, ta1.getInternalState());
- String diag = StringUtils.join(ta1.getDiagnostics(), ",");
+ String diag = StringUtils.join(v2.getDiagnostics(), ",");
Assert.assertTrue(diag.contains(ExceptionLocation.GetNumDestinationTaskPhysicalInputs.name()));
}
@@ -998,11 +992,7 @@ public class TestDAGImpl {
Assert.assertEquals(DAGState.FAILED, dagWithCustomEdge.getState());
VertexImpl v1 = (VertexImpl)dagWithCustomEdge.getVertex("vertex1");
- Task t1= v1.getTask(0);
- TaskAttemptImpl ta1= (TaskAttemptImpl)t1.getAttempt(TezTaskAttemptID.getInstance(t1.getTaskId(), 0));
-
- Assert.assertEquals(TaskAttemptStateInternal.FAILED, ta1.getInternalState());
- String diag = StringUtils.join(ta1.getDiagnostics(), ",");
+ String diag = StringUtils.join(v1.getDiagnostics(), ",");
Assert.assertTrue(diag.contains(ExceptionLocation.GetNumSourceTaskPhysicalOutputs.name()));
}
http://git-wip-us.apache.org/repos/asf/tez/blob/ed7f1abb/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 60c4c88..86251cc 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -65,6 +65,7 @@ import org.apache.tez.dag.app.ContainerContext;
import org.apache.tez.dag.app.ContainerHeartbeatHandler;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
+import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.DAGEvent;
@@ -100,6 +101,7 @@ import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
@@ -113,11 +115,19 @@ public class TestTaskAttempt {
return new FileStatus(1, false, 1, 1, 1, f);
}
}
+
+ Task mockTask;
+ TaskLocationHint locationHint;
@BeforeClass
public static void setup() {
MockDNSToSwitchMapping.initializeMockRackResolver();
}
+
+ @Before
+ public void setupTest() {
+ mockTask = mock(Task.class);
+ }
@Test(timeout = 5000)
public void testLocalityRequest() {
@@ -129,14 +139,14 @@ public class TestTaskAttempt {
hosts.add("host1");
hosts.add("host2");
hosts.add("host3");
- TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint(hosts, null);
+ locationHint = TaskLocationHint.createTaskLocationHint(hosts, null);
TezTaskID taskID = TezTaskID.getInstance(
TezVertexID.getInstance(TezDAGID.getInstance("1", 1, 1), 1), 1);
TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
mock(TaskAttemptListener.class), new Configuration(), new SystemClock(),
mock(TaskHeartbeatHandler.class), mock(AppContext.class),
- locationHint, false, Resource.newInstance(1024, 1), createFakeContainerContext(), false);
+ false, Resource.newInstance(1024, 1), createFakeContainerContext(), false);
TaskAttemptEventSchedule sEvent = mock(TaskAttemptEventSchedule.class);
@@ -148,6 +158,8 @@ public class TestTaskAttempt {
fail("Second event not of type "
+ AMSchedulerEventTALaunchRequest.class.getName());
}
+
+ verify(mockTask, times(1)).getTaskLocationHint();
// TODO Move the Rack request check to the client after TEZ-125 is fixed.
Set<String> requestedRacks = taImpl.taskRacks;
assertEquals(1, requestedRacks.size());
@@ -169,12 +181,12 @@ public class TestTaskAttempt {
TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
mock(TaskAttemptListener.class), new Configuration(), new SystemClock(),
mock(TaskHeartbeatHandler.class), mock(AppContext.class),
- null, false, Resource.newInstance(1024, 1), createFakeContainerContext(), false);
+ false, Resource.newInstance(1024, 1), createFakeContainerContext(), false);
TaskAttemptImpl taImplReScheduled = new MockTaskAttemptImpl(taskID, 1, eventHandler,
mock(TaskAttemptListener.class), new Configuration(), new SystemClock(),
mock(TaskHeartbeatHandler.class), mock(AppContext.class),
- null, true, Resource.newInstance(1024, 1), createFakeContainerContext(), false);
+ true, Resource.newInstance(1024, 1), createFakeContainerContext(), false);
ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
@@ -224,7 +236,7 @@ public class TestTaskAttempt {
String hosts[] = new String[] { "127.0.0.1", "host2", "host3" };
Set<String> resolved = new TreeSet<String>(
Arrays.asList(new String[]{ "host1", "host2", "host3" }));
- TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint(
+ locationHint = TaskLocationHint.createTaskLocationHint(
new TreeSet<String>(Arrays.asList(hosts)), null);
TezTaskID taskID = TezTaskID.getInstance(
@@ -232,7 +244,7 @@ public class TestTaskAttempt {
TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
mock(TaskAttemptListener.class), new Configuration(),
new SystemClock(), mock(TaskHeartbeatHandler.class),
- mock(AppContext.class), locationHint, false, Resource.newInstance(1024,
+ mock(AppContext.class), false, Resource.newInstance(1024,
1), createFakeContainerContext(), false);
TaskAttemptImpl spyTa = spy(taImpl);
@@ -280,7 +292,7 @@ public class TestTaskAttempt {
taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
taskConf.setBoolean("fs.file.impl.disable.cache", true);
- TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint(
+ locationHint = TaskLocationHint.createTaskLocationHint(
new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null);
Resource resource = Resource.newInstance(1024, 1);
@@ -289,7 +301,7 @@ public class TestTaskAttempt {
TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
taListener, taskConf, new SystemClock(),
- mock(TaskHeartbeatHandler.class), mockAppContext, locationHint, false,
+ mock(TaskHeartbeatHandler.class), mockAppContext, false,
resource, createFakeContainerContext(), false);
NodeId nid = NodeId.newInstance("127.0.0.1", 0);
@@ -330,7 +342,7 @@ public class TestTaskAttempt {
taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
taskConf.setBoolean("fs.file.impl.disable.cache", true);
- TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint(
+ locationHint = TaskLocationHint.createTaskLocationHint(
new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null);
Resource resource = Resource.newInstance(1024, 1);
@@ -353,7 +365,7 @@ public class TestTaskAttempt {
TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class);
TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
taListener, taskConf, new SystemClock(),
- mockHeartbeatHandler, appCtx, locationHint, false,
+ mockHeartbeatHandler, appCtx, false,
resource, createFakeContainerContext(), false);
TezTaskAttemptID taskAttemptID = taImpl.getID();
ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
@@ -431,7 +443,7 @@ public class TestTaskAttempt {
taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
taskConf.setBoolean("fs.file.impl.disable.cache", true);
- TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint(
+ locationHint = TaskLocationHint.createTaskLocationHint(
new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null);
Resource resource = Resource.newInstance(1024, 1);
@@ -454,7 +466,7 @@ public class TestTaskAttempt {
TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class);
TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
taListener, taskConf, new SystemClock(),
- mockHeartbeatHandler, appCtx, locationHint, false,
+ mockHeartbeatHandler, appCtx, false,
resource, createFakeContainerContext(), false);
TezTaskAttemptID taskAttemptID = taImpl.getID();
taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0));
@@ -496,7 +508,7 @@ public class TestTaskAttempt {
taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
taskConf.setBoolean("fs.file.impl.disable.cache", true);
- TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint(
+ locationHint = TaskLocationHint.createTaskLocationHint(
new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null);
Resource resource = Resource.newInstance(1024, 1);
@@ -519,7 +531,7 @@ public class TestTaskAttempt {
TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class);
TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
taListener, taskConf, new SystemClock(),
- mockHeartbeatHandler, appCtx, locationHint, false,
+ mockHeartbeatHandler, appCtx, false,
resource, createFakeContainerContext(), false);
TezTaskAttemptID taskAttemptID = taImpl.getID();
ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
@@ -589,7 +601,7 @@ public class TestTaskAttempt {
taskConf.setBoolean("fs.file.impl.disable.cache", true);
taskConf.setBoolean(TezConfiguration.TEZ_AM_SPECULATION_ENABLED, true);
- TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint(
+ locationHint = TaskLocationHint.createTaskLocationHint(
new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null);
Resource resource = Resource.newInstance(1024, 1);
@@ -612,7 +624,7 @@ public class TestTaskAttempt {
TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class);
TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
taListener, taskConf, new SystemClock(),
- mockHeartbeatHandler, appCtx, locationHint, false,
+ mockHeartbeatHandler, appCtx, false,
resource, createFakeContainerContext(), false);
TezTaskAttemptID taskAttemptID = taImpl.getID();
ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
@@ -720,7 +732,7 @@ public class TestTaskAttempt {
taskConf.setBoolean("fs.file.impl.disable.cache", true);
taskConf.setBoolean(TezConfiguration.TEZ_AM_SPECULATION_ENABLED, true);
- TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint(
+ locationHint = TaskLocationHint.createTaskLocationHint(
new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null);
Resource resource = Resource.newInstance(1024, 1);
@@ -743,7 +755,7 @@ public class TestTaskAttempt {
TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class);
TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
taListener, taskConf, new SystemClock(),
- mockHeartbeatHandler, appCtx, locationHint, false,
+ mockHeartbeatHandler, appCtx, false,
resource, createFakeContainerContext(), false);
TezTaskAttemptID taskAttemptID = taImpl.getID();
ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
@@ -811,7 +823,7 @@ public class TestTaskAttempt {
taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
taskConf.setBoolean("fs.file.impl.disable.cache", true);
- TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint(
+ locationHint = TaskLocationHint.createTaskLocationHint(
new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null);
Resource resource = Resource.newInstance(1024, 1);
@@ -834,7 +846,7 @@ public class TestTaskAttempt {
TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class);
TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
taListener, taskConf, new SystemClock(),
- mockHeartbeatHandler, appCtx, locationHint, false,
+ mockHeartbeatHandler, appCtx, false,
resource, createFakeContainerContext(), false);
TezTaskAttemptID taskAttemptID = taImpl.getID();
@@ -906,7 +918,7 @@ public class TestTaskAttempt {
taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
taskConf.setBoolean("fs.file.impl.disable.cache", true);
- TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint(
+ locationHint = TaskLocationHint.createTaskLocationHint(
new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null);
Resource resource = Resource.newInstance(1024, 1);
@@ -929,7 +941,7 @@ public class TestTaskAttempt {
TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class);
MockTaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
taListener, taskConf, new SystemClock(),
- mockHeartbeatHandler, appCtx, locationHint, false,
+ mockHeartbeatHandler, appCtx, false,
resource, createFakeContainerContext(), false);
TezTaskAttemptID taskAttemptID = taImpl.getID();
@@ -1009,7 +1021,7 @@ public class TestTaskAttempt {
taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
taskConf.setBoolean("fs.file.impl.disable.cache", true);
- TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint(
+ locationHint = TaskLocationHint.createTaskLocationHint(
new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null);
Resource resource = Resource.newInstance(1024, 1);
@@ -1032,7 +1044,7 @@ public class TestTaskAttempt {
TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class);
TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
taListener, taskConf, new SystemClock(),
- mockHeartbeatHandler, appCtx, locationHint, false,
+ mockHeartbeatHandler, appCtx, false,
resource, createFakeContainerContext(), true);
TezTaskAttemptID taskAttemptID = taImpl.getID();
@@ -1109,7 +1121,7 @@ public class TestTaskAttempt {
taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
taskConf.setBoolean("fs.file.impl.disable.cache", true);
- TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint(
+ locationHint = TaskLocationHint.createTaskLocationHint(
new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null);
Resource resource = Resource.newInstance(1024, 1);
@@ -1132,7 +1144,7 @@ public class TestTaskAttempt {
TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class);
MockTaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
taListener, taskConf, new SystemClock(),
- mockHeartbeatHandler, appCtx, locationHint, false,
+ mockHeartbeatHandler, appCtx, false,
resource, createFakeContainerContext(), false);
TezTaskAttemptID taskAttemptID = taImpl.getID();
@@ -1231,29 +1243,24 @@ public class TestTaskAttempt {
};
private class MockTaskAttemptImpl extends TaskAttemptImpl {
- TaskLocationHint locationHint;
-
+
public MockTaskAttemptImpl(TezTaskID taskId, int attemptNumber,
EventHandler eventHandler, TaskAttemptListener tal,
Configuration conf, Clock clock,
TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
- TaskLocationHint locationHint, boolean isRescheduled,
+ boolean isRescheduled,
Resource resource, ContainerContext containerContext, boolean leafVertex) {
super(taskId, attemptNumber, eventHandler, tal, conf,
clock, taskHeartbeatHandler, appContext,
- isRescheduled, resource, containerContext, leafVertex, mock(TaskImpl.class));
- this.locationHint = locationHint;
+ isRescheduled, resource, containerContext, leafVertex, mockTask);
+ when(mockTask.getTaskLocationHint()).thenReturn(locationHint);
}
+
Vertex mockVertex = mock(Vertex.class);
boolean inputFailedReported = false;
@Override
- public TaskLocationHint getTaskLocationHint() {
- return locationHint;
- }
-
- @Override
protected Vertex getVertex() {
return mockVertex;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/ed7f1abb/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 9da3fab..1ecabef 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -56,7 +56,7 @@ import org.apache.tez.dag.app.dag.StateChangeNotifier;
import org.apache.tez.dag.app.dag.TaskStateInternal;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate;
-import org.apache.tez.dag.app.dag.event.TaskEvent;
+import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask;
import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
import org.apache.tez.dag.app.dag.event.TaskEventTermination;
import org.apache.tez.dag.app.dag.event.TaskEventType;
@@ -70,6 +70,7 @@ import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.junit.Assert;
import org.junit.Before;
@@ -105,6 +106,7 @@ public class TestTaskImpl {
private NodeId mockNodeId;
private MockTaskImpl mockTask;
+ private TaskSpec mockTaskSpec;
@SuppressWarnings("rawtypes")
class TestEventHandler implements EventHandler<Event> {
@@ -149,8 +151,9 @@ public class TestTaskImpl {
mockTask = new MockTaskImpl(vertexId, partition,
eventHandler, conf, taskAttemptListener, clock,
- taskHeartbeatHandler, appContext, leafVertex, locationHint,
+ taskHeartbeatHandler, appContext, leafVertex,
taskResource, containerContext, vertex);
+ mockTaskSpec = mock(TaskSpec.class);
}
private TezTaskID getNewTaskID() {
@@ -159,8 +162,10 @@ public class TestTaskImpl {
}
private void scheduleTaskAttempt(TezTaskID taskId) {
- mockTask.handle(new TaskEvent(taskId, TaskEventType.T_SCHEDULE));
+ mockTask.handle(new TaskEventScheduleTask(taskId, mockTaskSpec, locationHint));
assertTaskScheduledState();
+ assertEquals(mockTaskSpec, mockTask.getBaseTaskSpec());
+ assertEquals(locationHint, mockTask.getTaskLocationHint());
}
private void sendTezEventsToTask(TezTaskID taskId, int numTezEvents) {
@@ -671,19 +676,17 @@ public class TestTaskImpl {
private List<MockTaskAttemptImpl> taskAttempts = new LinkedList<MockTaskAttemptImpl>();
private Vertex vertex;
- TaskLocationHint locationHint;
public MockTaskImpl(TezVertexID vertexId, int partition,
EventHandler eventHandler, Configuration conf,
TaskAttemptListener taskAttemptListener, Clock clock,
TaskHeartbeatHandler thh, AppContext appContext, boolean leafVertex,
- TaskLocationHint locationHint, Resource resource,
+ Resource resource,
ContainerContext containerContext, Vertex vertex) {
super(vertexId, partition, eventHandler, conf, taskAttemptListener,
clock, thh, appContext, leafVertex, resource,
containerContext, mock(StateChangeNotifier.class), vertex);
this.vertex = vertex;
- this.locationHint = locationHint;
}
@Override
@@ -691,7 +694,7 @@ public class TestTaskImpl {
MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(getTaskId(),
attemptNumber, eventHandler, taskAttemptListener,
conf, clock, taskHeartbeatHandler, appContext,
- locationHint, true, taskResource, containerContext);
+ true, taskResource, containerContext);
taskAttempts.add(attempt);
return attempt;
}
@@ -730,21 +733,14 @@ public class TestTaskImpl {
private float progress = 0;
private TaskAttemptState state = TaskAttemptState.NEW;
- TaskLocationHint locationHint;
public MockTaskAttemptImpl(TezTaskID taskId, int attemptNumber,
EventHandler eventHandler, TaskAttemptListener tal, Configuration conf,
Clock clock, TaskHeartbeatHandler thh, AppContext appContext,
- TaskLocationHint locationHint, boolean isRescheduled,
+ boolean isRescheduled,
Resource resource, ContainerContext containerContext) {
super(taskId, attemptNumber, eventHandler, tal, conf, clock, thh,
appContext, isRescheduled, resource, containerContext, false, mock(TaskImpl.class));
- this.locationHint = locationHint;
- }
-
- @Override
- public TaskLocationHint getTaskLocationHint() {
- return locationHint;
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/ed7f1abb/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index a8eaca1..6c94465 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -131,6 +131,7 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEvent;
+import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask;
import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEvent;
@@ -352,9 +353,11 @@ public class TestVertexImpl {
}
private class TaskEventDispatcher implements EventHandler<TaskEvent> {
+ List<TaskEvent> events = Lists.newArrayList();
@SuppressWarnings("unchecked")
@Override
public void handle(TaskEvent event) {
+ events.add(event);
VertexImpl vertex = vertexIdMap.get(event.getTaskID().getVertexID());
Task task = vertex.getTask(event.getTaskID());
if (task != null) {
@@ -2706,6 +2709,30 @@ public class TestVertexImpl {
}
@Test(timeout = 5000)
+ public void testVertexScheduleSendEvent() throws Exception {
+ VertexImpl v3 = vertices.get("vertex3");
+ v3.vertexReconfigurationPlanned();
+ initAllVertices(VertexState.INITED);
+ Assert.assertEquals(2, v3.getTotalTasks());
+ Map<TezTaskID, Task> tasks = v3.getTasks();
+ Assert.assertEquals(2, tasks.size());
+ // Start upstream vertices, grow vertex3 to 10 tasks, then schedule task 0 and inspect its event.
+ VertexImpl v1 = vertices.get("vertex1");
+ startVertex(vertices.get("vertex2"));
+ startVertex(v1);
+ v3.reconfigureVertex(10, null, null);
+ checkTasks(v3, 10);
+ taskEventDispatcher.events.clear();
+ TaskLocationHint mockLocation = mock(TaskLocationHint.class);
+ v3.scheduleTasks(Collections.singletonList(new TaskWithLocationHint(Integer.valueOf(0), mockLocation)));
+ dispatcher.await();
+ Assert.assertEquals(1, taskEventDispatcher.events.size());
+ TaskEventScheduleTask event = (TaskEventScheduleTask) taskEventDispatcher.events.get(0);
+ Assert.assertEquals(mockLocation, event.getTaskLocationHint());
+ Assert.assertNotNull(event.getBaseTaskSpec());
+ }
+
+ @Test(timeout = 5000)
public void testVertexSetParallelismFailAfterSchedule() throws Exception {
VertexImpl v3 = vertices.get("vertex3");
v3.vertexReconfigurationPlanned();
http://git-wip-us.apache.org/repos/asf/tez/blob/ed7f1abb/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
index cce063f..4dc57e2 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
@@ -45,6 +45,35 @@ public class TaskSpec implements Writable {
public TaskSpec() {
}
+
+ /** Static factory; delegates to the constructor with the same arguments (no task attempt id). */
+ public static TaskSpec createBaseTaskSpec(String dagName, String vertexName, int vertexParallelism,
+ ProcessorDescriptor processorDescriptor, List<InputSpec> inputSpecList,
+ List<OutputSpec> outputSpecList, @Nullable List<GroupInputSpec> groupInputSpecList) {
+ return new TaskSpec(dagName, vertexName, vertexParallelism, processorDescriptor,
+ inputSpecList, outputSpecList, groupInputSpecList);
+ }
+
+ /**
+ * Creates a spec with no task attempt id (left null); only groupInputSpecList may be null.
+ */
+ public TaskSpec(String dagName, String vertexName, int vertexParallelism,
+ ProcessorDescriptor processorDescriptor, List<InputSpec> inputSpecList,
+ List<OutputSpec> outputSpecList, @Nullable List<GroupInputSpec> groupInputSpecList) {
+ checkNotNull(dagName, "dagName is null");
+ checkNotNull(vertexName, "vertexName is null");
+ checkNotNull(processorDescriptor, "processorDescriptor is null");
+ checkNotNull(inputSpecList, "inputSpecList is null");
+ checkNotNull(outputSpecList, "outputSpecList is null");
+ this.taskAttemptId = null;
+ this.dagName = StringInterner.weakIntern(dagName);
+ this.vertexName = StringInterner.weakIntern(vertexName);
+ this.vertexParallelism = vertexParallelism;
+ this.processorDescriptor = processorDescriptor;
+ this.inputSpecList = inputSpecList;
+ this.outputSpecList = outputSpecList;
+ this.groupInputSpecList = groupInputSpecList;
+ }
public TaskSpec(TezTaskAttemptID taskAttemptID,
String dagName, String vertexName,