You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ac...@apache.org on 2013/03/15 22:26:48 UTC
svn commit: r1457129 [22/38] - in /incubator/tez: ./ tez-ampool/
tez-ampool/src/ tez-ampool/src/main/ tez-ampool/src/main/bin/
tez-ampool/src/main/conf/ tez-ampool/src/main/java/
tez-ampool/src/main/java/org/ tez-ampool/src/main/java/org/apache/ tez-am...
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskAttemptImpl.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskAttemptImpl.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskAttemptImpl.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,1276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.job.impl;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.WrappedProgressSplitsBlock;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.MapAttemptFinishedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.ReduceAttemptFinishedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptStartedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app2.TaskHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttemptStateInternal;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventCounterUpdate;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventDiagnosticsUpdate;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventTaskAttemptFetchFailure;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventDiagnosticsUpdate;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventContainerTerminated;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventContainerTerminating;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventNodeFailed;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventOutputConsumable;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventStartedRemotely;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventSchedule;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventStatusUpdate;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEventTAUpdate;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEventTAEnded;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTALaunchRequestEvent;
+import org.apache.hadoop.mapreduce.v2.app2.speculate.SpeculatorEvent;
+import org.apache.hadoop.mapreduce.v2.app2.taskclean.TaskCleanupEvent;
+import org.apache.hadoop.mapreduce.v2.common.DiagnosableEvent;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
+import org.apache.hadoop.yarn.state.SingleArcTransition;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tez.mapreduce.task.impl.MRTaskContext;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Base implementation of a single task attempt. Drives the attempt's
+ * lifecycle through an internal {@link StateMachine} and reacts to
+ * {@link TaskAttemptEvent}s delivered via {@link #handle}.
+ */
+public abstract class TaskAttemptImpl implements TaskAttempt,
+ EventHandler<TaskAttemptEvent> {
+
+ // TODO Ensure MAPREDUCE-4457 is factored in. Also MAPREDUCE-4068.
+
+ // TODO Consider TAL registration in the TaskAttempt instead of the container.
+
+ private static final Log LOG = LogFactory.getLog(TaskAttemptImpl.class);
+ // Used to join diagnostic messages in getReport().
+ private static final String LINE_SEPARATOR = System
+ .getProperty("line.separator");
+
+ // Shared fallback returned by getCounters() before any status report arrives.
+ static final Counters EMPTY_COUNTERS = new Counters();
+ private static final long MEMORY_SPLITS_RESOLUTION = 1024; //TODO Make configurable?
+
+ // Immutable configuration captured at construction time.
+ protected final JobConf conf;
+ protected final Path jobFile;
+ protected final int partition;
+ @SuppressWarnings("rawtypes")
+ protected EventHandler eventHandler;
+ private final TaskAttemptId attemptId;
+ private final TaskId taskId;
+ private final JobId jobId;
+ private final Clock clock;
+// private final TaskAttemptListener taskAttemptListener;
+ private final OutputCommitter committer;
+ private final Resource resourceCapability;
+ private final String[] dataLocalHosts;
+ private final List<String> diagnostics = new ArrayList<String>();
+ // Read/write locks from a single ReentrantReadWriteLock created in the
+ // constructor; all getters take readLock, handle() takes writeLock.
+ private final Lock readLock;
+ private final Lock writeLock;
+ protected final AppContext appContext;
+ private final TaskHeartbeatHandler taskHeartbeatHandler;
+ private Credentials credentials;
+ protected Token<JobTokenIdentifier> jobToken;
+ // 0 means "not yet launched"; setFinishTime() only records a finish time
+ // once launchTime is non-zero.
+ private long launchTime = 0;
+ private long finishTime = 0;
+ private WrappedProgressSplitsBlock progressSplitBlock;
+ private int shufflePort = -1;
+ private String trackerName;
+ private int httpPort;
+
+
+ // TODO Can these be replaced by the container object ?
+ // Assigned-container details; null/unset until a container is assigned.
+ private ContainerId containerId;
+ private NodeId containerNodeId;
+ private String nodeHttpAddress;
+ private String nodeRackName;
+
+ // Last status reported by the remote task; initialized in the constructor.
+ private TaskAttemptStatus reportedStatus;
+
+ private boolean speculatorContainerRequestSent = false;
+ protected String tezModuleClassName;
+
+
+
+
+ // Shared, stateless transitions reused across many state-table entries.
+ private static SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
+ DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION =
+ new DiagnosticInformationUpdater();
+
+ private static SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
+ STATUS_UPDATER = new StatusUpdaterTransition();
+
+ // Per-instance state machine built from the shared factory below.
+ private final StateMachine
+ <TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent> stateMachine;
+
+ protected static final FailedTransitionHelper FAILED_HELPER =
+ new FailedTransitionHelper();
+
+ protected static final KilledTransitionHelper KILLED_HELPER =
+ new KilledTransitionHelper();
+
+ // Shared transition table for all TaskAttemptImpl instances. Start state
+ // is NEW; terminal external states are SUCCEEDED, FAILED and KILLED, with
+ // *_IN_PROGRESS states covering the window between a terminate request and
+ // the container actually terminating.
+ private static StateMachineFactory
+ <TaskAttemptImpl, TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
+ stateMachineFactory
+ = new StateMachineFactory
+ <TaskAttemptImpl, TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
+ (TaskAttemptStateInternal.NEW)
+
+ // Transitions from NEW: waiting to be scheduled.
+ .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.START_WAIT, TaskAttemptEventType.TA_SCHEDULE, new ScheduleTaskattemptTransition())
+ .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.NEW, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+ .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_FAIL_REQUEST, new TerminateTransition(FAILED_HELPER))
+ .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_KILL_REQUEST, new TerminateTransition(KILLED_HELPER))
+
+ // Transitions from START_WAIT: scheduled, waiting for remote start.
+ .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.RUNNING, TaskAttemptEventType.TA_STARTED_REMOTELY, new StartedTransition())
+ .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.START_WAIT, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+ .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAIL_REQUEST, new TerminatedBeforeRunningTransition(FAILED_HELPER))
+ .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_KILL_REQUEST, new TerminatedBeforeRunningTransition(KILLED_HELPER))
+ .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_NODE_FAILED, new NodeFailedBeforeRunningTransition())
+ .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_CONTAINER_TERMINATING, new ContainerTerminatingBeforeRunningTransition())
+ .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedBeforeRunningTransition())
+
+ // Transitions from RUNNING.
+ .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING, TaskAttemptEventType.TA_STATUS_UPDATE, STATUS_UPDATER)
+ .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+ .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, new OutputConsumableTransition()) //Optional, may not come in for all tasks.
+ .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptEventType.TA_COMMIT_PENDING, new CommitPendingTransition())
+ .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_DONE, new SucceededTransition())
+ .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAILED, new TerminatedWhileRunningTransition(FAILED_HELPER))
+ .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_TIMED_OUT, new TerminatedWhileRunningTransition(FAILED_HELPER))
+ .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAIL_REQUEST, new TerminatedWhileRunningTransition(FAILED_HELPER))
+ .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_KILL_REQUEST, new TerminatedWhileRunningTransition(KILLED_HELPER))
+ .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_NODE_FAILED, new TerminatedWhileRunningTransition(KILLED_HELPER))
+ .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_CONTAINER_TERMINATING, new TerminatedWhileRunningTransition(FAILED_HELPER))
+ // NOTE(review): "ContaienrCompleted..." is a misspelled class name declared
+ // elsewhere in this file; renaming requires touching its declaration too.
+ .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContaienrCompletedWhileRunningTransition())
+
+ // Transitions from OUTPUT_CONSUMABLE: output available, attempt not done.
+ .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_STATUS_UPDATE, STATUS_UPDATER)
+ .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+ .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE) // Stuck RPC. The client retries in a loop.
+ .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptEventType.TA_COMMIT_PENDING, new CommitPendingAtOutputConsumableTransition())
+ .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_DONE, new SucceededTransition())
+ .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAILED, new TerminatedWhileRunningTransition(FAILED_HELPER))
+ .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_TIMED_OUT, new TerminatedWhileRunningTransition(FAILED_HELPER))
+ .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAIL_REQUEST, new TerminatedWhileRunningTransition(FAILED_HELPER))
+ // TODO CREUSE Ensure TaskCompletionEvents are updated to reflect this. Something needs to go out to the job.
+ .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_KILL_REQUEST, new TerminatedWhileRunningTransition(KILLED_HELPER))
+ .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_NODE_FAILED, new TerminatedWhileRunningTransition(KILLED_HELPER))
+ .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_CONTAINER_TERMINATING, new TerminatedWhileRunningTransition(FAILED_HELPER))
+ .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedBeforeRunningTransition())
+ .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES, new TerminatedWhileRunningTransition(FAILED_HELPER))
+
+ // Transitions from COMMIT_PENDING.
+ .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptEventType.TA_STATUS_UPDATE, STATUS_UPDATER)
+ .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+ .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptEventType.TA_COMMIT_PENDING)
+ .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_DONE, new SucceededTransition())
+ .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAILED, new TerminatedWhileRunningTransition(FAILED_HELPER))
+ .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_TIMED_OUT, new TerminatedWhileRunningTransition(FAILED_HELPER))
+ .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAIL_REQUEST, new TerminatedWhileRunningTransition(FAILED_HELPER))
+ .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_KILL_REQUEST, new TerminatedWhileRunningTransition(KILLED_HELPER))
+ .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_NODE_FAILED, new TerminatedWhileRunningTransition(KILLED_HELPER))
+ .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_CONTAINER_TERMINATING, new TerminatedWhileRunningTransition(FAILED_HELPER))
+ .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedBeforeRunningTransition())
+ .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES, new TerminatedWhileRunningTransition(FAILED_HELPER))
+
+ // Transitions from KILL_IN_PROGRESS: only container termination advances;
+ // everything else is absorbed.
+ .addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedWhileTerminating())
+ .addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+ .addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptStateInternal.KILL_IN_PROGRESS, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES))
+
+ // Transitions from FAIL_IN_PROGRESS: mirror of KILL_IN_PROGRESS.
+ .addTransition(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedWhileTerminating())
+ .addTransition(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+ .addTransition(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.FAIL_IN_PROGRESS, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES))
+
+ // Terminal KILLED: absorb all late events.
+ .addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+ .addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES))
+
+ // Terminal FAILED: absorb all late events.
+ .addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+ .addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES))
+
+ // Transitions from SUCCEEDED: may still be killed/failed retroactively.
+ // How will duplicate history events be handled ?
+ // TODO Maybe consider not failing REDUCE tasks in this case. Also, MAP_TASKS in case there's only one phase in the job.
+ .addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+ .addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_KILL_REQUEST, new TerminatedAfterSuccessTransition(KILLED_HELPER))
+ .addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_NODE_FAILED, new TerminatedAfterSuccessTransition(KILLED_HELPER))
+ .addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES, new TerminatedAfterSuccessTransition(FAILED_HELPER))
+ .addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.SUCCEEDED, EnumSet.of(TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED))
+
+
+ .installTopology();
+
+
+ // TODO Remove TaskAttemptListener from the constructor.
+ /**
+ * Creates attempt number {@code i} of {@code taskId}.
+ *
+ * @param taskId task this attempt belongs to; supplies the job id.
+ * @param i attempt index, combined with taskId into the attempt id.
+ * @param eventHandler dispatcher for job/task/scheduler events.
+ * @param tal NOTE(review): unused in this constructor body (see TODO above).
+ * @param conf job configuration; also drives the resource request and
+ * RackResolver initialization.
+ * @param tezModuleClassName implementation module name stored for subclasses.
+ */
+ @SuppressWarnings("rawtypes")
+ public TaskAttemptImpl(TaskId taskId, int i, EventHandler eventHandler,
+ TaskAttemptListener tal, Path jobFile, int partition, JobConf conf,
+ String[] dataLocalHosts, OutputCommitter committer,
+ Token<JobTokenIdentifier> jobToken, Credentials credentials, Clock clock,
+ TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
+ String tezModuleClassName) {
+ // Single lock instance backs both the read and write views.
+ ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
+ this.readLock = rwLock.readLock();
+ this.writeLock = rwLock.writeLock();
+ this.taskId = taskId;
+ this.jobId = taskId.getJobId();
+ this.attemptId = MRBuilderUtils.newTaskAttemptId(taskId, i);
+ this.eventHandler = eventHandler;
+ //Reported status
+ this.jobFile = jobFile;
+ this.partition = partition;
+ this.conf = conf;
+ this.dataLocalHosts = dataLocalHosts;
+ this.committer = committer;
+ this.jobToken = jobToken;
+ this.credentials = credentials;
+ this.clock = clock;
+ this.taskHeartbeatHandler = taskHeartbeatHandler;
+ this.appContext = appContext;
+ // Resource request derived from per-task-type memory/vcore settings.
+ this.resourceCapability = BuilderUtils.newResource(getMemoryRequired(conf,
+ taskId.getTaskType()), getCpuRequired(conf, taskId.getTaskType()));
+ this.reportedStatus = new TaskAttemptStatus();
+ this.tezModuleClassName = tezModuleClassName;
+ initTaskAttemptStatus(reportedStatus);
+ RackResolver.init(conf);
+ // Built last so transitions never see a partially constructed attempt.
+ this.stateMachine = stateMachineFactory.make(this);
+ }
+
+
+
+
+ /** @return this attempt's immutable id; no lock needed (final field). */
+ @Override
+ public TaskAttemptId getID() {
+ return attemptId;
+ }
+
+ /** Subclasses supply the task context shipped to the remote task. */
+ protected abstract MRTaskContext createRemoteMRTaskContext();
+
+ /**
+  * Builds a point-in-time report of this attempt from locally held state
+  * (never from a raw remote status object), snapshotted under the read lock.
+  */
+ @Override
+ public TaskAttemptReport getReport() {
+   TaskAttemptReport report = Records.newRecord(TaskAttemptReport.class);
+   readLock.lock();
+   try {
+     report.setTaskAttemptId(attemptId);
+     // State comes from the local state machine, not reportedStatus.
+     report.setTaskAttemptState(getState());
+     report.setStartTime(launchTime);
+     report.setFinishTime(finishTime);
+     report.setShuffleFinishTime(this.reportedStatus.shuffleFinishTime);
+     report.setProgress(reportedStatus.progress);
+     report.setPhase(reportedStatus.phase);
+     report.setStateString(reportedStatus.stateString);
+     report.setDiagnosticInfo(StringUtils.join(LINE_SEPARATOR, getDiagnostics()));
+     report.setCounters(TypeConverter.toYarn(getCounters()));
+     report.setContainerId(this.getAssignedContainerID());
+     report.setNodeManagerHost(trackerName);
+     report.setNodeManagerHttpPort(httpPort);
+     // Node port is only known once a container has been assigned.
+     if (this.containerNodeId != null) {
+       report.setNodeManagerPort(this.containerNodeId.getPort());
+     }
+     return report;
+   } finally {
+     readLock.unlock();
+   }
+ }
+
+ /** @return a defensive copy of the accumulated diagnostic messages. */
+ @Override
+ public List<String> getDiagnostics() {
+   readLock.lock();
+   try {
+     // Copy under the lock so callers see a stable snapshot.
+     return new ArrayList<String>(diagnostics);
+   } finally {
+     readLock.unlock();
+   }
+ }
+
+ /**
+  * @return the most recently reported counters, or the shared empty
+  *         instance when no status report has arrived yet.
+  */
+ @Override
+ public Counters getCounters() {
+   readLock.lock();
+   try {
+     Counters reported = reportedStatus.counters;
+     return (reported != null) ? reported : EMPTY_COUNTERS;
+   } finally {
+     readLock.unlock();
+   }
+ }
+
+ @Override
+ public float getProgress() {
+ readLock.lock();
+ try {
+ return reportedStatus.progress;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public TaskAttemptState getState() {
+ readLock.lock();
+ try {
+ return getExternalState(stateMachine.getCurrentState());
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public boolean isFinished() {
+ readLock.lock();
+ try {
+ return (EnumSet.of(TaskAttemptStateInternal.SUCCEEDED,
+ TaskAttemptStateInternal.FAILED,
+ TaskAttemptStateInternal.FAIL_IN_PROGRESS,
+ TaskAttemptStateInternal.KILLED,
+ TaskAttemptStateInternal.KILL_IN_PROGRESS)
+ .contains(getInternalState()));
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public ContainerId getAssignedContainerID() {
+ readLock.lock();
+ try {
+ return containerId;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /**
+ * @return the node-manager address of the assigned container.
+ * NOTE(review): throws NullPointerException when no container has been
+ * assigned yet (containerNodeId is null until assignment), unlike the
+ * sibling getters which return null — confirm callers only invoke this
+ * after assignment.
+ */
+ @Override
+ public String getAssignedContainerMgrAddress() {
+ readLock.lock();
+ try {
+ return containerNodeId.toString();
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public NodeId getNodeId() {
+ readLock.lock();
+ try {
+ return containerNodeId;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /**If container Assigned then return the node's address, otherwise null.
+ */
+ @Override
+ public String getNodeHttpAddress() {
+ readLock.lock();
+ try {
+ return nodeHttpAddress;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /**
+ * If container Assigned then return the node's rackname, otherwise null.
+ */
+ @Override
+ public String getNodeRackName() {
+ this.readLock.lock();
+ try {
+ return this.nodeRackName;
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ @Override
+ public long getLaunchTime() {
+ readLock.lock();
+ try {
+ return launchTime;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public long getFinishTime() {
+ readLock.lock();
+ try {
+ return finishTime;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public long getShuffleFinishTime() {
+ readLock.lock();
+ try {
+ return this.reportedStatus.shuffleFinishTime;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public long getSortFinishTime() {
+ readLock.lock();
+ try {
+ return this.reportedStatus.sortFinishTime;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public int getShufflePort() {
+ readLock.lock();
+ try {
+ return shufflePort;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /**
+  * Routes an event through this attempt's state machine under the write
+  * lock. An invalid transition is not propagated to the caller: it is
+  * logged and reported to the job as an INTERNAL_ERROR. State changes are
+  * logged at INFO.
+  *
+  * Fix: the previous version unconditionally logged every event at INFO
+  * with a "DEBUG:" prefix, duplicating the debug-guarded log line and
+  * flooding the log; the per-event line is now a single guarded DEBUG log.
+  */
+ @SuppressWarnings("unchecked")
+ @Override
+ public void handle(TaskAttemptEvent event) {
+   if (LOG.isDebugEnabled()) {
+     LOG.debug("Processing " + event.getTaskAttemptID() + " of type "
+         + event.getType() + " while in state: " + getInternalState());
+   }
+   writeLock.lock();
+   try {
+     final TaskAttemptStateInternal oldState = getInternalState();
+     try {
+       stateMachine.doTransition(event.getType(), event);
+     } catch (InvalidStateTransitonException e) {
+       LOG.error("Can't handle this event at current state for "
+           + this.attemptId, e);
+       // Surface the bad transition to the job rather than crashing here.
+       eventHandler.handle(new JobEventDiagnosticsUpdate(
+           this.attemptId.getTaskId().getJobId(), "Invalid event " + event.getType() +
+           " on TaskAttempt " + this.attemptId));
+       eventHandler.handle(new JobEvent(this.attemptId.getTaskId().getJobId(),
+           JobEventType.INTERNAL_ERROR));
+     }
+     if (oldState != getInternalState()) {
+       LOG.info(attemptId + " TaskAttempt Transitioned from "
+           + oldState + " to "
+           + getInternalState());
+     }
+   } finally {
+     writeLock.unlock();
+   }
+ }
+
+ @VisibleForTesting
+ public TaskAttemptStateInternal getInternalState() {
+ readLock.lock();
+ try {
+ return stateMachine.getCurrentState();
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /**
+  * Maps an internal state to the externally reported one. In-progress
+  * terminations are reported as their eventual terminal state; an
+  * unmapped value is a programming error.
+  */
+ private static TaskAttemptState getExternalState(
+     TaskAttemptStateInternal smState) {
+   if (smState == TaskAttemptStateInternal.NEW
+       || smState == TaskAttemptStateInternal.START_WAIT) {
+     return TaskAttemptState.STARTING;
+   }
+   if (smState == TaskAttemptStateInternal.RUNNING) {
+     return TaskAttemptState.RUNNING;
+   }
+   if (smState == TaskAttemptStateInternal.COMMIT_PENDING
+       || smState == TaskAttemptStateInternal.OUTPUT_CONSUMABLE) {
+     return TaskAttemptState.COMMIT_PENDING;
+   }
+   if (smState == TaskAttemptStateInternal.FAILED
+       || smState == TaskAttemptStateInternal.FAIL_IN_PROGRESS) {
+     return TaskAttemptState.FAILED;
+   }
+   if (smState == TaskAttemptStateInternal.KILLED
+       || smState == TaskAttemptStateInternal.KILL_IN_PROGRESS) {
+     return TaskAttemptState.KILLED;
+   }
+   if (smState == TaskAttemptStateInternal.SUCCEEDED) {
+     return TaskAttemptState.SUCCEEDED;
+   }
+   throw new YarnException("Attempt to convert invalid "
+       + "stateMachineTaskAttemptState to externalTaskAttemptState: "
+       + smState);
+ }
+
+ /** Dispatches {@code event} on the application's shared event handler. */
+ @SuppressWarnings("unchecked")
+ private void sendEvent(Event<?> event) {
+   eventHandler.handle(event);
+ }
+
+ /**
+  * Container memory (MB) requested for the given task type, taken from
+  * the job configuration; any other task type gets a fixed 1024 MB.
+  */
+ private int getMemoryRequired(Configuration conf, TaskType taskType) {
+   switch (taskType) {
+     case MAP:
+       return conf.getInt(MRJobConfig.MAP_MEMORY_MB,
+           MRJobConfig.DEFAULT_MAP_MEMORY_MB);
+     case REDUCE:
+       return conf.getInt(MRJobConfig.REDUCE_MEMORY_MB,
+           MRJobConfig.DEFAULT_REDUCE_MEMORY_MB);
+     default:
+       return 1024;
+   }
+ }
+
+ /**
+  * Container vcores requested for the given task type, taken from the job
+  * configuration; any other task type gets a fixed 1 vcore.
+  */
+ private int getCpuRequired(Configuration conf, TaskType taskType) {
+   switch (taskType) {
+     case MAP:
+       return conf.getInt(MRJobConfig.MAP_CPU_VCORES,
+           MRJobConfig.DEFAULT_MAP_CPU_VCORES);
+     case REDUCE:
+       return conf.getInt(MRJobConfig.REDUCE_CPU_VCORES,
+           MRJobConfig.DEFAULT_REDUCE_CPU_VCORES);
+     default:
+       return 1;
+   }
+ }
+
+ // Always called while holding the write lock.
+ private void setFinishTime() {
+   if (launchTime == 0) {
+     // Never launched: leave finishTime at its default of 0.
+     return;
+   }
+   finishTime = clock.getTime();
+ }
+
+ // TODO Merge some of these JobCounter events.
+ /** Counter update crediting one launched map or reduce to the job. */
+ private static JobEventCounterUpdate createJobCounterUpdateEventTALaunched(
+     TaskAttemptImpl ta) {
+   JobCounter launched;
+   if (ta.taskId.getTaskType() == TaskType.MAP) {
+     launched = JobCounter.TOTAL_LAUNCHED_MAPS;
+   } else {
+     launched = JobCounter.TOTAL_LAUNCHED_REDUCES;
+   }
+   JobEventCounterUpdate jce = new JobEventCounterUpdate(ta.jobId);
+   jce.addCounterUpdate(launched, 1);
+   return jce;
+ }
+
+ private static JobEventCounterUpdate createJobCounterUpdateEventSlotMillis(
+ TaskAttemptImpl ta) {
+ JobEventCounterUpdate jce = new JobEventCounterUpdate(ta.jobId);
+ long slotMillis = computeSlotMillis(ta);
+ jce.addCounterUpdate(
+ ta.taskId.getTaskType() == TaskType.MAP ? JobCounter.SLOTS_MILLIS_MAPS
+ : JobCounter.SLOTS_MILLIS_REDUCES, slotMillis);
+ return jce;
+ }
+
+ private static JobEventCounterUpdate createJobCounterUpdateEventTATerminated(
+ TaskAttemptImpl taskAttempt, boolean taskAlreadyCompleted,
+ TaskAttemptStateInternal taState) {
+ TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
+ JobEventCounterUpdate jce = new JobEventCounterUpdate(taskAttempt.getID()
+ .getTaskId().getJobId());
+
+ long slotMillisIncrement = computeSlotMillis(taskAttempt);
+
+ if (taskType == TaskType.MAP) {
+ if (taState == TaskAttemptStateInternal.FAILED) {
+ jce.addCounterUpdate(JobCounter.NUM_FAILED_MAPS, 1);
+ } else if (taState == TaskAttemptStateInternal.KILLED) {
+ jce.addCounterUpdate(JobCounter.NUM_KILLED_MAPS, 1);
+ }
+ if (!taskAlreadyCompleted) {
+ // dont double count the elapsed time
+ jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_MAPS, slotMillisIncrement);
+ }
+ } else {
+ if (taState == TaskAttemptStateInternal.FAILED) {
+ jce.addCounterUpdate(JobCounter.NUM_FAILED_REDUCES, 1);
+ } else if (taState == TaskAttemptStateInternal.KILLED) {
+ jce.addCounterUpdate(JobCounter.NUM_KILLED_REDUCES, 1);
+ }
+ if (!taskAlreadyCompleted) {
+ // dont double count the elapsed time
+ jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_REDUCES,
+ slotMillisIncrement);
+ }
+ }
+ return jce;
+ }
+
+ private static long computeSlotMillis(TaskAttemptImpl taskAttempt) {
+ TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
+ int slotMemoryReq =
+ taskAttempt.getMemoryRequired(taskAttempt.conf, taskType);
+
+ int minSlotMemSize =
+ taskAttempt.appContext.getClusterInfo().getMinContainerCapability()
+ .getMemory();
+
+ int simSlotsRequired =
+ minSlotMemSize == 0 ? 0 : (int) Math.ceil((float) slotMemoryReq
+ / minSlotMemSize);
+
+ long slotMillisIncrement =
+ simSlotsRequired
+ * (taskAttempt.getFinishTime() - taskAttempt.getLaunchTime());
+ return slotMillisIncrement;
+ }
+
+ // TODO Change to return a JobHistoryEvent.
+ private static
+ TaskAttemptUnsuccessfulCompletionEvent
+ createTaskAttemptUnsuccessfulCompletionEvent(TaskAttemptImpl taskAttempt,
+ TaskAttemptStateInternal attemptState) {
+ TaskAttemptUnsuccessfulCompletionEvent tauce =
+ new TaskAttemptUnsuccessfulCompletionEvent(
+ TypeConverter.fromYarn(taskAttempt.attemptId),
+ TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId()
+ .getTaskType()), attemptState.toString(),
+ taskAttempt.finishTime,
+ taskAttempt.containerNodeId == null ? "UNKNOWN"
+ : taskAttempt.containerNodeId.getHost(),
+ taskAttempt.containerNodeId == null ? -1
+ : taskAttempt.containerNodeId.getPort(),
+ taskAttempt.nodeRackName == null ? "UNKNOWN"
+ : taskAttempt.nodeRackName,
+ StringUtils.join(
+ LINE_SEPARATOR, taskAttempt.getDiagnostics()), taskAttempt
+ .getProgressSplitBlock().burst());
+ return tauce;
+ }
+
  // TODO Incorporate MAPREDUCE-4838
  /**
   * Builds the job-history "attempt started" event from the attempt's
   * launch-time fields (trackerName, httpPort, shufflePort, containerId),
   * which are populated by StartedTransition before this event is sent.
   */
  private JobHistoryEvent createTaskAttemptStartedEvent() {
    TaskAttemptStartedEvent tase = new TaskAttemptStartedEvent(
        TypeConverter.fromYarn(attemptId), TypeConverter.fromYarn(taskId
            .getTaskType()), launchTime, trackerName, httpPort, shufflePort,
        containerId, "", "");
    return new JobHistoryEvent(jobId, tase);

  }
+
  /**
   * Lazily creates and returns the progress-splits block used to bucket
   * wallclock/CPU/memory progress for job history.
   * NOTE(review): the lazy initialization mutates state under the READ lock,
   * so two threads could race and each construct an instance — confirm
   * whether callers are expected to hold the write lock here.
   */
  private WrappedProgressSplitsBlock getProgressSplitBlock() {
    readLock.lock();
    try {
      if (progressSplitBlock == null) {
        progressSplitBlock = new WrappedProgressSplitsBlock(conf.getInt(
            MRJobConfig.MR_AM_NUM_PROGRESS_SPLITS,
            MRJobConfig.DEFAULT_MR_AM_NUM_PROGRESS_SPLITS));
      }
      return progressSplitBlock;
    } finally {
      readLock.unlock();
    }
  }
+
  /**
   * Folds the latest reported status into the progress-splits block:
   * wallclock time since launch plus the CPU / virtual-memory /
   * physical-memory counters, each recorded against the current progress
   * fraction. No-op if there are no counters yet or the attempt has not
   * launched.
   */
  private void updateProgressSplits() {
    double newProgress = reportedStatus.progress;
    // Clamp reported progress into [0, 1].
    newProgress = Math.max(Math.min(newProgress, 1.0D), 0.0D);
    Counters counters = reportedStatus.counters;
    if (counters == null)
      return;

    WrappedProgressSplitsBlock splitsBlock = getProgressSplitBlock();
    if (splitsBlock != null) {
      long now = clock.getTime();
      long start = getLaunchTime();

      // Nothing to record for an attempt that has not launched yet.
      if (start == 0)
        return;

      // NOTE(review): "start != 0" is redundant after the early return above.
      if (start != 0 && now - start <= Integer.MAX_VALUE) {
        splitsBlock.getProgressWallclockTime().extend(newProgress,
            (int) (now - start));
      }

      Counter cpuCounter = counters.findCounter(TaskCounter.CPU_MILLISECONDS);
      if (cpuCounter != null && cpuCounter.getValue() <= Integer.MAX_VALUE) {
        splitsBlock.getProgressCPUTime().extend(newProgress,
            (int) cpuCounter.getValue()); // long to int? TODO: FIX. Same below
      }

      // Memory counters are scaled down by MEMORY_SPLITS_RESOLUTION before
      // the narrowing cast to int.
      Counter virtualBytes = counters
          .findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES);
      if (virtualBytes != null) {
        splitsBlock.getProgressVirtualMemoryKbytes().extend(newProgress,
            (int) (virtualBytes.getValue() / (MEMORY_SPLITS_RESOLUTION)));
      }

      Counter physicalBytes = counters
          .findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES);
      if (physicalBytes != null) {
        splitsBlock.getProgressPhysicalMemoryKbytes().extend(newProgress,
            (int) (physicalBytes.getValue() / (MEMORY_SPLITS_RESOLUTION)));
      }
    }
  }
+
  /**
   * Emits the job-history "attempt finished" event (map or reduce flavor)
   * for an attempt that actually launched. Node host/port and rack fall back
   * to "UNKNOWN"/-1 when no container was ever assigned.
   */
  @SuppressWarnings({ "unchecked" })
  private void logAttemptFinishedEvent(TaskAttemptStateInternal state) {
    //Log finished events only if an attempt started.
    if (getLaunchTime() == 0) return;
    if (attemptId.getTaskId().getTaskType() == TaskType.MAP) {
      MapAttemptFinishedEvent mfe =
        new MapAttemptFinishedEvent(TypeConverter.fromYarn(attemptId),
        TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
        state.toString(),
        this.reportedStatus.mapFinishTime,
        finishTime,
        this.containerNodeId == null ? "UNKNOWN"
            : this.containerNodeId.getHost(),
        this.containerNodeId == null ? -1 : this.containerNodeId.getPort(),
        this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
        this.reportedStatus.stateString,
        getCounters(),
        getProgressSplitBlock().burst());
        eventHandler.handle(
          new JobHistoryEvent(attemptId.getTaskId().getJobId(), mfe));
    } else {
       // Reduce flavor additionally records shuffle and sort finish times.
       ReduceAttemptFinishedEvent rfe =
         new ReduceAttemptFinishedEvent(TypeConverter.fromYarn(attemptId),
         TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
         state.toString(),
         this.reportedStatus.shuffleFinishTime,
         this.reportedStatus.sortFinishTime,
         finishTime,
         this.containerNodeId == null ? "UNKNOWN"
             : this.containerNodeId.getHost(),
         this.containerNodeId == null ? -1 : this.containerNodeId.getPort(),
         this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
         this.reportedStatus.stateString,
         getCounters(),
         getProgressSplitBlock().burst());
         eventHandler.handle(
           new JobHistoryEvent(attemptId.getTaskId().getJobId(), rfe));
    }
  }
+
+ private void maybeSendSpeculatorContainerRequired() {
+ if (!speculatorContainerRequestSent) {
+ sendEvent(new SpeculatorEvent(taskId, +1));
+ speculatorContainerRequestSent = true;
+ }
+ }
+
+ private void maybeSendSpeculatorContainerNoLongerRequired() {
+ if (speculatorContainerRequestSent) {
+ sendEvent(new SpeculatorEvent(taskId, -1));
+ speculatorContainerRequestSent = false;
+ }
+ }
+
+ private void sendTaskAttemptCleanupEvent() {
+ TaskAttemptContext taContext = new TaskAttemptContextImpl(this.conf,
+ TypeConverter.fromYarn(this.attemptId));
+ sendEvent(new TaskCleanupEvent(this.attemptId, this.committer, taContext));
+ }
+
  /**
   * Resolves IP literals in {@code src} to host names; protected so
   * subclasses can substitute a different resolution strategy. Delegates to
   * {@link TaskAttemptImplHelpers#resolveHosts(String[])}.
   */
  protected String[] resolveHosts(String[] src) {
    return TaskAttemptImplHelpers.resolveHosts(src);
  }
+
+ //////////////////////////////////////////////////////////////////////////////
+ // Start of Transition Classes //
+ //////////////////////////////////////////////////////////////////////////////
+
  /**
   * Handles scheduling of an attempt: registers interest with the
   * speculator, builds the remote task context, computes host/rack locality
   * (dropped entirely for rescheduled attempts), and sends a launch request
   * to the scheduler.
   * NOTE(review): class name has a lowercase 'a' ("Taskattempt"); left
   * unchanged since the state-machine table elsewhere references it.
   */
  protected static class ScheduleTaskattemptTransition implements
      SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {

    @Override
    public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
      TaskAttemptEventSchedule scheduleEvent = (TaskAttemptEventSchedule) event;
      // Event to speculator - containerNeeded++
      ta.maybeSendSpeculatorContainerRequired();

      // TODO Creating the remote task here may not be required in case of
      // recovery.

      // Create the remote task.
      MRTaskContext remoteTaskContext = ta.createRemoteMRTaskContext();
      // Create startTaskRequest

      String[] hostArray;
      String[] rackArray;
      if (scheduleEvent.isRescheduled()) {
        // No node/rack locality.
        hostArray = new String[0];
        rackArray = new String[0];
      } else {
        // Ask for node / rack locality.
        // Racks are derived from the attempt's data-local hosts via the
        // RackResolver; duplicates collapse in the set.
        Set<String> racks = new HashSet<String>();
        for (String host : ta.dataLocalHosts) {
          racks.add(RackResolver.resolve(host).getNetworkLocation());
        }
        hostArray = ta.resolveHosts(ta.dataLocalHosts);
        rackArray = racks.toArray(new String[racks.size()]);
      }

      if (LOG.isDebugEnabled()) {
        LOG.debug("Asking for container launch with taskAttemptContext: "
            + remoteTaskContext);
      }
      // Send out a launch request to the scheduler.
      AMSchedulerTALaunchRequestEvent launchRequestEvent =
          new AMSchedulerTALaunchRequestEvent(
              ta.attemptId, scheduleEvent.isRescheduled(),
              ta.resourceCapability,
              remoteTaskContext, ta, ta.credentials, ta.jobToken, hostArray,
              rackArray);
      ta.sendEvent(launchRequestEvent);
    }
  }
+
+ protected static class DiagnosticInformationUpdater implements
+ SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+
+ @Override
+ public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
+ TaskAttemptEventDiagnosticsUpdate diagEvent =
+ (TaskAttemptEventDiagnosticsUpdate) event;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Diagnostics update for " + ta.attemptId + ": "
+ + diagEvent.getDiagnosticInfo());
+ }
+ ta.addDiagnosticInfo(diagEvent.getDiagnosticInfo());
+ }
+ }
+
+ protected static class TerminateTransition implements
+ SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+
+ TerminatedTransitionHelper helper;
+
+ public TerminateTransition(TerminatedTransitionHelper helper) {
+ this.helper = helper;
+ }
+
+ @Override
+ public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
+ ta.setFinishTime();
+ if (event instanceof DiagnosableEvent) {
+ ta.addDiagnosticInfo(((DiagnosableEvent) event).getDiagnosticInfo());
+ }
+
+ ta.sendEvent(createJobCounterUpdateEventTATerminated(ta, false,
+ helper.getTaskAttemptStateInternal()));
+ if (ta.getLaunchTime() != 0) {
+ // TODO For cases like this, recovery goes for a toss, since the the
+ // attempt will not exist in the history file.
+ ta.sendEvent(new JobHistoryEvent(ta.jobId,
+ createTaskAttemptUnsuccessfulCompletionEvent(ta,
+ helper.getTaskAttemptStateInternal())));
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Not generating HistoryFinish event since start event not "
+ +
+ "generated for taskAttempt: " + ta.getID());
+ }
+ }
+ // Send out events to the Task - indicating TaskAttemptTermination(F/K)
+ ta.sendEvent(new TaskEventTAUpdate(ta.attemptId,
+ helper.getTaskEventType()));
+ }
+ }
+
  /**
   * Handles the remote-start notification: records container/node/rack and
   * timing fields on the attempt, updates launch counters, emits the
   * job-history started event, settles speculator bookkeeping, notifies the
   * Task, and registers with the heartbeat handler.
   */
  protected static class StartedTransition implements
      SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {

    @Override
    public void transition(TaskAttemptImpl ta, TaskAttemptEvent origEvent) {
      TaskAttemptEventStartedRemotely event =
          (TaskAttemptEventStartedRemotely) origEvent;

      Container container = ta.appContext.getAllContainers()
          .get(event.getContainerId()).getContainer();

      // Record where and when the attempt actually started running.
      ta.containerId = event.getContainerId();
      ta.containerNodeId = container.getNodeId();
      ta.nodeHttpAddress = container.getNodeHttpAddress();
      ta.nodeRackName = RackResolver.resolve(ta.containerNodeId.getHost())
          .getNetworkLocation();
      ta.launchTime = ta.clock.getTime();
      ta.shufflePort = event.getShufflePort();

      // TODO Resolve to host / IP in case of a local address.
      InetSocketAddress nodeHttpInetAddr = NetUtils
          .createSocketAddr(ta.nodeHttpAddress); // TODO: Costly?
      ta.trackerName = nodeHttpInetAddr.getHostName();
      ta.httpPort = nodeHttpInetAddr.getPort();
      ta.sendEvent(createJobCounterUpdateEventTALaunched(ta));

      LOG.info("TaskAttempt: [" + ta.attemptId + "] started."
          + " Is using containerId: [" + ta.containerId + "]"
          + " on NM: [" + ta.containerNodeId + "]");

      // JobHistoryEvent
      ta.sendEvent(ta.createTaskAttemptStartedEvent());

      // Inform the speculator about the container assignment.
      ta.maybeSendSpeculatorContainerNoLongerRequired();
      // Inform speculator about startTime
      ta.sendEvent(new SpeculatorEvent(ta.attemptId, true, ta.launchTime));

      // Inform the Task
      ta.sendEvent(new TaskEventTAUpdate(ta.attemptId,
          TaskEventType.T_ATTEMPT_LAUNCHED));

      // Heartbeat tracking starts once the attempt is running remotely.
      ta.taskHeartbeatHandler.register(ta.attemptId);
    }
  }
+
  /**
   * Terminal handling for attempts that end before (or, via subclasses,
   * after) RUNNING: performs the common terminate bookkeeping, then tells
   * the scheduler the attempt ended and withdraws any outstanding speculator
   * container request.
   */
  protected static class TerminatedBeforeRunningTransition extends
      TerminateTransition {

    public TerminatedBeforeRunningTransition(
        TerminatedTransitionHelper helper) {
      super(helper);
    }

    @Override
    public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
      super.transition(ta, event);
      // Inform the scheduler
      ta.sendEvent(new AMSchedulerEventTAEnded(ta.attemptId,
          ta.containerId, helper.getTaskAttemptState()));
      // Decrement speculator container request.
      ta.maybeSendSpeculatorContainerNoLongerRequired();
    }
  }
+
  /**
   * Node failure before the attempt was running: treated as a KILL (the
   * attempt itself did not fail), with the node-failure diagnostics recorded
   * on the attempt.
   */
  protected static class NodeFailedBeforeRunningTransition extends
      TerminatedBeforeRunningTransition {

    public NodeFailedBeforeRunningTransition() {
      super(KILLED_HELPER);
    }

    @Override
    public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
      super.transition(ta, event);
      TaskAttemptEventNodeFailed nfEvent = (TaskAttemptEventNodeFailed) event;
      ta.addDiagnosticInfo(nfEvent.getDiagnosticInfo());
    }
  }
+
  /**
   * Container began terminating before the attempt was running: treated as a
   * FAILURE, with the container's diagnostics recorded on the attempt.
   */
  protected static class ContainerTerminatingBeforeRunningTransition extends
      TerminatedBeforeRunningTransition {

    public ContainerTerminatingBeforeRunningTransition() {
      super(FAILED_HELPER);
    }

    @Override
    public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
      super.transition(ta, event);
      TaskAttemptEventContainerTerminating tEvent =
          (TaskAttemptEventContainerTerminating) event;
      ta.addDiagnosticInfo(tEvent.getDiagnosticInfo());
    }
  }
+
  /**
   * Container completed before the attempt was running: treated as a
   * FAILURE; additionally triggers output cleanup and records the
   * container's diagnostics.
   */
  protected static class ContainerCompletedBeforeRunningTransition extends
      TerminatedBeforeRunningTransition {
    public ContainerCompletedBeforeRunningTransition() {
      super(FAILED_HELPER);
    }

    @Override
    public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
      super.transition(ta, event);
      // The container is gone, so any partial output must be cleaned up.
      ta.sendTaskAttemptCleanupEvent();

      TaskAttemptEventContainerTerminated tEvent =
          (TaskAttemptEventContainerTerminated) event;
      ta.addDiagnosticInfo(tEvent.getDiagnosticInfo());
    }
  }
+
+ protected static class StatusUpdaterTransition implements
+ SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+
+ @Override
+ public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
+ TaskAttemptStatus newReportedStatus =
+ ((TaskAttemptEventStatusUpdate) event)
+ .getReportedTaskAttemptStatus();
+ ta.reportedStatus = newReportedStatus;
+ ta.reportedStatus.taskState = ta.getState();
+
+ // Inform speculator of status.
+ ta.sendEvent(new SpeculatorEvent(ta.reportedStatus, ta.clock.getTime()));
+
+ ta.updateProgressSplits();
+
+ // Inform the job about fetch failures if they exist.
+ if (ta.reportedStatus.fetchFailedMaps != null
+ && ta.reportedStatus.fetchFailedMaps.size() > 0) {
+ ta.sendEvent(new JobEventTaskAttemptFetchFailure(ta.attemptId,
+ ta.reportedStatus.fetchFailedMaps));
+ }
+ // TODO at some point. Nodes may be interested in FetchFailure info.
+ // Can be used to blacklist nodes.
+ }
+ }
+
+ protected static class OutputConsumableTransition implements
+ SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+
+ @Override
+ public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
+ TaskAttemptEventOutputConsumable orEvent =
+ (TaskAttemptEventOutputConsumable) event;
+ ta.shufflePort = orEvent.getOutputContext().getShufflePort();
+ ta.sendEvent(new TaskEventTAUpdate(ta.attemptId,
+ TaskEventType.T_ATTEMPT_OUTPUT_CONSUMABLE));
+ }
+ }
+
  /**
   * Relays a commit-pending notification from the attempt to the owning
   * task as a T_ATTEMPT_COMMIT_PENDING event.
   */
  protected static class CommitPendingTransition implements
      SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {

    @Override
    public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
      ta.sendEvent(new TaskEventTAUpdate(ta.attemptId,
          TaskEventType.T_ATTEMPT_COMMIT_PENDING));
    }
  }
+
  /**
   * Handles successful completion: stamps the finish time, notifies the
   * speculator, writes the finished history event, charges slot-millis
   * counters, informs scheduler and task, and unregisters from heartbeat
   * tracking.
   */
  protected static class SucceededTransition implements
      SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
    @Override
    public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
      ta.setFinishTime();
      //Inform the speculator.
      ta.sendEvent(new SpeculatorEvent(ta.reportedStatus, ta.finishTime));

      // Send out history event.
      ta.logAttemptFinishedEvent(TaskAttemptStateInternal.SUCCEEDED);
      ta.sendEvent(createJobCounterUpdateEventSlotMillis(ta));

      // Inform the Scheduler.
      ta.sendEvent(new AMSchedulerEventTAEnded(ta.attemptId,
          ta.containerId, TaskAttemptState.SUCCEEDED));

      // Inform the task.
      ta.sendEvent(new TaskEventTAUpdate(ta.attemptId,
          TaskEventType.T_ATTEMPT_SUCCEEDED));

      //Unregister from the TaskHeartbeatHandler.
      ta.taskHeartbeatHandler.unregister(ta.attemptId);

      // TODO maybe. For reuse ... Stacking pulls for a reduce task, even if the
      // TA finishes independently. // Will likely be the Job's responsibility.
    }
  }
+
  /**
   * Termination of an attempt that had reached RUNNING: performs the shared
   * terminate bookkeeping, then also unregisters from the heartbeat handler
   * (registration happened in StartedTransition).
   */
  protected static class TerminatedWhileRunningTransition extends
      TerminatedBeforeRunningTransition {

    public TerminatedWhileRunningTransition(TerminatedTransitionHelper helper) {
      super(helper);
    }

    @Override
    public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
      super.transition(ta, event);
      ta.taskHeartbeatHandler.unregister(ta.attemptId);
    }
  }
+
  /**
   * Container completed while the attempt was running: treated as a FAILURE
   * and followed by output cleanup.
   * NOTE(review): class name misspells "Container" as "Contaienr"; left
   * as-is because the state-machine wiring elsewhere references this name.
   */
  protected static class ContaienrCompletedWhileRunningTransition extends
      TerminatedBeforeRunningTransition {

    public ContaienrCompletedWhileRunningTransition() {
      super(FAILED_HELPER);
    }

    @Override
    public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
      super.transition(ta, event);
      ta.sendTaskAttemptCleanupEvent();
    }
  }
+
  /**
   * Commit-pending received while in the OUTPUT_CONSUMABLE state: handled
   * like a normal commit-pending, with an extra log line since this
   * combination is unexpected (see TODO).
   */
  protected static class CommitPendingAtOutputConsumableTransition extends
      CommitPendingTransition {

    @Override
    public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
      // TODO Figure out the interaction between OUTPUT_CONSUMABLE AND
      // COMMIT_PENDING, Ideally both should not exist for the same task.
      super.transition(ta, event);
      LOG.info("Received a commit pending while in the OutputConsumable state");
    }
  }
