You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/04/19 01:54:28 UTC
svn commit: r1469642 [14/36] - in /incubator/tez/branches/TEZ-1: ./
example_jobs/ example_jobs/sampleInput/ example_jobs/wc_mr_6m_1r/
example_jobs/wc_mrr_6m_3r_3r/ ljr_helper/ tez-common/ tez-common/src/
tez-common/src/main/ tez-common/src/main/java/ t...
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,1187 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.impl;
+
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
+import org.apache.hadoop.yarn.state.MultipleArcTransition;
+import org.apache.hadoop.yarn.state.SingleArcTransition;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.DAGConfiguration;
+import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.dag.api.records.TaskAttemptState;
+import org.apache.tez.dag.api.records.TaskReport;
+import org.apache.tez.dag.api.records.TaskState;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskHeartbeatHandler;
+import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.app.dag.TaskAttempt;
+import org.apache.tez.dag.app.dag.TaskStateInternal;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.event.DAGEvent;
+import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate;
+import org.apache.tez.dag.app.dag.event.DAGEventType;
+import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest;
+import org.apache.tez.dag.app.dag.event.TaskEvent;
+import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
+import org.apache.tez.dag.app.dag.event.TaskEventType;
+import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptCompleted;
+import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
+import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
+import org.apache.tez.engine.common.security.JobTokenIdentifier;
+import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.engine.records.TezTaskID;
+import org.apache.tez.engine.records.TezVertexID;
+import org.apache.tez.mapreduce.task.InitialTaskWithInMemSort;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Implementation of Task interface.
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class TaskImpl implements Task, EventHandler<TaskEvent> {
+
+ private static final Log LOG = LogFactory.getLog(TaskImpl.class);
+
+ // Collaborators and identity handed in through the constructor.
+ protected final DAGConfiguration conf;
+ protected final Path jobFile;
+ protected final OutputCommitter committer;
+ protected final int partition;
+ protected final TaskAttemptListener taskAttemptListener;
+ protected final TaskHeartbeatHandler taskHeartbeatHandler;
+ protected final EventHandler eventHandler;
+ private final TezTaskID taskId;
+ // Attempts of this task keyed by attempt id. The reference itself is
+ // replaced as attempts are added (see addAndScheduleAttempt).
+ private Map<TezTaskAttemptID, TaskAttempt> attempts;
+ // Currently hard-coded in the constructor (see the FIXME there).
+ private final int maxAttempts;
+ protected final Clock clock;
+ // Read and write views of one ReentrantReadWriteLock created in the
+ // constructor.
+ private final Lock readLock;
+ private final Lock writeLock;
+ // TODO Metrics
+ //private final MRAppMetrics metrics;
+ protected final AppContext appContext;
+ // Clock time recorded by InitialScheduleTransition; getLaunchTime() falls
+ // back to it when no attempt reports a non-zero launch time.
+ private long scheduledTime;
+
+ // Selects "https://" instead of "http://" when building the URL placed in
+ // task attempt completion events.
+ protected boolean encryptedShuffle;
+ protected Credentials credentials;
+ protected Token<JobTokenIdentifier> jobToken;
+ protected String mrxModuleClassName;
+ protected TaskLocationHint locationHint;
+ private Resource taskResource;
+ private Map<String, LocalResource> localResources;
+ private Map<String, String> environment;
+
+ // counts the number of attempts that are either running or in a state where
+ // they will come to be running when they get a Container
+ private int numberUncompletedAttempts = 0;
+
+ // Set to true by InitialScheduleTransition once the (currently stubbed)
+ // task-started history event has been logged.
+ private boolean historyTaskStartGenerated = false;
+
+ // Transition instances that are registered on more than one arc of the
+ // state machine below.
+ private static final SingleArcTransition<TaskImpl, TaskEvent>
+ ATTEMPT_KILLED_TRANSITION = new AttemptKilledTransition();
+ private static final SingleArcTransition<TaskImpl, TaskEvent>
+ KILL_TRANSITION = new KillTransition();
+
+ // Shared, immutable description of the task state machine. Each TaskImpl
+ // obtains its own StateMachine instance from this factory in the
+ // constructor. Arcs registered without a transition object only change (or
+ // keep) the state; arcs registered with an EnumSet of target states let the
+ // transition pick the resulting state.
+ private static final StateMachineFactory
+ <TaskImpl, TaskStateInternal, TaskEventType, TaskEvent>
+ stateMachineFactory
+ = new StateMachineFactory<TaskImpl, TaskStateInternal, TaskEventType, TaskEvent>
+ (TaskStateInternal.NEW)
+
+ // define the state machine of Task
+
+ // Transitions from NEW state
+ .addTransition(TaskStateInternal.NEW, TaskStateInternal.SCHEDULED,
+ TaskEventType.T_SCHEDULE, new InitialScheduleTransition())
+ .addTransition(TaskStateInternal.NEW, TaskStateInternal.KILLED,
+ TaskEventType.T_KILL, new KillNewTransition())
+
+ // Transitions from SCHEDULED state
+ //when the first attempt is launched, the task state is set to RUNNING
+ .addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.RUNNING,
+ TaskEventType.T_ATTEMPT_LAUNCHED, new LaunchTransition())
+ .addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.KILL_WAIT,
+ TaskEventType.T_KILL, KILL_TRANSITION)
+ .addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.SCHEDULED,
+ TaskEventType.T_ATTEMPT_KILLED, ATTEMPT_KILLED_TRANSITION)
+ .addTransition(TaskStateInternal.SCHEDULED,
+ EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.FAILED),
+ TaskEventType.T_ATTEMPT_FAILED,
+ new AttemptFailedTransition())
+
+ // Transitions from RUNNING state
+ .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
+ TaskEventType.T_ATTEMPT_LAUNCHED) //more attempts may start later
+ // This is an optional event.
+ .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
+ TaskEventType.T_ATTEMPT_OUTPUT_CONSUMABLE,
+ new AttemptProcessingCompleteTransition())
+ .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
+ TaskEventType.T_ATTEMPT_COMMIT_PENDING,
+ new AttemptCommitPendingTransition())
+ .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
+ TaskEventType.T_ADD_SPEC_ATTEMPT, new RedundantScheduleTransition())
+ .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.SUCCEEDED,
+ TaskEventType.T_ATTEMPT_SUCCEEDED,
+ new AttemptSucceededTransition())
+ .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
+ TaskEventType.T_ATTEMPT_KILLED,
+ ATTEMPT_KILLED_TRANSITION)
+ .addTransition(TaskStateInternal.RUNNING,
+ EnumSet.of(TaskStateInternal.RUNNING, TaskStateInternal.FAILED),
+ TaskEventType.T_ATTEMPT_FAILED,
+ new AttemptFailedTransition())
+ .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.KILL_WAIT,
+ TaskEventType.T_KILL, KILL_TRANSITION)
+
+ // Transitions from KILL_WAIT state
+ .addTransition(TaskStateInternal.KILL_WAIT,
+ EnumSet.of(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILLED),
+ TaskEventType.T_ATTEMPT_KILLED,
+ new KillWaitAttemptKilledTransition())
+ // Ignore-able transitions.
+ .addTransition(
+ TaskStateInternal.KILL_WAIT,
+ TaskStateInternal.KILL_WAIT,
+ EnumSet.of(TaskEventType.T_KILL,
+ TaskEventType.T_ATTEMPT_LAUNCHED,
+ TaskEventType.T_ATTEMPT_OUTPUT_CONSUMABLE,
+ TaskEventType.T_ATTEMPT_COMMIT_PENDING,
+ TaskEventType.T_ATTEMPT_FAILED,
+ TaskEventType.T_ATTEMPT_SUCCEEDED,
+ TaskEventType.T_ADD_SPEC_ATTEMPT))
+
+ // Transitions from SUCCEEDED state
+ // TODO May require different handling if OUTPUT_CONSUMABLE is one of
+ // the stages. i.e. Task would only SUCCEED after all output consumed.
+ .addTransition(TaskStateInternal.SUCCEEDED, //only possible for map tasks
+ EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.SUCCEEDED, TaskStateInternal.FAILED),
+ TaskEventType.T_ATTEMPT_FAILED, new MapRetroactiveFailureTransition())
+ .addTransition(TaskStateInternal.SUCCEEDED, //only possible for map tasks
+ EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.SUCCEEDED),
+ TaskEventType.T_ATTEMPT_KILLED, new MapRetroactiveKilledTransition())
+ // Ignore-able transitions.
+ .addTransition(
+ TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED,
+ EnumSet.of(TaskEventType.T_ADD_SPEC_ATTEMPT,
+ TaskEventType.T_ATTEMPT_LAUNCHED))
+
+ // Transitions from FAILED state
+ .addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED,
+ EnumSet.of(TaskEventType.T_KILL,
+ TaskEventType.T_ADD_SPEC_ATTEMPT))
+
+ // Transitions from KILLED state
+ .addTransition(TaskStateInternal.KILLED, TaskStateInternal.KILLED,
+ EnumSet.of(TaskEventType.T_KILL,
+ TaskEventType.T_ADD_SPEC_ATTEMPT))
+
+ // create the topology tables
+ .installTopology();
+
+ // Per-task state machine instance, created from stateMachineFactory at the
+ // end of the constructor.
+ private final StateMachine<TaskStateInternal, TaskEventType, TaskEvent>
+ stateMachine;
+
+ // TODO: Recovery
+ /*
+ // By default, the next TaskAttempt number is zero. Changes during recovery
+ protected int nextAttemptNumber = 0;
+ private List<TaskAttemptInfo> taskAttemptsFromPreviousGeneration =
+ new ArrayList<TaskAttemptInfo>();
+
+ private static final class RecoverdAttemptsComparator implements
+ Comparator<TaskAttemptInfo> {
+ @Override
+ public int compare(TaskAttemptInfo attempt1, TaskAttemptInfo attempt2) {
+ long diff = attempt1.getStartTime() - attempt2.getStartTime();
+ return diff == 0 ? 0 : (diff < 0 ? -1 : 1);
+ }
+ }
+
+ private static final RecoverdAttemptsComparator RECOVERED_ATTEMPTS_COMPARATOR =
+ new RecoverdAttemptsComparator();
+
+ */
+
+ // First attempt that reported its output as consumable, and whether a
+ // SUCCEEDED completion event has already been sent on its behalf (both are
+ // set by AttemptProcessingCompleteTransition).
+ private TezTaskAttemptID outputConsumableAttempt;
+ private boolean outputConsumableAttemptSuccessSent = false;
+
+ //should be set to one which comes first
+ //saying COMMIT_PENDING
+ private TezTaskAttemptID commitAttempt;
+
+ // Id of the attempt that succeeded; null until AttemptSucceededTransition
+ // runs, and reset to null by unSucceed().
+ private TezTaskAttemptID successfulAttempt;
+
+ private int failedAttempts;
+ private int finishedAttempts;//finish are total of success, failed and killed
+
+ private final boolean leafVertex;
+
+ /**
+  * Reports the externally visible state of this task.
+  *
+  * @return the internal state machine state mapped to its public
+  *         {@link TaskState} equivalent, read under the read lock
+  */
+ @Override
+ public TaskState getState() {
+   readLock.lock();
+   try {
+     final TaskStateInternal internal = getInternalState();
+     return getExternalState(internal);
+   } finally {
+     readLock.unlock();
+   }
+ }
+
+ /**
+ * Creates a task for the given partition of a vertex.
+ *
+ * The task starts with an empty attempt map, a hard-coded maximum of four
+ * attempts and encrypted shuffle disabled. The task id is built from
+ * {@code vertexId} and {@code partition}. All other arguments are stored
+ * as-is for later use; the state machine instance is created last.
+ */
+ public TaskImpl(TezVertexID vertexId, int partition,
+ EventHandler eventHandler, Path remoteJobConfFile, DAGConfiguration conf,
+ TaskAttemptListener taskAttemptListener, OutputCommitter committer,
+ Token<JobTokenIdentifier> jobToken,
+ Credentials credentials, Clock clock,
+ // TODO Recovery
+ //Map<TezTaskID, TaskInfo> completedTasksFromPreviousRun,
+ //int startCount,
+ // TODO Metrics
+ //MRAppMetrics metrics,
+ TaskHeartbeatHandler thh, AppContext appContext,
+ String mrxModuleClassName,
+ boolean leafVertex, TaskLocationHint locationHint, Resource resource,
+ Map<String, LocalResource> localResources,
+ Map<String, String> environment) {
+ this.conf = conf;
+ this.clock = clock;
+ this.jobFile = remoteJobConfFile;
+ // Both lock views come from the same ReentrantReadWriteLock.
+ ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+ readLock = readWriteLock.readLock();
+ writeLock = readWriteLock.writeLock();
+ // Immutable empty map; replaced when the first attempt is added.
+ this.attempts = Collections.emptyMap();
+ // FIXME get from conf or API
+ maxAttempts = 4;
+ taskId = new TezTaskID(vertexId, partition);
+ this.partition = partition;
+ this.taskAttemptListener = taskAttemptListener;
+ this.taskHeartbeatHandler = thh;
+ this.eventHandler = eventHandler;
+ this.committer = committer;
+ this.credentials = credentials;
+ this.jobToken = jobToken;
+ // TODO Metrics
+ //this.metrics = metrics;
+ this.appContext = appContext;
+ // TODO Security
+ this.encryptedShuffle = false;
+ //conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
+ // MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT);
+ this.mrxModuleClassName = mrxModuleClassName;
+
+ this.leafVertex = leafVertex;
+ this.locationHint = locationHint;
+ this.taskResource = resource;
+ this.localResources = localResources;
+ this.environment = environment;
+
+ // TODO: Recovery
+ /*
+ // See if this is from a previous generation.
+ if (completedTasksFromPreviousRun != null
+ && completedTasksFromPreviousRun.containsKey(taskId)) {
+ // This task has TaskAttempts from previous generation. We have to replay
+ // them.
+ LOG.info("Task is from previous run " + taskId);
+ TaskInfo taskInfo = completedTasksFromPreviousRun.get(taskId);
+ Map<TaskAttemptID, TaskAttemptInfo> allAttempts =
+ taskInfo.getAllTaskAttempts();
+ taskAttemptsFromPreviousGeneration = new ArrayList<TaskAttemptInfo>();
+ taskAttemptsFromPreviousGeneration.addAll(allAttempts.values());
+ Collections.sort(taskAttemptsFromPreviousGeneration,
+ RECOVERED_ATTEMPTS_COMPARATOR);
+ }
+
+ if (taskAttemptsFromPreviousGeneration.isEmpty()) {
+ // All the previous attempts are exhausted, now start with a new
+ // generation.
+
+ // All the new TaskAttemptIDs are generated based on MR
+ // ApplicationAttemptID so that attempts from previous lives don't
+ // over-step the current one. This assumes that a task won't have more
+ // than 1000 attempts in its single generation, which is very reasonable.
+ // Someone is nuts if he/she thinks he/she can live with 1000 TaskAttempts
+ // and requires serious medical attention.
+ nextAttemptNumber = (startCount - 1) * 1000;
+ } else {
+ // There are still some TaskAttempts from previous generation, use them
+ nextAttemptNumber =
+ taskAttemptsFromPreviousGeneration.remove(0).getAttemptId().getId();
+ }
+ */
+ // This "this leak" is okay because the retained pointer is in an
+ // instance variable.
+ stateMachine = stateMachineFactory.make(this);
+ }
+
+ /**
+  * Returns the attempts of this task keyed by attempt id.
+  *
+  * @return the internal map itself when it holds at most one entry,
+  *         otherwise an insertion-ordered copy taken under the read lock
+  */
+ @Override
+ public Map<TezTaskAttemptID, TaskAttempt> getAttempts() {
+   readLock.lock();
+   try {
+     if (attempts.size() > 1) {
+       // Hand out a snapshot so callers never iterate the live map.
+       return new LinkedHashMap<TezTaskAttemptID, TaskAttempt>(attempts);
+     }
+     return attempts;
+   } finally {
+     readLock.unlock();
+   }
+ }
+
+ /**
+  * Looks up a single attempt of this task.
+  *
+  * @param attemptID id of the attempt to find
+  * @return the matching attempt, or {@code null} if the map has no entry
+  *         for that id
+  */
+ @Override
+ public TaskAttempt getAttempt(TezTaskAttemptID attemptID) {
+   readLock.lock();
+   try {
+     final TaskAttempt found = attempts.get(attemptID);
+     return found;
+   } finally {
+     readLock.unlock();
+   }
+ }
+
+ /**
+  * Resolves the vertex this task belongs to through the application
+  * context's DAG, using the vertex id embedded in the task id.
+  */
+ @Override
+ public Vertex getVertex() {
+   final Vertex owner = appContext.getDAG().getVertex(taskId.getVertexID());
+   return owner;
+ }
+
+ /**
+  * @return the id assigned to this task at construction time
+  */
+ @Override
+ public TezTaskID getTaskId() {
+   return this.taskId;
+ }
+
+ @Override
+ public boolean isFinished() {
+ readLock.lock();
+ try {
+ return (getInternalState() == TaskStateInternal.SUCCEEDED ||
+ getInternalState() == TaskStateInternal.FAILED ||
+ getInternalState() == TaskStateInternal.KILLED ||
+ getInternalState() == TaskStateInternal.KILL_WAIT);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /**
+  * Builds a report describing this task: id, start and finish time, state,
+  * progress, currently running attempts, the successful attempt, the
+  * diagnostics of every attempt and finally the counters.
+  *
+  * @return a freshly created {@link TaskReport}, filled in under the read
+  *         lock
+  */
+ @Override
+ public TaskReport getReport() {
+   // TODO TEZPB This is broken. Records will not work without the PBImpl, which
+   // is in a different package.
+   final TaskReport taskReport = Records.newRecord(TaskReport.class);
+   readLock.lock();
+   try {
+     taskReport.setTaskId(taskId);
+     taskReport.setStartTime(getLaunchTime());
+     taskReport.setFinishTime(getFinishTime());
+     taskReport.setTaskState(getState());
+     taskReport.setProgress(getProgress());
+
+     // First pass: list the attempts that are running right now.
+     for (TaskAttempt candidate : attempts.values()) {
+       if (candidate.getState() == TaskAttemptState.RUNNING) {
+         taskReport.addRunningAttempt(candidate.getID());
+       }
+     }
+
+     taskReport.setSuccessfulAttempt(successfulAttempt);
+
+     // Second pass: copy every diagnostic line, tagged with its attempt id.
+     for (TaskAttempt source : attempts.values()) {
+       final String tag = "AttemptID:" + source.getID() + " Info:";
+       for (CharSequence line : source.getDiagnostics()) {
+         taskReport.addDiagnostics(tag + line);
+       }
+     }
+
+     // Counters go in last so that the copy lives on the heap for as short
+     // a time as possible.
+     taskReport.setCounters(getCounters());
+
+     return taskReport;
+   } finally {
+     readLock.unlock();
+   }
+ }
+
+ /**
+  * Returns the counters of the attempt chosen by selectBestAttempt().
+  *
+  * @return that attempt's counters, or TaskAttemptImpl.EMPTY_COUNTERS when
+  *         no attempt qualifies
+  */
+ @Override
+ public TezCounters getCounters() {
+   readLock.lock();
+   try {
+     final TaskAttempt best = selectBestAttempt();
+     if (best == null) {
+       return TaskAttemptImpl.EMPTY_COUNTERS;
+     }
+     return best.getCounters();
+   } finally {
+     readLock.unlock();
+   }
+ }
+
+ /**
+  * Returns the progress of the attempt chosen by selectBestAttempt().
+  *
+  * @return that attempt's progress, or {@code 0f} when no attempt qualifies
+  */
+ @Override
+ public float getProgress() {
+   readLock.lock();
+   try {
+     final TaskAttempt best = selectBestAttempt();
+     return (best == null) ? 0f : best.getProgress();
+   } finally {
+     readLock.unlock();
+   }
+ }
+
+ /**
+  * @return the current state of the internal state machine, read under the
+  *         read lock
+  */
+ @VisibleForTesting
+ public TaskStateInternal getInternalState() {
+   readLock.lock();
+   try {
+     final TaskStateInternal current = stateMachine.getCurrentState();
+     return current;
+   } finally {
+     readLock.unlock();
+   }
+ }
+
+ /**
+  * Maps an internal state to the public one. KILL_WAIT is reported as
+  * KILLED; every other internal state maps to the public state of the same
+  * name.
+  */
+ private static TaskState getExternalState(TaskStateInternal smState) {
+   return (smState == TaskStateInternal.KILL_WAIT)
+       ? TaskState.KILLED
+       : TaskState.valueOf(smState.name());
+ }
+
+ //this is always called in read/write lock
+ private long getLaunchTime() {
+ long taskLaunchTime = 0;
+ boolean launchTimeSet = false;
+ for (TaskAttempt at : attempts.values()) {
+ // select the least launch time of all attempts
+ long attemptLaunchTime = at.getLaunchTime();
+ if (attemptLaunchTime != 0 && !launchTimeSet) {
+ // For the first non-zero launch time
+ launchTimeSet = true;
+ taskLaunchTime = attemptLaunchTime;
+ } else if (attemptLaunchTime != 0 && taskLaunchTime > attemptLaunchTime) {
+ taskLaunchTime = attemptLaunchTime;
+ }
+ }
+ if (!launchTimeSet) {
+ return this.scheduledTime;
+ }
+ return taskLaunchTime;
+ }
+
+ //this is always called in read/write lock
+ //TODO Verify behaviour is Task is killed (no finished attempt)
+ private long getFinishTime() {
+ if (!isFinished()) {
+ return 0;
+ }
+ long finishTime = 0;
+ for (TaskAttempt at : attempts.values()) {
+ //select the max finish time of all attempts
+ // FIXME shouldnt this not count attempts killed after an attempt succeeds
+ if (finishTime < at.getFinishTime()) {
+ finishTime = at.getFinishTime();
+ }
+ }
+ return finishTime;
+ }
+
+ /**
+  * Returns the finish time of one specific attempt.
+  *
+  * @param taId id of the attempt, may be {@code null}
+  * @return the current clock time when {@code taId} is {@code null}, the
+  *         attempt's finish time when the attempt is known to this task,
+  *         and 0 otherwise
+  */
+ private long getFinishTime(TezTaskAttemptID taId) {
+   if (taId == null) {
+     return clock.getTime();
+   }
+   // The attempts map is keyed by attempt id, so look the attempt up
+   // directly instead of scanning every value.
+   TaskAttempt attempt = attempts.get(taId);
+   return (attempt == null) ? 0 : attempt.getFinishTime();
+ }
+
+ private TaskStateInternal finished(TaskStateInternal finalState) {
+ if (getInternalState() == TaskStateInternal.RUNNING) {
+ // TODO Metrics
+ //metrics.endRunningTask(this);
+ }
+ return finalState;
+ }
+
+ //select the nextAttemptNumber with best progress
+ // always called inside the Read Lock
+ private TaskAttempt selectBestAttempt() {
+ float progress = 0f;
+ TaskAttempt result = null;
+ for (TaskAttempt at : attempts.values()) {
+ switch (at.getState()) {
+
+ // ignore all failed task attempts
+ case FAILED:
+ case KILLED:
+ continue;
+ default:
+ }
+ if (result == null) {
+ result = at; //The first time around
+ }
+ // calculate the best progress
+ float attemptProgress = at.getProgress();
+ if (attemptProgress > progress) {
+ result = at;
+ progress = attemptProgress;
+ }
+ }
+ return result;
+ }
+
+ /**
+  * Decides whether the given attempt may commit its output.
+  *
+  * @param taskAttemptID the attempt asking for permission
+  * @return {@code true} only when a commit attempt has been chosen and it
+  *         equals {@code taskAttemptID}
+  */
+ @Override
+ public boolean canCommit(TezTaskAttemptID taskAttemptID) {
+   readLock.lock();
+   try {
+     if (commitAttempt == null) {
+       // Nobody has been selected to commit yet.
+       return false;
+     }
+     final boolean allowed = taskAttemptID.equals(commitAttempt);
+     LOG.info("Result of canCommit for " + taskAttemptID + ":" + allowed);
+     return allowed;
+   } finally {
+     readLock.unlock();
+   }
+ }
+
+ /**
+  * Tells whether attempts of this task have to wait after reporting their
+  * output as consumable.
+  *
+  * @return {@code true} when the configured module class name is that of
+  *         {@link InitialTaskWithInMemSort}; {@code false} otherwise,
+  *         including when no module class name was supplied
+  */
+ @Override
+ public boolean needsWaitAfterOutputConsumable() {
+   // Compare from the constant side so a null module class name yields
+   // false instead of a NullPointerException.
+   return InitialTaskWithInMemSort.class.getName().equals(mrxModuleClassName);
+ }
+
+
+ /**
+  * @return the id of the attempt recorded as having consumable output, or
+  *         {@code null} if none has been recorded; read under the read lock
+  */
+ @Override
+ public TezTaskAttemptID getOutputConsumableAttempt() {
+   readLock.lock();
+   try {
+     final TezTaskAttemptID current = outputConsumableAttempt;
+     return current;
+   } finally {
+     readLock.unlock();
+   }
+ }
+
+ TaskAttemptImpl createAttempt(int attemptNumber) {
+ // FIXME TODODAGAM - implement.
+ return new TaskAttemptImpl(getTaskId(), attemptNumber, eventHandler,
+ taskAttemptListener, null, 0, conf, committer,
+ jobToken, credentials, clock, taskHeartbeatHandler,
+ appContext, mrxModuleClassName, locationHint, taskResource,
+ localResources, environment, (failedAttempts>0));
+ }
+
+ /**
+  * @return the attempt registered under the successful attempt id, or
+  *         {@code null} when no attempt has succeeded; read under the read
+  *         lock
+  */
+ protected TaskAttempt getSuccessfulAttempt() {
+   readLock.lock();
+   try {
+     return (successfulAttempt == null) ? null : attempts.get(successfulAttempt);
+   } finally {
+     readLock.unlock();
+   }
+ }
+
+ // This is always called in the Write Lock
+ private void addAndScheduleAttempt() {
+ TaskAttempt attempt = createAttempt(attempts.size());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Created attempt " + attempt.getID());
+ }
+ switch (attempts.size()) {
+ case 0:
+ attempts = Collections.singletonMap(attempt.getID(), attempt);
+ break;
+
+ case 1:
+ Map<TezTaskAttemptID, TaskAttempt> newAttempts
+ = new LinkedHashMap<TezTaskAttemptID, TaskAttempt>(maxAttempts);
+ newAttempts.putAll(attempts);
+ attempts = newAttempts;
+ attempts.put(attempt.getID(), attempt);
+ break;
+
+ default:
+ attempts.put(attempt.getID(), attempt);
+ break;
+ }
+
+ // TODO: Recovery
+ /*
+ // Update nextATtemptNumber
+ if (taskAttemptsFromPreviousGeneration.isEmpty()) {
+ ++nextAttemptNumber;
+ } else {
+ // There are still some TaskAttempts from previous generation, use them
+ nextAttemptNumber =
+ taskAttemptsFromPreviousGeneration.remove(0).getAttemptId().getId();
+ }
+ */
+
+ ++numberUncompletedAttempts;
+ //schedule the nextAttemptNumber
+ // send event to DAG to assign priority and schedule the attempt with global
+ // picture in mind
+ eventHandler.handle(new DAGEventSchedulerUpdate(
+ DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, attempt));
+
+ }
+
+ /**
+  * Feeds a task event into the state machine while holding the write lock.
+  *
+  * An event that is not valid in the current state is logged and reported
+  * through {@link #internalError(TaskEventType)}; it is not rethrown. A
+  * state change caused by the event is logged.
+  *
+  * @param event the event to process
+  */
+ @Override
+ public void handle(TaskEvent event) {
+   LOG.info("DEBUG: Processing TaskEvent " + event.getTaskID() + " of type "
+       + event.getType() + " while in state " + getInternalState()
+       + ". Event: " + event);
+   if (LOG.isDebugEnabled()) {
+     LOG.debug("Processing TaskEvent " + event.getTaskID() + " of type "
+         + event.getType());
+   }
+   // Acquire the lock before entering the try block: if acquisition itself
+   // fails, the finally clause must not call unlock() on a lock this thread
+   // does not hold.
+   writeLock.lock();
+   try {
+     TaskStateInternal oldState = getInternalState();
+     try {
+       stateMachine.doTransition(event.getType(), event);
+     } catch (InvalidStateTransitonException e) {
+       LOG.error("Can't handle this event at current state for "
+           + this.taskId, e);
+       internalError(event.getType());
+     }
+     if (oldState != getInternalState()) {
+       LOG.info(taskId + " Task Transitioned from " + oldState + " to "
+           + getInternalState());
+     }
+   } finally {
+     writeLock.unlock();
+   }
+ }
+
+ protected void internalError(TaskEventType type) {
+ LOG.error("Invalid event " + type + " on Task " + this.taskId);
+ eventHandler.handle(new DAGEventDiagnosticsUpdate(
+ this.taskId.getVertexID().getDAGId(), "Invalid event " + type +
+ " on Task " + this.taskId));
+ eventHandler.handle(new DAGEvent(this.taskId.getVertexID().getDAGId(),
+ DAGEventType.INTERNAL_ERROR));
+ }
+
+ /**
+  * Raises a completion event for the given attempt towards the vertex.
+  *
+  * Nothing is sent when a SUCCEEDED event was already sent for the
+  * output-consumable attempt, when the attempt is unknown to this task, or
+  * when the attempt has no node HTTP address.
+  *
+  * @param attemptId id of the attempt that completed
+  * @param status completion status to report
+  */
+ private void sendTaskAttemptCompletionEvent(TezTaskAttemptID attemptId,
+     TezDependentTaskCompletionEvent.Status status) {
+   // An event may have been sent out during the OUTPUT_READY state itself.
+   // Make sure the same event is not being sent out again. Ids are compared
+   // with equals() because equal ids may be carried by distinct objects.
+   if (needsWaitAfterOutputConsumable()
+       && outputConsumableAttempt != null
+       && outputConsumableAttempt.equals(attemptId)
+       && status == TezDependentTaskCompletionEvent.Status.SUCCEEDED
+       && outputConsumableAttemptSuccessSent) {
+     return;
+   }
+
+   TaskAttempt attempt = attempts.get(attemptId);
+   if (attempt == null) {
+     LOG.warn("Not sending completion event for unknown attempt " + attemptId
+         + " on Task " + this.taskId);
+     return;
+   }
+
+   // raise the completion event only if the container is assigned
+   // to nextAttemptNumber
+   String nodeHttpAddress = attempt.getNodeHttpAddress();
+   if (nodeHttpAddress == null) {
+     return;
+   }
+
+   String scheme = (encryptedShuffle) ? "https://" : "http://";
+   // Host part of the node address combined with the attempt's shuffle port.
+   String url = scheme
+       + nodeHttpAddress.split(":")[0] + ":"
+       + attempt.getShufflePort();
+
+   int runTime = 0;
+   if (attempt.getFinishTime() != 0 && attempt.getLaunchTime() != 0) {
+     runTime = (int) (attempt.getFinishTime() - attempt.getLaunchTime());
+   }
+
+   TezDependentTaskCompletionEvent tce = new TezDependentTaskCompletionEvent(
+       -1, attemptId, status, url, runTime);
+
+   // raise the event to job so that it adds the completion event to its
+   // data structures
+   eventHandler.handle(new VertexEventTaskAttemptCompleted(tce));
+ }
+
+ /**
+  * Forwards an attempt completion to sendTaskAttemptCompletionEvent.
+  * Always called inside a transition, and therefore inside the write lock.
+  *
+  * @param attemptId id of the attempt that completed
+  * @param status completion status to report
+  */
+ private void handleTaskAttemptCompletion(TezTaskAttemptID attemptId,
+     TezDependentTaskCompletionEvent.Status status) {
+   sendTaskAttemptCompletionEvent(attemptId, status);
+ }
+
+ // TODO: Recovery
+ /*
+ private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task, TaskStateInternal taskState) {
+ TaskFinishedEvent tfe =
+ new TaskFinishedEvent(task.taskId,
+ task.successfulAttempt,
+ task.getFinishTime(task.successfulAttempt),
+ task.taskId.getTaskType(),
+ taskState.toString(),
+ task.getCounters());
+ return tfe;
+ }
+
+ private static TaskFailedEvent createTaskFailedEvent(TaskImpl task, List<String> diag, TaskStateInternal taskState, TezTaskAttemptID taId) {
+ StringBuilder errorSb = new StringBuilder();
+ if (diag != null) {
+ for (String d : diag) {
+ errorSb.append(", ").append(d);
+ }
+ }
+ TaskFailedEvent taskFailedEvent = new TaskFailedEvent(
+ TypeConverter.fromYarn(task.taskId),
+ // Hack since getFinishTime needs isFinished to be true and that doesn't happen till after the transition.
+ task.getFinishTime(taId),
+ TypeConverter.fromYarn(task.getType()),
+ errorSb.toString(),
+ taskState.toString(),
+ taId == null ? null : TypeConverter.fromYarn(taId));
+ return taskFailedEvent;
+ }
+ */
+
+ /**
+  * Forgets a previous success: clears both the successful attempt and the
+  * attempt that had been chosen to commit.
+  *
+  * @param task the task to reset
+  */
+ private static void unSucceed(TaskImpl task) {
+   task.successfulAttempt = null;
+   task.commitAttempt = null;
+ }
+
+ /**
+ * @return a String representation of the splits.
+ *
+ * Subclasses can override this method to provide their own representations
+ * of splits (if any).
+ *
+ */
+ protected String getSplitsAsString(){
+ return "";
+ }
+
+ // Placeholder for emitting the task-started history event. The method has
+ // no executable statements yet. The commented-out code below refers to a
+ // "task" variable that is not in scope in this instance method.
+ private void logJobHistoryTaskStartedEvent() {
+ //TODO: JobHistory
+ /*
+ TaskStartedEvent tse = new TaskStartedEvent(
+ TypeConverter.fromYarn(task.taskId), task.getLaunchTime(),
+ TypeConverter.fromYarn(task.taskId.getTaskType()),
+ task.getSplitsAsString());
+ task.eventHandler
+ .handle(new JobHistoryEvent(task.taskId.getJobId(), tse));
+ */
+ }
+
+ // Placeholder for emitting the task-finished history event. The method has
+ // no executable statements yet. The commented-out code below refers to a
+ // "task" variable that is not in scope in this instance method.
+ private void logJobHistoryTaskFinishedEvent() {
+ //TODO: JobHistory
+ /*
+ if (task.historyTaskStartGenerated) {
+ TaskFinishedEvent tfe = createTaskFinishedEvent(task,
+ TaskStateInternal.SUCCEEDED);
+ task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(),
+ tfe));
+ }
+ */
+ }
+
+ // Placeholder for emitting the task-failed history event. The method has
+ // no executable statements yet. The commented-out code below refers to
+ // "task" and "finalState" variables that are not in scope in this method.
+ private void logJobHistoryTaskFailedEvent() {
+ // TODO JobHistory
+ /*
+ TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, null,
+ finalState, null); // TODO JH verify failedAttempt null
+ task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(),
+ taskFailedEvent));
+
+ */
+ }
+
+ /**
+  * Handles the first scheduling request for a task: creates and schedules
+  * the initial attempt, records the scheduling time and marks the
+  * task-started history event as generated.
+  */
+ private static class InitialScheduleTransition
+     implements SingleArcTransition<TaskImpl, TaskEvent> {
+
+   @Override
+   public void transition(TaskImpl task, TaskEvent event) {
+     task.addAndScheduleAttempt();
+     // Taken after the attempt has been handed to the scheduler.
+     final long now = task.clock.getTime();
+     task.scheduledTime = now;
+     task.logJobHistoryTaskStartedEvent();
+     task.historyTaskStartGenerated = true;
+   }
+ }
+
+ /**
+  * Adds another attempt while one is already running. Currently this is
+  * done for speculation. In the future it may also be done for tasks that
+  * failed in a way that hints at application code problems, so that later
+  * failures can be taken in parallel and the job flushed quickly.
+  */
+ private static class RedundantScheduleTransition
+     implements SingleArcTransition<TaskImpl, TaskEvent> {
+
+   @Override
+   public void transition(TaskImpl task, TaskEvent event) {
+     final String notice = "Scheduling a redundant attempt for task " + task.taskId;
+     LOG.info(notice);
+     task.addAndScheduleAttempt();
+   }
+ }
+
+ /**
+  * Handles an attempt reporting that its output is consumable. The first
+  * attempt to report is recorded and announced with a SUCCEEDED completion
+  * event; any later attempt is sent a kill request.
+  */
+ private static class AttemptProcessingCompleteTransition implements
+     SingleArcTransition<TaskImpl, TaskEvent> {
+
+   @Override
+   public void transition(TaskImpl task, TaskEvent event) {
+     final TezTaskAttemptID attemptId =
+         ((TaskEventTAUpdate) event).getTaskAttemptID();
+
+     if (task.outputConsumableAttempt != null) {
+       // Another attempt already serves output: this one is asked to die.
+       LOG.info("DEBUG: TezTaskAttemptID: "
+           + attemptId
+           + " reporting OUTPUT_READY. Will be asked to die since another attempt "
+           + task.outputConsumableAttempt + " already has output ready");
+       task.eventHandler.handle(new TaskAttemptEventKillRequest(attemptId,
+           "Alternate attemptId already serving output"));
+       return;
+     }
+
+     // The event is sent before the attempt is recorded, so the duplicate
+     // check inside sendTaskAttemptCompletionEvent does not suppress it.
+     task.sendTaskAttemptCompletionEvent(attemptId,
+         TezDependentTaskCompletionEvent.Status.SUCCEEDED);
+     task.outputConsumableAttempt = attemptId;
+     task.outputConsumableAttemptSuccessSent = true;
+     LOG.info("DEBUG: TezTaskAttemptID: " + attemptId
+         + " set as the OUTPUT_READY attempt");
+   }
+ }
+
+ /**
+  * Handles an attempt that is ready to commit. The first such attempt
+  * becomes the commit attempt; any later one is sent a kill request.
+  */
+ private static class AttemptCommitPendingTransition implements
+     SingleArcTransition<TaskImpl, TaskEvent> {
+
+   @Override
+   public void transition(TaskImpl task, TaskEvent event) {
+     final TezTaskAttemptID candidate =
+         ((TaskEventTAUpdate) event).getTaskAttemptID();
+
+     if (task.commitAttempt != null) {
+       // Don't think this can be a pluggable decision, so simply raise an
+       // event for the TaskAttempt to delete its output.
+       LOG.info(task.commitAttempt
+           + " already given a go for committing the task output, so killing "
+           + candidate);
+       task.eventHandler.handle(new TaskAttemptEventKillRequest(candidate,
+           "Output being committed by alternate attemptId."));
+       return;
+     }
+
+     // TODO: validate attemptID
+     task.commitAttempt = candidate;
+     LOG.info(candidate + " given a go for committing the task output.");
+   }
+ }
+
+ /**
+  * Handles the success of an attempt: sends the completion event, updates
+  * the attempt counters, records the successful attempt, notifies the
+  * vertex that the task succeeded and asks every other unfinished attempt
+  * to die.
+  */
+ private static class AttemptSucceededTransition
+     implements SingleArcTransition<TaskImpl, TaskEvent> {
+   @Override
+   public void transition(TaskImpl task, TaskEvent event) {
+     TezTaskAttemptID succeededId =
+         ((TaskEventTAUpdate) event).getTaskAttemptID();
+     task.handleTaskAttemptCompletion(succeededId,
+         TezDependentTaskCompletionEvent.Status.SUCCEEDED);
+     task.finishedAttempts++;
+     --task.numberUncompletedAttempts;
+     task.successfulAttempt = succeededId;
+     task.eventHandler.handle(new VertexEventTaskCompleted(
+         task.taskId, TaskState.SUCCEEDED));
+     LOG.info("Task succeeded with attempt " + task.successfulAttempt);
+     if (task.historyTaskStartGenerated) {
+       task.logJobHistoryTaskFinishedEvent();
+     }
+
+     // issue kill to all other attempts
+     for (TaskAttempt attempt : task.attempts.values()) {
+       // Ids are compared with equals(): the id carried by the event need
+       // not be the same object as the one held by the attempt.
+       if (!attempt.getID().equals(task.successfulAttempt) &&
+           // This is okay because it can only talk us out of sending a
+           // TA_KILL message to an attempt that doesn't need one for
+           // other reasons.
+           !attempt.isFinished()) {
+         LOG.info("Issuing kill to other attempt " + attempt.getID());
+         task.eventHandler.handle(new TaskAttemptEventKillRequest(attempt
+             .getID(), "Alternate attempt succeeded"));
+       }
+     }
+     task.finished(TaskStateInternal.SUCCEEDED);
+   }
+ }
+
+ private static class AttemptKilledTransition implements
+ SingleArcTransition<TaskImpl, TaskEvent> {
+ @Override
+ public void transition(TaskImpl task, TaskEvent event) {
+ task.handleTaskAttemptCompletion(
+ ((TaskEventTAUpdate) event).getTaskAttemptID(),
+ TezDependentTaskCompletionEvent.Status.KILLED);
+ task.finishedAttempts++;
+ --task.numberUncompletedAttempts;
+ if (task.successfulAttempt == null) {
+ task.addAndScheduleAttempt();
+ }
+ }
+ }
+
+
+ private static class KillWaitAttemptKilledTransition implements
+ MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
+
+ protected TaskStateInternal finalState = TaskStateInternal.KILLED;
+
+ @Override
+ public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
+ task.handleTaskAttemptCompletion(
+ ((TaskEventTAUpdate) event).getTaskAttemptID(),
+ TezDependentTaskCompletionEvent.Status.KILLED);
+ task.finishedAttempts++;
+ // check whether all attempts are finished
+ if (task.finishedAttempts == task.attempts.size()) {
+ if (task.historyTaskStartGenerated) {
+ task.logJobHistoryTaskFailedEvent();
+ } else {
+ LOG.debug("Not generating HistoryFinish event since start event not" +
+ " generated for task: " + task.getTaskId());
+ }
+
+ task.eventHandler.handle(
+ new VertexEventTaskCompleted(task.taskId, getExternalState(finalState)));
+ return finalState;
+ }
+ return task.getInternalState();
+ }
+ }
+
+ /**
+  * Transition run when one of the task's attempts fails.
+  *
+  * While the number of failed attempts is below {@code task.maxAttempts}
+  * the failure is reported as FAILED and, if no other attempt is
+  * outstanding and none has succeeded, a replacement attempt is scheduled;
+  * the state returned is {@link #getDefaultState(TaskImpl)}. Once the limit
+  * is reached the failure is reported as TIPFAILED, the vertex is notified
+  * and the task is finished as FAILED.
+  */
+ private static class AttemptFailedTransition implements
+     MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
+
+   @Override
+   public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
+     task.failedAttempts++;
+     TaskEventTAUpdate castEvent = (TaskEventTAUpdate) event;
+     TezTaskAttemptID failedId = castEvent.getTaskAttemptID();
+
+     // A failed attempt can no longer be the committer.
+     if (failedId.equals(task.commitAttempt)) {
+       task.commitAttempt = null;
+     }
+     // Nor can it remain the attempt whose output is being consumed.
+     if (failedId.equals(task.outputConsumableAttempt)) {
+       task.outputConsumableAttempt = null;
+       // NOTE(review): for this attempt handleTaskAttemptCompletion is
+       // invoked here and again below, so the completion is reported twice.
+       // Behaviour kept as-is; confirm that this is intended.
+       task.handleTaskAttemptCompletion(failedId,
+           TezDependentTaskCompletionEvent.Status.FAILED);
+     }
+
+     // The attempt would have informed the scheduler about its failure
+
+     task.finishedAttempts++;
+     if (task.failedAttempts < task.maxAttempts) {
+       task.handleTaskAttemptCompletion(failedId,
+           TezDependentTaskCompletionEvent.Status.FAILED);
+       // we don't need a new attempt if we already have a spare
+       if (--task.numberUncompletedAttempts == 0
+           && task.successfulAttempt == null) {
+         task.addAndScheduleAttempt();
+       }
+       return getDefaultState(task);
+     }
+
+     // Out of retries: the task as a whole has failed.
+     task.handleTaskAttemptCompletion(failedId,
+         TezDependentTaskCompletionEvent.Status.TIPFAILED);
+
+     if (task.historyTaskStartGenerated) {
+       task.logJobHistoryTaskFailedEvent();
+     } else {
+       LOG.debug("Not generating HistoryFinish event since start event not" +
+           " generated for task: " + task.getTaskId());
+     }
+     task.eventHandler.handle(
+         new VertexEventTaskCompleted(task.taskId, TaskState.FAILED));
+     return task.finished(TaskStateInternal.FAILED);
+   }
+
+   /**
+    * State to return when the task has not exhausted its attempts.
+    * Subclasses may override; this implementation keeps the current state.
+    */
+   protected TaskStateInternal getDefaultState(TaskImpl task) {
+     return task.getInternalState();
+   }
+ }
+
+ private static class MapRetroactiveFailureTransition
+ extends AttemptFailedTransition {
+
+ @Override
+ public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
+ if (event instanceof TaskEventTAUpdate) {
+ TaskEventTAUpdate castEvent = (TaskEventTAUpdate) event;
+ if (task.getInternalState() == TaskStateInternal.SUCCEEDED &&
+ !castEvent.getTaskAttemptID().equals(task.successfulAttempt)) {
+ // don't allow a different task attempt to override a previous
+ // succeeded state
+ return TaskStateInternal.SUCCEEDED;
+ }
+ }
+
+ if (task.leafVertex) {
+ LOG.error("Unexpected event for task of leaf vertex " + event.getType());
+ task.internalError(event.getType());
+ }
+
+ // tell the job about the rescheduling
+ task.eventHandler.handle(
+ new VertexEventTaskReschedule(task.taskId));
+ // super.transition is mostly coded for the case where an
+ // UNcompleted task failed. When a COMPLETED task retroactively
+ // fails, we have to let AttemptFailedTransition.transition
+ // believe that there's no redundancy.
+ unSucceed(task);
+ // fake increase in Uncomplete attempts for super.transition
+ ++task.numberUncompletedAttempts;
+ return super.transition(task, event);
+ }
+
+ @Override
+ protected TaskStateInternal getDefaultState(TaskImpl task) {
+ return TaskStateInternal.SCHEDULED;
+ }
+ }
+
+ private static class MapRetroactiveKilledTransition implements
+ MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
+
+ @Override
+ public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
+ // verify that this occurs only for map task
+ // TODO: consider moving it to MapTaskImpl
+ if (task.leafVertex) {
+ LOG.error("Unexpected event for task of leaf vertex " + event.getType());
+ task.internalError(event.getType());
+ }
+
+ TaskEventTAUpdate attemptEvent = (TaskEventTAUpdate) event;
+ TezTaskAttemptID attemptId = attemptEvent.getTaskAttemptID();
+ if(task.successfulAttempt == attemptId) {
+ // successful attempt is now killed. reschedule
+ // tell the job about the rescheduling
+ unSucceed(task);
+ task.handleTaskAttemptCompletion(
+ attemptId,
+ TezDependentTaskCompletionEvent.Status.KILLED);
+ task.eventHandler.handle(new VertexEventTaskReschedule(task.taskId));
+ // typically we are here because this map task was run on a bad node and
+ // we want to reschedule it on a different node.
+ // Depending on whether there are previous failed attempts or not this
+ // can SCHEDULE or RESCHEDULE the container allocate request. If this
+ // SCHEDULE's then the dataLocal hosts of this taskAttempt will be used
+ // from the map splitInfo. So the bad node might be sent as a location
+ // to the RM. But the RM would ignore that just like it would ignore
+ // currently pending container requests affinitized to bad nodes.
+ task.addAndScheduleAttempt();
+ return TaskStateInternal.SCHEDULED;
+ } else {
+ // nothing to do
+ return TaskStateInternal.SUCCEEDED;
+ }
+ }
+ }
+
+ private static class KillNewTransition
+ implements SingleArcTransition<TaskImpl, TaskEvent> {
+ @Override
+ public void transition(TaskImpl task, TaskEvent event) {
+
+ if (task.historyTaskStartGenerated) {
+ task.logJobHistoryTaskFailedEvent();
+ } else {
+ LOG.debug("Not generating HistoryFinish event since start event not" +
+ " generated for task: " + task.getTaskId());
+ }
+
+ task.eventHandler.handle(
+ new VertexEventTaskCompleted(task.taskId, TaskState.KILLED));
+ // TODO Metrics
+ //task.metrics.endWaitingTask(task);
+ }
+ }
+
+ /**
+  * Raises a kill request for the given attempt unless it is null or has
+  * already finished.
+  *
+  * @param attempt the attempt to kill; may be null, in which case nothing
+  *                happens
+  * @param logMsg  message passed along with the kill request
+  */
+ private void killUnfinishedAttempt(TaskAttempt attempt, String logMsg) {
+   if (attempt == null || attempt.isFinished()) {
+     return;
+   }
+   TaskAttemptEventKillRequest killRequest =
+       new TaskAttemptEventKillRequest(attempt.getID(), logMsg);
+   eventHandler.handle(killRequest);
+ }
+
+ /**
+  * Transition run when the task itself is asked to die: every attempt that
+  * has not finished is sent a kill request and the count of uncompleted
+  * attempts is reset to zero.
+  */
+ private static class KillTransition
+     implements SingleArcTransition<TaskImpl, TaskEvent> {
+   @Override
+   public void transition(TaskImpl task, TaskEvent event) {
+     final String reason = "Task KILL is received. Killing attempt!";
+
+     // killUnfinishedAttempt skips attempts that are already finished.
+     for (TaskAttempt candidate : task.attempts.values()) {
+       task.killUnfinishedAttempt(candidate, reason);
+     }
+
+     task.numberUncompletedAttempts = 0;
+   }
+ }
+
+ /**
+  * Transition run when the task is launched. Currently a no-op: the only
+  * intended work is metrics reporting, which is not wired up yet.
+  */
+ static class LaunchTransition
+     implements SingleArcTransition<TaskImpl, TaskEvent> {
+   @Override
+   public void transition(TaskImpl task, TaskEvent event) {
+     // TODO Metrics
+     // task.metrics.launchedTask(task);
+     // task.metrics.runningTask(task);
+   }
+ }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
------------------------------------------------------------------------------
svn:eol-style = native