+
+ protected static class ContainerCompletedWhileTerminating implements
+ SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+
+ @Override
+ public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
+ ta.sendTaskAttemptCleanupEvent();
+ TaskAttemptEventContainerTerminated tEvent =
+ (TaskAttemptEventContainerTerminated) event;
+ ta.addDiagnosticInfo(tEvent.getDiagnosticInfo());
+ }
+
+ }
+
  /**
   * Termination arriving after the attempt already succeeded: performs the
   * shared terminate bookkeeping and then cleans up the attempt's output.
   */
  protected static class TerminatedAfterSuccessTransition extends
      TerminatedBeforeRunningTransition {

    public TerminatedAfterSuccessTransition(TerminatedTransitionHelper helper) {
      super(helper);
    }

    @Override
    public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
      super.transition(ta, event);
      ta.sendTaskAttemptCleanupEvent();
    }
  }
+
+
+ private void initTaskAttemptStatus(TaskAttemptStatus result) {
+ result.progress = 0.0f;
+ result.phase = Phase.STARTING;
+ result.stateString = "NEW";
+ result.taskState = TaskAttemptState.NEW;
+ Counters counters = EMPTY_COUNTERS;
+ // counters.groups = new HashMap<String, CounterGroup>();
+ result.counters = counters;
+ }
+
+ private void addDiagnosticInfo(String diag) {
+ if (diag != null && !diag.equals("")) {
+ diagnostics.add(diag);
+ }
+ }
+
  /**
   * Strategy supplying the FAILED-vs-KILLED flavor of terminal states and
   * task-event types to the shared terminate transitions.
   */
  protected interface TerminatedTransitionHelper {

    // Internal terminal state (FAILED or KILLED).
    public TaskAttemptStateInternal getTaskAttemptStateInternal();

    // External terminal state reported to the scheduler.
    public TaskAttemptState getTaskAttemptState();

    // Event type sent to the owning Task (T_ATTEMPT_FAILED / _KILLED).
    public TaskEventType getTaskEventType();
  }
+
  /** FAILED flavor of {@link TerminatedTransitionHelper}. */
  protected static class FailedTransitionHelper implements
      TerminatedTransitionHelper {

    @Override
    public TaskAttemptStateInternal getTaskAttemptStateInternal() {
      return TaskAttemptStateInternal.FAILED;
    }

    @Override
    public TaskAttemptState getTaskAttemptState() {
      return TaskAttemptState.FAILED;
    }

    @Override
    public TaskEventType getTaskEventType() {
      return TaskEventType.T_ATTEMPT_FAILED;
    }
  }
+
+
+
  /** KILLED flavor of {@link TerminatedTransitionHelper}. */
  protected static class KilledTransitionHelper implements
      TerminatedTransitionHelper {

    @Override
    public TaskAttemptStateInternal getTaskAttemptStateInternal() {
      return TaskAttemptStateInternal.KILLED;
    }

    @Override
    public TaskAttemptState getTaskAttemptState() {
      return TaskAttemptState.KILLED;
    }

    @Override
    public TaskEventType getTaskEventType() {
      return TaskEventType.T_ATTEMPT_KILLED;
    }
  }
+
+
+
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskAttemptImplHelpers.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskAttemptImplHelpers.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskAttemptImplHelpers.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskAttemptImplHelpers.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.job.impl;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class TaskAttemptImplHelpers {
+
+ private static final Log LOG = LogFactory.getLog(TaskAttemptImplHelpers.class);
+
+ static String[] resolveHosts(String[] src) {
+ String[] result = new String[src.length];
+ for (int i = 0; i < src.length; i++) {
+ if (isIP(src[i])) {
+ result[i] = resolveHost(src[i]);
+ } else {
+ result[i] = src[i];
+ }
+ }
+ return result;
+ }
+
+ static String resolveHost(String src) {
+ String result = src; // Fallback in case of failure.
+ try {
+ InetAddress addr = InetAddress.getByName(src);
+ result = addr.getHostName();
+ } catch (UnknownHostException e) {
+ LOG.warn("Failed to resolve address: " + src
+ + ". Continuing to use the same.");
+ }
+ return result;
+ }
+
+ private static final Pattern ipPattern = // Pattern for matching ip
+ Pattern.compile("\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}");
+
+ static boolean isIP(String src) {
+ return ipPattern.matcher(src).matches();
+ }
+}