You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/04/19 01:54:28 UTC
svn commit: r1469642 [13/36] - in /incubator/tez/branches/TEZ-1: ./
example_jobs/ example_jobs/sampleInput/ example_jobs/wc_mr_6m_1r/
example_jobs/wc_mrr_6m_3r_3r/ ljr_helper/ tez-common/ tez-common/src/
tez-common/src/main/ tez-common/src/main/java/ t...
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,1288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.impl;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TezMRTypeConverter;
+import org.apache.hadoop.mapred.WrappedProgressSplitsBlock;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
+import org.apache.hadoop.yarn.state.SingleArcTransition;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tez.common.TezEngineTask;
+import org.apache.tez.common.TezTask;
+import org.apache.tez.common.counters.DAGCounter;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.DAGConfiguration;
+import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.dag.api.client.impl.TezBuilderUtils;
+import org.apache.tez.dag.api.records.TaskAttemptReport;
+import org.apache.tez.dag.api.records.TaskAttemptState;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskHeartbeatHandler;
+import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.app.dag.TaskAttempt;
+import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.event.DAGEvent;
+import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate;
+import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate;
+import org.apache.tez.dag.app.dag.event.DAGEventType;
+import org.apache.tez.dag.app.dag.event.DiagnosableEvent;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminating;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventNodeFailed;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputConsumable;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
+import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
+import org.apache.tez.dag.app.dag.event.TaskEventType;
+import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptFetchFailure;
+import org.apache.tez.dag.app.rm.AMSchedulerEventTAEnded;
+import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest;
+import org.apache.tez.dag.app.speculate.SpeculatorEvent;
+import org.apache.tez.dag.app.taskclean.TaskCleanupEvent;
+import org.apache.tez.engine.common.security.JobTokenIdentifier;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.engine.records.TezTaskID;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class TaskAttemptImpl implements TaskAttempt,
+ EventHandler<TaskAttemptEvent> {
+
+  // TODO Ensure MAPREDUCE-4457 is factored in. Also MAPREDUCE-4068.
+  // TODO Consider TAL registration in the TaskAttempt instead of the container.
+
+  private static final Log LOG = LogFactory.getLog(TaskAttemptImpl.class);
+  // Separator used to join diagnostics into a single string for reports.
+  private static final String LINE_SEPARATOR = System
+      .getProperty("line.separator");
+
+  // Empty counters, package-visible. NOTE(review): this one mutable instance is
+  // shared by every attempt, so nothing that receives it may modify it.
+  static final TezCounters EMPTY_COUNTERS = new TezCounters();
+  private static final long MEMORY_SPLITS_RESOLUTION = 1024; //TODO Make configurable?
+
+  protected final DAGConfiguration conf;
+  protected final Path jobFile;
+  protected final int partition;
+  @SuppressWarnings("rawtypes")
+  protected EventHandler eventHandler;
+  private final TezTaskAttemptID attemptId;
+  private final Clock clock;
+  private final OutputCommitter committer;
+  // Diagnostic messages collected for this attempt; accessed under readLock/writeLock.
+  private final List<String> diagnostics = new ArrayList<String>();
+  // Read and write views of a single ReentrantReadWriteLock created in the constructor.
+  private final Lock readLock;
+  private final Lock writeLock;
+  protected final AppContext appContext;
+  private final TaskHeartbeatHandler taskHeartbeatHandler;
+  private Credentials credentials;
+  protected Token<JobTokenIdentifier> jobToken;
+  // Timestamps taken from clock; 0 means "not set". finishTime is only
+  // recorded once launchTime has been set.
+  private long launchTime = 0;
+  private long finishTime = 0;
+  private int shufflePort = -1;
+  // Node-manager host and HTTP port published in the attempt report.
+  private String trackerName;
+  private int httpPort;
+
+  // Container placement details; null until a container has been assigned.
+  // TODO Can these be replaced by the container object ?
+  private ContainerId containerId;
+  private NodeId containerNodeId;
+  private String nodeHttpAddress;
+  private String nodeRackName;
+
+  // Latest status reported for the task (progress, counters, phase times).
+  private TaskAttemptStatus reportedStatus;
+
+  protected final TaskLocationHint locationHint;
+  private final Resource taskResource;
+  private final Map<String, LocalResource> localResources;
+  private final Map<String, String> environment;
+
+  private final boolean isRescheduled;
+
+  private boolean speculatorContainerRequestSent = false;
+  protected String mrxModuleClassName;
+
+  // Shared singletons referenced by the static transition table below.
+  protected static final FailedTransitionHelper FAILED_HELPER =
+      new FailedTransitionHelper();
+
+  protected static final KilledTransitionHelper KILLED_HELPER =
+      new KilledTransitionHelper();
+
+  // final: these transitions are wired into the static state-machine table
+  // and must never be reassigned afterwards.
+  private static final SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
+      DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION =
+          new DiagnosticInformationUpdater();
+
+  private static final SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
+      STATUS_UPDATER = new StatusUpdaterTransition();
+
+ // Per-attempt state machine; built from stateMachineFactory in the constructor.
+ private final StateMachine<TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent> stateMachine;
+ // Transition table shared by all attempts. A (state, event) pair that is not
+ // listed here makes doTransition throw InvalidStateTransitonException, which
+ // handle() reports to the DAG as a diagnostic plus an INTERNAL_ERROR event.
+ private static StateMachineFactory
+ <TaskAttemptImpl, TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
+ stateMachineFactory
+ = new StateMachineFactory
+ <TaskAttemptImpl, TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
+ (TaskAttemptStateInternal.NEW)
+
+ // NEW: created, not yet scheduled. Fail/kill requests finish the attempt directly.
+ .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.START_WAIT, TaskAttemptEventType.TA_SCHEDULE, new ScheduleTaskattemptTransition())
+ .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.NEW, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+ .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_FAIL_REQUEST, new TerminateTransition(FAILED_HELPER))
+ .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_KILL_REQUEST, new TerminateTransition(KILLED_HELPER))
+
+ // START_WAIT: scheduled, waiting for TA_STARTED_REMOTELY. Failures move to a
+ // *_IN_PROGRESS state, except TA_CONTAINER_TERMINATED which fails directly.
+ .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.RUNNING, TaskAttemptEventType.TA_STARTED_REMOTELY, new StartedTransition())
+ .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.START_WAIT, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+ .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAIL_REQUEST, new TerminatedBeforeRunningTransition(FAILED_HELPER))
+ .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_KILL_REQUEST, new TerminatedBeforeRunningTransition(KILLED_HELPER))
+ .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_NODE_FAILED, new NodeFailedBeforeRunningTransition())
+ .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_CONTAINER_TERMINATING, new ContainerTerminatingBeforeRunningTransition())
+ .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedBeforeRunningTransition())
+
+ // RUNNING: started remotely. Failures/kills move to a *_IN_PROGRESS state
+ // until the container terminates; TA_CONTAINER_TERMINATED fails directly.
+ .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING, TaskAttemptEventType.TA_STATUS_UPDATE, STATUS_UPDATER)
+ .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+ .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, new OutputConsumableTransition()) //Optional, may not come in for all tasks.
+ .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptEventType.TA_COMMIT_PENDING, new CommitPendingTransition())
+ .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_DONE, new SucceededTransition())
+ .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAILED, new TerminatedWhileRunningTransition(FAILED_HELPER))
+ .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_TIMED_OUT, new TerminatedWhileRunningTransition(FAILED_HELPER))
+ .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAIL_REQUEST, new TerminatedWhileRunningTransition(FAILED_HELPER))
+ .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_KILL_REQUEST, new TerminatedWhileRunningTransition(KILLED_HELPER))
+ .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_NODE_FAILED, new TerminatedWhileRunningTransition(KILLED_HELPER))
+ .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_CONTAINER_TERMINATING, new TerminatedWhileRunningTransition(FAILED_HELPER))
+ .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedWhileRunningTransition())
+
+ // OUTPUT_CONSUMABLE: still running, after TA_OUTPUT_CONSUMABLE was received.
+ // NOTE(review): TA_CONTAINER_TERMINATED below uses
+ // ContainerCompletedBeforeRunningTransition, whereas RUNNING uses
+ // ContainerCompletedWhileRunningTransition. This state is only reachable from
+ // RUNNING in this table, so confirm the "before running" variant is intended.
+ .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_STATUS_UPDATE, STATUS_UPDATER)
+ .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+ .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE) // Stuck RPC. The client retries in a loop.
+ .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptEventType.TA_COMMIT_PENDING, new CommitPendingAtOutputConsumableTransition())
+ .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_DONE, new SucceededTransition())
+ .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAILED, new TerminatedWhileRunningTransition(FAILED_HELPER))
+ .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_TIMED_OUT, new TerminatedWhileRunningTransition(FAILED_HELPER))
+ .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAIL_REQUEST, new TerminatedWhileRunningTransition(FAILED_HELPER))
+ // TODO CREUSE Ensure TaskCompletionEvents are updated to reflect this. Something needs to go out to the job.
+ .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_KILL_REQUEST, new TerminatedWhileRunningTransition(KILLED_HELPER))
+ .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_NODE_FAILED, new TerminatedWhileRunningTransition(KILLED_HELPER))
+ .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_CONTAINER_TERMINATING, new TerminatedWhileRunningTransition(FAILED_HELPER))
+ .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedBeforeRunningTransition())
+ .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES, new TerminatedWhileRunningTransition(FAILED_HELPER))
+
+ // COMMIT_PENDING: entered on TA_COMMIT_PENDING; TA_DONE moves it to SUCCEEDED.
+ // NOTE(review): as with OUTPUT_CONSUMABLE above, TA_CONTAINER_TERMINATED uses
+ // the "before running" transition here although the attempt has run — confirm.
+ .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptEventType.TA_STATUS_UPDATE, STATUS_UPDATER)
+ .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+ .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptEventType.TA_COMMIT_PENDING)
+ .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_DONE, new SucceededTransition())
+ .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAILED, new TerminatedWhileRunningTransition(FAILED_HELPER))
+ .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_TIMED_OUT, new TerminatedWhileRunningTransition(FAILED_HELPER))
+ .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAIL_REQUEST, new TerminatedWhileRunningTransition(FAILED_HELPER))
+ .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_KILL_REQUEST, new TerminatedWhileRunningTransition(KILLED_HELPER))
+ .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_NODE_FAILED, new TerminatedWhileRunningTransition(KILLED_HELPER))
+ .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_CONTAINER_TERMINATING, new TerminatedWhileRunningTransition(FAILED_HELPER))
+ .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedBeforeRunningTransition())
+ .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES, new TerminatedWhileRunningTransition(FAILED_HELPER))
+
+ // KILL_IN_PROGRESS / FAIL_IN_PROGRESS: waiting for TA_CONTAINER_TERMINATED to
+ // reach the terminal state; every other task event is ignored meanwhile.
+ .addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedWhileTerminating())
+ .addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+ .addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptStateInternal.KILL_IN_PROGRESS, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES))
+
+ .addTransition(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedWhileTerminating())
+ .addTransition(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+ .addTransition(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.FAIL_IN_PROGRESS, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES))
+
+ // KILLED / FAILED: terminal; all events are accepted and ignored.
+ .addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+ .addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES))
+
+ .addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+ .addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES))
+
+ // SUCCEEDED: can still be killed (kill request, node failure) or failed (too
+ // many fetch failures) after success; a few late events are ignored.
+ // NOTE(review): TA_STATUS_UPDATE, TA_OUTPUT_CONSUMABLE, TA_COMMIT_PENDING and
+ // TA_DONE are not listed for SUCCEEDED, so a late/duplicate one is treated as
+ // an invalid transition (INTERNAL_ERROR) — confirm none can arrive after TA_DONE.
+ // How will duplicate history events be handled ?
+ // TODO Maybe consider not failing REDUCE tasks in this case. Also, MAP_TASKS in case there's only one phase in the job.
+ .addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+ .addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_KILL_REQUEST, new TerminatedAfterSuccessTransition(KILLED_HELPER))
+ .addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_NODE_FAILED, new TerminatedAfterSuccessTransition(KILLED_HELPER))
+ .addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES, new TerminatedAfterSuccessTransition(FAILED_HELPER))
+ .addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.SUCCEEDED, EnumSet.of(TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED))
+
+
+ .installTopology();
+
+
+  // TODO Remove TaskAttemptListener from the constructor.
+  /**
+   * Creates a new attempt, initially in the NEW state.
+   *
+   * @param taskId task this attempt belongs to
+   * @param attemptNumber combined with taskId to build this attempt's ID
+   * @param eventHandler sink for events emitted by this attempt
+   * @param tal not used by this constructor (see TODO above)
+   * @param isRescheduled reported back verbatim by getIsRescheduled()
+   */
+  @SuppressWarnings("rawtypes")
+  public TaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHandler,
+      TaskAttemptListener tal, Path jobFile, int partition,
+      DAGConfiguration conf, OutputCommitter committer,
+      Token<JobTokenIdentifier> jobToken, Credentials credentials, Clock clock,
+      TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
+      String mrxModuleClassName, TaskLocationHint locationHint,
+      Resource resource, Map<String, LocalResource> localResources,
+      Map<String, String> environment, boolean isRescheduled) {
+    // Both lock views are backed by one reentrant read/write lock.
+    ReentrantReadWriteLock attemptLock = new ReentrantReadWriteLock();
+    this.readLock = attemptLock.readLock();
+    this.writeLock = attemptLock.writeLock();
+
+    // Identity of this attempt.
+    this.attemptId = TezBuilderUtils.newTaskAttemptId(taskId, attemptNumber);
+    this.partition = partition;
+    this.isRescheduled = isRescheduled;
+    this.mrxModuleClassName = mrxModuleClassName;
+
+    // Collaborators and job-level configuration.
+    this.eventHandler = eventHandler;
+    this.appContext = appContext;
+    this.clock = clock;
+    this.taskHeartbeatHandler = taskHeartbeatHandler;
+    this.committer = committer;
+    this.conf = conf;
+    this.jobFile = jobFile;
+    this.jobToken = jobToken;
+    this.credentials = credentials;
+
+    // What the attempt needs in order to be launched.
+    this.taskResource = resource;
+    this.locationHint = locationHint;
+    this.localResources = localResources;
+    this.environment = environment;
+
+    // Reported status starts from a freshly initialised record.
+    this.reportedStatus = new TaskAttemptStatus();
+    initTaskAttemptStatus(reportedStatus);
+
+    RackResolver.init(conf);
+    this.stateMachine = stateMachineFactory.make(this);
+  }
+
+
+  /** @return the attempt ID assigned when this attempt was constructed. */
+  @Override
+  public TezTaskAttemptID getID() {
+    return this.attemptId;
+  }
+
+  /**
+   * Builds the task description that is shipped to the remote container. The
+   * owning vertex supplies the vertex name and the input/output specs.
+   */
+  TezTask createRemoteTask() {
+    // Resolve the vertex once: getTask() does a DAG -> vertex -> task lookup on
+    // every call, so reuse this local rather than walking it again for the name.
+    Vertex vertex = getTask().getVertex();
+
+    // FIXME user and jobname
+    return new TezEngineTask(getID(), "user", "jobname", vertex.getName(),
+        mrxModuleClassName, vertex.getInputSpecList(),
+        vertex.getOutputSpecList());
+  }
+
+  /**
+   * Builds a point-in-time report of this attempt while holding the read lock.
+   * The state comes from this attempt's own state machine, not from the
+   * task-reported status.
+   */
+  @Override
+  public TaskAttemptReport getReport() {
+    TaskAttemptReport report = Records.newRecord(TaskAttemptReport.class);
+    readLock.lock();
+    try {
+      final TaskAttemptStatus status = this.reportedStatus;
+      report.setTaskAttemptId(attemptId);
+      // Take the LOCAL state of the attempt; do NOT take it from the reported status.
+      report.setTaskAttemptState(getState());
+      report.setProgress(status.progress);
+      report.setStartTime(launchTime);
+      report.setFinishTime(finishTime);
+      report.setShuffleFinishTime(status.shuffleFinishTime);
+      report.setDiagnosticInfo(StringUtils.join(LINE_SEPARATOR, getDiagnostics()));
+      report.setStateString(status.stateString);
+      report.setCounters(getCounters());
+      report.setContainerId(getAssignedContainerID());
+      report.setNodeManagerHost(trackerName);
+      report.setNodeManagerHttpPort(httpPort);
+      // The node port is only known once a container has been assigned.
+      NodeId node = this.containerNodeId;
+      if (node != null) {
+        report.setNodeManagerPort(node.getPort());
+      }
+    } finally {
+      readLock.unlock();
+    }
+    return report;
+  }
+
+  /** @return a snapshot copy of the diagnostics collected so far. */
+  @Override
+  public List<String> getDiagnostics() {
+    readLock.lock();
+    try {
+      return new ArrayList<String>(diagnostics);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Returns the counters last reported for this attempt.
+   *
+   * @return the reported counters, or a new empty {@link TezCounters} when none
+   *     have been reported yet. A fresh instance is returned instead of the
+   *     shared static {@code EMPTY_COUNTERS} so that a caller that mutates the
+   *     result cannot corrupt the empty counters seen for every other attempt.
+   */
+  @Override
+  public TezCounters getCounters() {
+    readLock.lock();
+    try {
+      TezCounters counters = reportedStatus.counters;
+      return counters != null ? counters : new TezCounters();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /** @return the progress value from the most recently reported status. */
+  @Override
+  public float getProgress() {
+    float progress;
+    readLock.lock();
+    try {
+      progress = reportedStatus.progress;
+    } finally {
+      readLock.unlock();
+    }
+    return progress;
+  }
+
+ @Override
+ public TaskAttemptState getState() {
+ readLock.lock();
+ try {
+ return getExternalState(stateMachine.getCurrentState());
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+  /**
+   * Internal states counted as "finished". Hoisted to a constant so that
+   * isFinished() does not allocate a new EnumSet on every call.
+   */
+  private static final Set<TaskAttemptStateInternal> FINISHED_STATES =
+      EnumSet.of(TaskAttemptStateInternal.SUCCEEDED,
+          TaskAttemptStateInternal.FAILED,
+          TaskAttemptStateInternal.FAIL_IN_PROGRESS,
+          TaskAttemptStateInternal.KILLED,
+          TaskAttemptStateInternal.KILL_IN_PROGRESS);
+
+  /**
+   * @return true once the attempt has succeeded, failed or been killed,
+   *     including the FAIL_IN_PROGRESS / KILL_IN_PROGRESS states that are still
+   *     waiting for the container to terminate.
+   */
+  @Override
+  public boolean isFinished() {
+    readLock.lock();
+    try {
+      return FINISHED_STATES.contains(getInternalState());
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+ @Override
+ public ContainerId getAssignedContainerID() {
+ readLock.lock();
+ try {
+ return containerId;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+  /**
+   * @return the string form of the assigned container's NodeId, or null if no
+   *     container has been assigned yet (previously this threw a
+   *     NullPointerException in that case).
+   */
+  @Override
+  public String getAssignedContainerMgrAddress() {
+    readLock.lock();
+    try {
+      // containerNodeId is unset until a container is assigned.
+      return containerNodeId == null ? null : containerNodeId.toString();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+ @Override
+ public NodeId getNodeId() {
+ readLock.lock();
+ try {
+ return containerNodeId;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /**If container Assigned then return the node's address, otherwise null.
+ */
+ @Override
+ public String getNodeHttpAddress() {
+ readLock.lock();
+ try {
+ return nodeHttpAddress;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /**
+ * If container Assigned then return the node's rackname, otherwise null.
+ */
+ @Override
+ public String getNodeRackName() {
+ this.readLock.lock();
+ try {
+ return this.nodeRackName;
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+  /** @return the recorded launch time, or 0 if it has not been set. */
+  @Override
+  public long getLaunchTime() {
+    long launched;
+    readLock.lock();
+    try {
+      launched = this.launchTime;
+    } finally {
+      readLock.unlock();
+    }
+    return launched;
+  }
+
+  /** @return the recorded finish time, or 0 if it has not been set. */
+  @Override
+  public long getFinishTime() {
+    long finished;
+    readLock.lock();
+    try {
+      finished = this.finishTime;
+    } finally {
+      readLock.unlock();
+    }
+    return finished;
+  }
+
+  /** @return the shuffle finish time from the reported status. */
+  @Override
+  public long getInputReadyTime() {
+    long inputReady;
+    readLock.lock();
+    try {
+      inputReady = reportedStatus.shuffleFinishTime;
+    } finally {
+      readLock.unlock();
+    }
+    return inputReady;
+  }
+
+  /** @return the sort finish time from the reported status. */
+  @Override
+  public long getOutputReadyTime() {
+    long outputReady;
+    readLock.lock();
+    try {
+      outputReady = reportedStatus.sortFinishTime;
+    } finally {
+      readLock.unlock();
+    }
+    return outputReady;
+  }
+
+  /** @return the shuffle port, or -1 if it has not been set. */
+  @Override
+  public int getShufflePort() {
+    int port;
+    readLock.lock();
+    try {
+      port = this.shufflePort;
+    } finally {
+      readLock.unlock();
+    }
+    return port;
+  }
+
+ @Override
+ public Task getTask() {
+ return appContext.getDAG()
+ .getVertex(attemptId.getTaskID().getVertexID())
+ .getTask(attemptId.getTaskID());
+ }
+
+  /**
+   * Applies an event to this attempt's state machine while holding the write
+   * lock. If the event is not valid in the current state, a diagnostic and an
+   * INTERNAL_ERROR event are sent to the owning DAG instead.
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public void handle(TaskAttemptEvent event) {
+    writeLock.lock();
+    try {
+      final TaskAttemptStateInternal oldState = getInternalState();
+      // Per-event tracing stays at DEBUG: handle() runs for every event,
+      // including each TA_STATUS_UPDATE, so tracing at INFO floods the log.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Processing TaskAttemptEvent " + event.getTaskAttemptID()
+            + " of type " + event.getType() + " while in state "
+            + oldState + ". Event: " + event);
+      }
+      try {
+        stateMachine.doTransition(event.getType(), event);
+      } catch (InvalidStateTransitonException e) {
+        LOG.error("Can't handle this event at current state for "
+            + this.attemptId, e);
+        eventHandler.handle(new DAGEventDiagnosticsUpdate(
+            this.attemptId.getTaskID().getVertexID().getDAGId(),
+            "Invalid event " + event.getType() +
+            " on TaskAttempt " + this.attemptId));
+        eventHandler.handle(
+            new DAGEvent(
+                this.attemptId.getTaskID().getVertexID().getDAGId(),
+                DAGEventType.INTERNAL_ERROR)
+            );
+      }
+      final TaskAttemptStateInternal newState = getInternalState();
+      if (oldState != newState) {
+        LOG.info(attemptId + " TaskAttempt Transitioned from "
+            + oldState + " to " + newState);
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+ @VisibleForTesting
+ public TaskAttemptStateInternal getInternalState() {
+ readLock.lock();
+ try {
+ return stateMachine.getCurrentState();
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ private static TaskAttemptState getExternalState(
+ TaskAttemptStateInternal smState) {
+ switch (smState) {
+ case NEW:
+ case START_WAIT:
+ return TaskAttemptState.STARTING;
+ case RUNNING:
+ return TaskAttemptState.RUNNING;
+ case COMMIT_PENDING:
+ case OUTPUT_CONSUMABLE:
+ return TaskAttemptState.COMMIT_PENDING;
+ case FAILED:
+ case FAIL_IN_PROGRESS:
+ return TaskAttemptState.FAILED;
+ case KILLED:
+ case KILL_IN_PROGRESS:
+ return TaskAttemptState.KILLED;
+ case SUCCEEDED:
+ return TaskAttemptState.SUCCEEDED;
+ default:
+ throw new YarnException("Attempt to convert invalid "
+ + "stateMachineTaskAttemptState to externalTaskAttemptState: "
+ + smState);
+ }
+ }
+
+  /** @return the isRescheduled flag supplied at construction. */
+  @Override
+  public boolean getIsRescheduled() {
+    return this.isRescheduled;
+  }
+
+  /** Forwards an event to this attempt's (raw-typed) event handler. */
+  @SuppressWarnings("unchecked")
+  private void sendEvent(Event<?> event) {
+    eventHandler.handle(event);
+  }
+
+  /**
+   * Records the finish time from the clock, but only for attempts whose launch
+   * time has been set; otherwise finishTime is left untouched.
+   * Always called while holding the write lock.
+   */
+  private void setFinishTime() {
+    if (launchTime == 0) {
+      return;
+    }
+    finishTime = clock.getTime();
+  }
+
+  // TODO Merge some of these JobCounter events.
+  /** Builds a DAG counter update that increments TOTAL_LAUNCHED_TASKS by one. */
+  private static DAGEventCounterUpdate createJobCounterUpdateEventTALaunched(
+      TaskAttemptImpl ta) {
+    DAGEventCounterUpdate update = new DAGEventCounterUpdate(
+        ta.getID().getTaskID().getVertexID().getDAGId());
+    update.addCounterUpdate(DAGCounter.TOTAL_LAUNCHED_TASKS, 1);
+    return update;
+  }
+
+  /** Builds a DAG counter update adding this attempt's slot-milliseconds. */
+  private static DAGEventCounterUpdate createJobCounterUpdateEventSlotMillis(
+      TaskAttemptImpl ta) {
+    DAGEventCounterUpdate update = new DAGEventCounterUpdate(
+        ta.getID().getTaskID().getVertexID().getDAGId());
+    update.addCounterUpdate(DAGCounter.SLOTS_MILLIS_TASKS,
+        computeSlotMillis(ta));
+    return update;
+  }
+
+  /**
+   * Builds the DAG counter update for a terminated attempt: bumps
+   * NUM_FAILED_TASKS or NUM_KILLED_TASKS according to taState, and adds the
+   * attempt's slot-milliseconds unless the task was already completed (to
+   * avoid double counting the elapsed time).
+   */
+  private static DAGEventCounterUpdate createJobCounterUpdateEventTATerminated(
+      TaskAttemptImpl taskAttempt, boolean taskAlreadyCompleted,
+      TaskAttemptStateInternal taState) {
+    DAGEventCounterUpdate update = new DAGEventCounterUpdate(
+        taskAttempt.getID().getTaskID().getVertexID().getDAGId());
+
+    // Always computed, even when it ends up not being added below.
+    long slotMillis = computeSlotMillis(taskAttempt);
+
+    DAGCounter outcomeCounter = null;
+    if (taState == TaskAttemptStateInternal.FAILED) {
+      outcomeCounter = DAGCounter.NUM_FAILED_TASKS;
+    } else if (taState == TaskAttemptStateInternal.KILLED) {
+      outcomeCounter = DAGCounter.NUM_KILLED_TASKS;
+    }
+    if (outcomeCounter != null) {
+      update.addCounterUpdate(outcomeCounter, 1);
+    }
+
+    if (!taskAlreadyCompleted) {
+      update.addCounterUpdate(DAGCounter.SLOTS_MILLIS_TASKS, slotMillis);
+    }
+    return update;
+  }
+
+  /**
+   * Estimates the slot-milliseconds an attempt consumed. The estimate is the
+   * number of minimum-size container slots its memory request spans (rounded
+   * up) multiplied by {@code finishTime - launchTime}.
+   *
+   * @param taskAttempt the attempt to measure
+   * @return the slot-millis estimate, or 0 when the cluster reports a minimum
+   *     container memory of 0
+   */
+  private static long computeSlotMillis(TaskAttemptImpl taskAttempt) {
+    int requestedMemory = taskAttempt.taskResource.getMemory();
+    int minContainerMemory = taskAttempt.appContext.getClusterInfo()
+        .getMinContainerCapability().getMemory();
+
+    int slots = 0;
+    if (minContainerMemory != 0) {
+      slots = (int) Math.ceil((float) requestedMemory / minContainerMemory);
+    }
+
+    long elapsed = taskAttempt.getFinishTime() - taskAttempt.getLaunchTime();
+    return slots * elapsed;
+  }
+
+ // TODO: JobHistory
+ // TODO Change to return a JobHistoryEvent.
+ /*
+ private static
+ TaskAttemptUnsuccessfulCompletionEvent
+ createTaskAttemptUnsuccessfulCompletionEvent(TaskAttemptImpl taskAttempt,
+ TaskAttemptStateInternal attemptState) {
+ TaskAttemptUnsuccessfulCompletionEvent tauce =
+ new TaskAttemptUnsuccessfulCompletionEvent(
+ TypeConverter.fromYarn(taskAttempt.attemptId),
+ TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId()
+ .getTaskType()), attemptState.toString(),
+ taskAttempt.finishTime,
+ taskAttempt.containerNodeId == null ? "UNKNOWN"
+ : taskAttempt.containerNodeId.getHost(),
+ taskAttempt.containerNodeId == null ? -1
+ : taskAttempt.containerNodeId.getPort(),
+ taskAttempt.nodeRackName == null ? "UNKNOWN"
+ : taskAttempt.nodeRackName,
+ StringUtils.join(
+ LINE_SEPARATOR, taskAttempt.getDiagnostics()), taskAttempt
+ .getProgressSplitBlock().burst());
+ return tauce;
+ }
+
+ // TODO Incorporate MAPREDUCE-4838
+ private JobHistoryEvent createTaskAttemptStartedEvent() {
+ TaskAttemptStartedEvent tase = new TaskAttemptStartedEvent(
+ TypeConverter.fromYarn(attemptId), TypeConverter.fromYarn(taskId
+ .getTaskType()), launchTime, trackerName, httpPort, shufflePort,
+ containerId, "", "");
+ return new JobHistoryEvent(jobId, tase);
+
+ }
+ */
+
+ /**
+ * Returns the progress-splits block for this attempt.
+ *
+ * Currently a stub that always returns null, so updateProgressSplits()
+ * records nothing. The commented-out body below is the intended lazy
+ * initialisation.
+ * NOTE(review): that commented version assigns progressSplitBlock while
+ * holding only the read lock; use the write lock if it is restored.
+ */
+ private WrappedProgressSplitsBlock getProgressSplitBlock() {
+ return null;
+ // TODO Restore the lazily-initialised implementation below.
+ /*
+ readLock.lock();
+ try {
+ if (progressSplitBlock == null) {
+ progressSplitBlock = new WrappedProgressSplitsBlock(conf.getInt(
+ MRJobConfig.MR_AM_NUM_PROGRESS_SPLITS,
+ MRJobConfig.DEFAULT_MR_AM_NUM_PROGRESS_SPLITS));
+ }
+ return progressSplitBlock;
+ } finally {
+ readLock.unlock();
+ }
+ */
+ }
+
+  /**
+   * Feeds the latest reported progress and resource counters into the
+   * attempt's progress-splits block.
+   *
+   * <p>Does nothing when the report has no counters, when there is no splits
+   * block, or when the attempt has no launch time yet. Every sample is stored
+   * as an int, so a value that does not fit in an int is skipped rather than
+   * silently truncated.
+   */
+  private void updateProgressSplits() {
+    // Clamp progress into [0, 1].
+    double newProgress =
+        Math.max(Math.min(reportedStatus.progress, 1.0D), 0.0D);
+    TezCounters counters = reportedStatus.counters;
+    if (counters == null) {
+      return;
+    }
+
+    WrappedProgressSplitsBlock splitsBlock = getProgressSplitBlock();
+    if (splitsBlock == null) {
+      return;
+    }
+
+    long now = clock.getTime();
+    long start = getLaunchTime();
+    if (start == 0) {
+      // Not launched yet, so there is no wall-clock baseline.
+      return;
+    }
+
+    long elapsed = now - start;
+    if (elapsed <= Integer.MAX_VALUE) {
+      splitsBlock.getProgressWallclockTime().extend(newProgress, (int) elapsed);
+    }
+
+    TezCounter cpuCounter = counters.findCounter(TaskCounter.CPU_MILLISECONDS);
+    if (cpuCounter != null && cpuCounter.getValue() <= Integer.MAX_VALUE) {
+      splitsBlock.getProgressCPUTime().extend(newProgress,
+          (int) cpuCounter.getValue());
+    }
+
+    TezCounter virtualBytes =
+        counters.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES);
+    if (virtualBytes != null) {
+      long virtualKbytes =
+          (long) (virtualBytes.getValue() / (MEMORY_SPLITS_RESOLUTION));
+      // Same int-range guard as the CPU sample, instead of a lossy cast.
+      if (virtualKbytes <= Integer.MAX_VALUE) {
+        splitsBlock.getProgressVirtualMemoryKbytes().extend(newProgress,
+            (int) virtualKbytes);
+      }
+    }
+
+    TezCounter physicalBytes =
+        counters.findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES);
+    if (physicalBytes != null) {
+      long physicalKbytes =
+          (long) (physicalBytes.getValue() / (MEMORY_SPLITS_RESOLUTION));
+      if (physicalKbytes <= Integer.MAX_VALUE) {
+        splitsBlock.getProgressPhysicalMemoryKbytes().extend(newProgress,
+            (int) physicalKbytes);
+      }
+    }
+  }
+
+  /**
+   * Tells the speculator this attempt's task needs one more container. The
+   * request is sent at most once until it is withdrawn by
+   * maybeSendSpeculatorContainerNoLongerRequired().
+   */
+  private void maybeSendSpeculatorContainerRequired() {
+    if (speculatorContainerRequestSent) {
+      return;
+    }
+    sendEvent(new SpeculatorEvent(getID().getTaskID(), 1));
+    speculatorContainerRequestSent = true;
+  }
+
+  /**
+   * Withdraws an outstanding speculator container request, if one was sent,
+   * by sending a -1 adjustment for this attempt's task.
+   */
+  private void maybeSendSpeculatorContainerNoLongerRequired() {
+    if (!speculatorContainerRequestSent) {
+      return;
+    }
+    sendEvent(new SpeculatorEvent(getID().getTaskID(), -1));
+    speculatorContainerRequestSent = false;
+  }
+
+  /**
+   * Emits a TaskCleanupEvent for this attempt. The event carries the output
+   * committer and a TaskAttemptContext built from the attempt's configuration
+   * and its MR-converted attempt ID.
+   */
+  private void sendTaskAttemptCleanupEvent() {
+    TaskAttemptContext cleanupContext = new TaskAttemptContextImpl(conf,
+        TezMRTypeConverter.fromTez(attemptId));
+    sendEvent(new TaskCleanupEvent(attemptId, committer, cleanupContext));
+  }
+
+  /**
+   * Replaces IPv4-literal entries with resolved host names and copies other
+   * entries unchanged, by delegating to TaskAttemptImplHelpers. The method is
+   * protected so it can be overridden (presumably by tests).
+   *
+   * @param src host names and/or IP literals
+   * @return a new array with one entry per element of {@code src}
+   */
+  protected String[] resolveHosts(String[] src) {
+    String[] resolved = TaskAttemptImplHelpers.resolveHosts(src);
+    return resolved;
+  }
+
+  /**
+   * Placeholder for emitting the job-history "attempt started" event.
+   * Currently does nothing.
+   */
+  private void logJobHistoryAttemptStarted() {
+    // TODO JobHistory. Intended implementation (this is an instance method,
+    // so the event is sent on this attempt directly):
+    // sendEvent(createTaskAttemptStartedEvent());
+  }
+
+ /**
+ * Emits the job-history "attempt finished" event for the given terminal
+ * state.
+ *
+ * Returns immediately for an attempt that never launched (launch time 0).
+ * Emitting the event is still a TODO, so past that check this method
+ * currently does nothing.
+ *
+ * @param state the terminal internal state to record
+ */
+ private void logJobHistoryAttemptFinishedEvent(TaskAttemptStateInternal state) {
+ // Log finished events only if the attempt was launched.
+ if (getLaunchTime() == 0) return;
+
+ // TODO: JobHistory
+ // NOTE(review): getProgressSplitBlock() currently returns null, so the
+ // burst() calls below would throw NullPointerException if re-enabled as-is.
+ /*
+ if (attemptId.getTaskId().getTaskType() == TaskType.MAP) {
+ MapAttemptFinishedEvent mfe =
+ new MapAttemptFinishedEvent(TypeConverter.fromYarn(attemptId),
+ TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
+ state.toString(),
+ this.reportedStatus.mapFinishTime,
+ finishTime,
+ this.containerNodeId == null ? "UNKNOWN"
+ : this.containerNodeId.getHost(),
+ this.containerNodeId == null ? -1 : this.containerNodeId.getPort(),
+ this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
+ this.reportedStatus.stateString,
+ getCounters(),
+ getProgressSplitBlock().burst());
+ eventHandler.handle(
+ new JobHistoryEvent(attemptId.getTaskId().getJobId(), mfe));
+ } else {
+ ReduceAttemptFinishedEvent rfe =
+ new ReduceAttemptFinishedEvent(TypeConverter.fromYarn(attemptId),
+ TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
+ state.toString(),
+ this.reportedStatus.shuffleFinishTime,
+ this.reportedStatus.sortFinishTime,
+ finishTime,
+ this.containerNodeId == null ? "UNKNOWN"
+ : this.containerNodeId.getHost(),
+ this.containerNodeId == null ? -1 : this.containerNodeId.getPort(),
+ this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
+ this.reportedStatus.stateString,
+ getCounters(),
+ getProgressSplitBlock().burst());
+ eventHandler.handle(
+ new JobHistoryEvent(attemptId.getTaskId().getJobId(), rfe));
+ }
+ */
+ }
+
+  /**
+   * Placeholder for emitting the job-history "attempt unsuccessful
+   * completion" event. Currently does nothing.
+   *
+   * @param state the terminal internal state (unused until implemented)
+   */
+  private void logJobHistoryAttemptUnsuccesfulCompletion(
+      TaskAttemptStateInternal state) {
+    // TODO JobHistory. Intended implementation (this is an instance method,
+    // so "this" replaces the "ta" of the transition classes):
+    // sendEvent(new JobHistoryEvent(jobId,
+    //     createTaskAttemptUnsuccessfulCompletionEvent(this, state)));
+  }
+
+
+ //////////////////////////////////////////////////////////////////////////////
+ // Start of Transition Classes //
+ //////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Handles a schedule request for an attempt. It records one more container
+   * demand with the speculator, builds the remote task, and sends a launch
+   * request to the scheduler.
+   *
+   * <p>Host and rack preferences come from the attempt's location hint:
+   * explicit racks plus the racks of the data-local hosts. A rescheduled
+   * attempt asks for no locality at all.
+   */
+  protected static class ScheduleTaskattemptTransition implements
+      SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+
+    @Override
+    public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
+      TaskAttemptEventSchedule scheduleEvent = (TaskAttemptEventSchedule) event;
+      // Speculator bookkeeping: one more container is needed.
+      ta.maybeSendSpeculatorContainerRequired();
+
+      // TODO Creating the remote task here may not be required in case of
+      // recovery.
+      TezTask remoteTaskContext = ta.createRemoteTask();
+
+      String[] hosts = new String[0];
+      String[] racks = new String[0];
+      if (!ta.isRescheduled) {
+        Set<String> rackSet = new HashSet<String>();
+        if (ta.locationHint != null) {
+          String[] hintRacks = ta.locationHint.getRacks();
+          if (hintRacks != null) {
+            rackSet.addAll(Arrays.asList(hintRacks));
+          }
+          String[] dataLocalHosts = ta.locationHint.getDataLocalHosts();
+          if (dataLocalHosts != null) {
+            // The rack of every data-local host is also acceptable.
+            for (String dataLocalHost : dataLocalHosts) {
+              rackSet.add(
+                  RackResolver.resolve(dataLocalHost).getNetworkLocation());
+            }
+            hosts = ta.resolveHosts(dataLocalHosts);
+          }
+        }
+        racks = rackSet.toArray(new String[rackSet.size()]);
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Asking for container launch with taskAttemptContext: "
+            + remoteTaskContext);
+      }
+      ta.sendEvent(new AMSchedulerEventTALaunchRequest(ta.attemptId,
+          ta.taskResource, ta.localResources, remoteTaskContext, ta,
+          ta.credentials, ta.jobToken, hosts, racks,
+          scheduleEvent.getPriority(), ta.environment));
+    }
+  }
+
+  /**
+   * Appends the diagnostic text carried by a diagnostics-update event to the
+   * attempt's diagnostics.
+   */
+  protected static class DiagnosticInformationUpdater implements
+      SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+    @Override
+    public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
+      String diagnosticInfo =
+          ((TaskAttemptEventDiagnosticsUpdate) event).getDiagnosticInfo();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Diagnostics update for " + ta.attemptId + ": "
+            + diagnosticInfo);
+      }
+      ta.addDiagnosticInfo(diagnosticInfo);
+    }
+  }
+
+  /**
+   * Common handling for an attempt that ends in a terminal failed or killed
+   * state. The supplied helper provides the internal state used for counters
+   * and history, and the event type sent to the task.
+   *
+   * <p>The transition records the finish time and stores the event's
+   * diagnostics (once). It then sends the DAG counter update, logs a history
+   * event if the attempt had been launched, and notifies the owning task.
+   */
+  protected static class TerminateTransition implements
+      SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+
+    TerminatedTransitionHelper helper;
+
+    public TerminateTransition(TerminatedTransitionHelper helper) {
+      this.helper = helper;
+    }
+
+    @Override
+    public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
+      ta.setFinishTime();
+
+      // Record the event's diagnostics exactly once. A second append used to
+      // follow at the end of this method and duplicated every message.
+      if (event instanceof DiagnosableEvent) {
+        ta.addDiagnosticInfo(((DiagnosableEvent) event).getDiagnosticInfo());
+      }
+
+      ta.sendEvent(createJobCounterUpdateEventTATerminated(ta, false,
+          helper.getTaskAttemptStateInternal()));
+      if (ta.getLaunchTime() != 0) {
+        // TODO For cases like this, recovery goes for a toss, since the
+        // attempt will not exist in the history file.
+        ta.logJobHistoryAttemptUnsuccesfulCompletion(
+            helper.getTaskAttemptStateInternal());
+      } else if (LOG.isDebugEnabled()) {
+        LOG.debug("Not generating HistoryFinish event since start event not "
+            + "generated for taskAttempt: " + ta.getID());
+      }
+
+      // Tell the Task the attempt terminated (failed or killed).
+      ta.sendEvent(new TaskEventTAUpdate(ta.attemptId,
+          helper.getTaskEventType()));
+    }
+  }
+
+ /**
+ * Handles the "started remotely" event.
+ *
+ * Records where the attempt is running: container, node, rack, HTTP address
+ * and port, and shuffle port. Stamps the launch time, then sends the
+ * launched-task counter, history, speculator and task notifications. Finally
+ * registers the attempt with the task heartbeat handler.
+ */
+ protected static class StartedTransition implements
+ SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+ @Override
+ public void transition(TaskAttemptImpl ta, TaskAttemptEvent origEvent) {
+ TaskAttemptEventStartedRemotely event = (TaskAttemptEventStartedRemotely) origEvent;
+
+ // NOTE(review): if get() returns null for an unknown container id, the
+ // getContainer() call throws NullPointerException. Confirm the container
+ // is always registered with appContext before this event arrives.
+ Container container = ta.appContext.getAllContainers()
+ .get(event.getContainerId()).getContainer();
+
+ ta.containerId = event.getContainerId();
+ ta.containerNodeId = container.getNodeId();
+ ta.nodeHttpAddress = container.getNodeHttpAddress();
+ ta.nodeRackName = RackResolver.resolve(ta.containerNodeId.getHost())
+ .getNetworkLocation();
+
+ // A non-zero launchTime marks the attempt as launched. setFinishTime(),
+ // the history-logging helpers and TerminateTransition all check it.
+ ta.launchTime = ta.clock.getTime();
+ ta.shufflePort = event.getShufflePort();
+
+ // TODO Resolve to host / IP in case of a local address.
+ InetSocketAddress nodeHttpInetAddr = NetUtils
+ .createSocketAddr(ta.nodeHttpAddress); // TODO: Costly?
+ ta.trackerName = nodeHttpInetAddr.getHostName();
+ ta.httpPort = nodeHttpInetAddr.getPort();
+ ta.sendEvent(createJobCounterUpdateEventTALaunched(ta));
+
+ LOG.info("TaskAttempt: [" + ta.attemptId + "] started."
+ + " Is using containerId: [" + ta.containerId + "]" + " on NM: ["
+ + ta.containerNodeId + "]");
+
+ // JobHistoryEvent
+ ta.logJobHistoryAttemptStarted();
+
+ // Inform the speculator about the container assignment.
+ ta.maybeSendSpeculatorContainerNoLongerRequired();
+ // Inform speculator about startTime
+ ta.sendEvent(new SpeculatorEvent(ta.attemptId, true, ta.launchTime));
+
+ // Inform the Task
+ ta.sendEvent(new TaskEventTAUpdate(ta.attemptId,
+ TaskEventType.T_ATTEMPT_LAUNCHED));
+
+ // Start heartbeat tracking. The while-running and success transitions
+ // unregister the attempt again.
+ ta.taskHeartbeatHandler.register(ta.attemptId);
+ }
+ }
+
+  /**
+   * Termination handling that also releases scheduler-side state. After the
+   * common TerminateTransition steps, it tells the scheduler the attempt ended
+   * and withdraws any outstanding speculator container request. The
+   * while-running and after-success termination transitions extend it.
+   */
+  protected static class TerminatedBeforeRunningTransition extends
+      TerminateTransition {
+
+    public TerminatedBeforeRunningTransition(
+        TerminatedTransitionHelper helper) {
+      super(helper);
+    }
+
+    @Override
+    public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
+      super.transition(ta, event);
+      TaskAttemptState endState = helper.getTaskAttemptState();
+      ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.containerId, endState));
+      ta.maybeSendSpeculatorContainerNoLongerRequired();
+    }
+  }
+
+  /**
+   * Terminates an attempt whose node failed, using KILLED_HELPER, and then
+   * records the node-failure diagnostics.
+   */
+  protected static class NodeFailedBeforeRunningTransition extends
+      TerminatedBeforeRunningTransition {
+
+    public NodeFailedBeforeRunningTransition() {
+      super(KILLED_HELPER);
+    }
+
+    @Override
+    public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
+      super.transition(ta, event);
+      // NOTE(review): if TaskAttemptEventNodeFailed implements
+      // DiagnosableEvent, TerminateTransition has already stored this text.
+      // Confirm, to avoid a duplicate diagnostics entry.
+      String diagnostics =
+          ((TaskAttemptEventNodeFailed) event).getDiagnosticInfo();
+      ta.addDiagnosticInfo(diagnostics);
+    }
+  }
+
+  /**
+   * Terminates an attempt, using FAILED_HELPER, when its container starts
+   * terminating, and then records the container-terminating diagnostics.
+   */
+  protected static class ContainerTerminatingBeforeRunningTransition extends
+      TerminatedBeforeRunningTransition {
+
+    public ContainerTerminatingBeforeRunningTransition() {
+      super(FAILED_HELPER);
+    }
+
+    @Override
+    public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
+      super.transition(ta, event);
+      String diagnostics =
+          ((TaskAttemptEventContainerTerminating) event).getDiagnosticInfo();
+      ta.addDiagnosticInfo(diagnostics);
+    }
+  }
+
+  /**
+   * Terminates an attempt, using FAILED_HELPER, when its container completes.
+   * It then requests task-attempt cleanup and records the
+   * container-terminated diagnostics.
+   */
+  protected static class ContainerCompletedBeforeRunningTransition extends
+      TerminatedBeforeRunningTransition {
+    public ContainerCompletedBeforeRunningTransition() {
+      super(FAILED_HELPER);
+    }
+
+    @Override
+    public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
+      super.transition(ta, event);
+      ta.sendTaskAttemptCleanupEvent();
+      String diagnostics =
+          ((TaskAttemptEventContainerTerminated) event).getDiagnosticInfo();
+      ta.addDiagnosticInfo(diagnostics);
+    }
+  }
+
+ protected static class StatusUpdaterTransition implements
+ SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+ @Override
+ public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
+ TaskAttemptStatus newReportedStatus = ((TaskAttemptEventStatusUpdate) event)
+ .getReportedTaskAttemptStatus();
+ ta.reportedStatus = newReportedStatus;
+ ta.reportedStatus.taskState = ta.getState();
+
+ // Inform speculator of status.
+ ta.sendEvent(new SpeculatorEvent(ta.reportedStatus, ta.clock.getTime()));
+
+ ta.updateProgressSplits();
+
+ // Inform the job about fetch failures if they exist.
+ if (ta.reportedStatus.fetchFailedMaps != null
+ && ta.reportedStatus.fetchFailedMaps.size() > 0) {
+ ta.sendEvent(new VertexEventTaskAttemptFetchFailure(ta.attemptId,
+ ta.reportedStatus.fetchFailedMaps));
+ }
+ // TODO at some point. Nodes may be interested in FetchFailure info.
+ // Can be used to blacklist nodes.
+ }
+ }
+
+  /**
+   * Records the shuffle port from the event's output context, then sends the
+   * task a T_ATTEMPT_OUTPUT_CONSUMABLE update.
+   */
+  protected static class OutputConsumableTransition implements
+      SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+
+    @Override
+    public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
+      TaskAttemptEventOutputConsumable consumableEvent =
+          (TaskAttemptEventOutputConsumable) event;
+      ta.shufflePort = consumableEvent.getOutputContext().getShufflePort();
+      TaskEventTAUpdate consumableUpdate = new TaskEventTAUpdate(ta.attemptId,
+          TaskEventType.T_ATTEMPT_OUTPUT_CONSUMABLE);
+      ta.sendEvent(consumableUpdate);
+    }
+  }
+
+ protected static class CommitPendingTransition implements
+ SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+ @Override
+ public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
+ ta.sendEvent(new TaskEventTAUpdate(ta.attemptId,
+ TaskEventType.T_ATTEMPT_COMMIT_PENDING));
+ }
+ }
+
+ /**
+ * Handles successful completion of an attempt.
+ *
+ * Stamps the finish time, then notifies the speculator, job history, DAG
+ * counters (slot-millis), the scheduler and the task, in that order. Finally
+ * unregisters the attempt from the task heartbeat handler.
+ */
+ protected static class SucceededTransition implements
+ SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+ @Override
+ public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
+
+ ta.setFinishTime();
+ // Inform the speculator.
+ ta.sendEvent(new SpeculatorEvent(ta.reportedStatus, ta.finishTime));
+ // Send out history event.
+ ta.logJobHistoryAttemptFinishedEvent(TaskAttemptStateInternal.SUCCEEDED);
+ // Slot-millis use finishTime - launchTime, so this must follow
+ // setFinishTime() above.
+ ta.sendEvent(createJobCounterUpdateEventSlotMillis(ta));
+
+ // Inform the Scheduler.
+ ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.containerId,
+ TaskAttemptState.SUCCEEDED));
+
+ // Inform the task.
+ ta.sendEvent(new TaskEventTAUpdate(ta.attemptId,
+ TaskEventType.T_ATTEMPT_SUCCEEDED));
+
+ // Unregister from the TaskHeartbeatHandler.
+ ta.taskHeartbeatHandler.unregister(ta.attemptId);
+
+ // TODO maybe. For reuse ... Stacking pulls for a reduce task, even if the
+ // TA finishes independently. // Will likely be the Job's responsibility.
+
+ }
+ }
+
+ protected static class TerminatedWhileRunningTransition extends
+ TerminatedBeforeRunningTransition {
+
+ public TerminatedWhileRunningTransition(
+ TerminatedTransitionHelper helper) {
+ super(helper);
+ }
+
+ @Override
+ public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
+ super.transition(ta, event);
+ ta.taskHeartbeatHandler.unregister(ta.attemptId);
+ }
+ }
+
+  /**
+   * Fails an attempt, using FAILED_HELPER, whose container completed while the
+   * attempt was running, then requests task-attempt cleanup.
+   *
+   * <p>This class extends TerminatedWhileRunningTransition rather than the
+   * before-running variant. That way the attempt is also unregistered from the
+   * task heartbeat handler, with which StartedTransition registered it.
+   * Presumably the handler would otherwise keep monitoring an attempt whose
+   * container is gone. The class is still a TerminatedBeforeRunningTransition
+   * subtype.
+   */
+  protected static class ContainerCompletedWhileRunningTransition extends
+      TerminatedWhileRunningTransition {
+    public ContainerCompletedWhileRunningTransition() {
+      super(FAILED_HELPER);
+    }
+
+    @Override
+    public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
+      super.transition(ta, event);
+      ta.sendTaskAttemptCleanupEvent();
+    }
+  }
+
+ protected static class CommitPendingAtOutputConsumableTransition extends
+ CommitPendingTransition {
+
+ @Override
+ public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
+ // TODO Figure out the interaction between OUTPUT_CONSUMABLE AND
+ // COMMIT_PENDING, Ideally both should not exist for the same task.
+ super.transition(ta, event);
+ LOG.info("Received a commit pending while in the OutputConsumable state");
+ }
+ }
+
+  /**
+   * Handles container completion for an attempt that is already terminating.
+   * It requests task-attempt cleanup and records the container-terminated
+   * diagnostics.
+   */
+  protected static class ContainerCompletedWhileTerminating implements
+      SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+
+    @Override
+    public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
+      ta.sendTaskAttemptCleanupEvent();
+      String diagnostics =
+          ((TaskAttemptEventContainerTerminated) event).getDiagnosticInfo();
+      ta.addDiagnosticInfo(diagnostics);
+    }
+  }
+
+  /**
+   * Terminates an attempt that had already succeeded, using the supplied
+   * helper, then requests task-attempt cleanup.
+   */
+  protected static class TerminatedAfterSuccessTransition extends
+      TerminatedBeforeRunningTransition {
+
+    public TerminatedAfterSuccessTransition(TerminatedTransitionHelper helper) {
+      super(helper);
+    }
+
+    @Override
+    public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
+      super.transition(ta, event);
+      // Output from the earlier success needs to be cleaned up.
+      ta.sendTaskAttemptCleanupEvent();
+    }
+  }
+
+  /**
+   * Resets a status object to its initial values. Progress becomes 0, the
+   * state string becomes "NEW", the task state becomes NEW, and the counters
+   * are set to the shared EMPTY_COUNTERS instance.
+   *
+   * @param result the status object to initialise in place
+   */
+  private void initTaskAttemptStatus(TaskAttemptStatus result) {
+    result.taskState = TaskAttemptState.NEW;
+    result.stateString = "NEW";
+    result.progress = 0.0f;
+    result.counters = EMPTY_COUNTERS;
+  }
+
+  /**
+   * Appends a message to this attempt's diagnostics. Null and empty strings
+   * are ignored.
+   *
+   * @param diag the diagnostic message, may be null
+   */
+  private void addDiagnosticInfo(String diag) {
+    if (diag == null || diag.isEmpty()) {
+      return;
+    }
+    diagnostics.add(diag);
+  }
+
+  /**
+   * Supplies the state values a terminate transition reports: the internal
+   * terminal state, the external state sent to the scheduler, and the event
+   * type sent to the owning task.
+   */
+  protected interface TerminatedTransitionHelper {
+
+    /** @return the internal terminal state, used for counters and history */
+    TaskAttemptStateInternal getTaskAttemptStateInternal();
+
+    /** @return the external state reported to the scheduler */
+    TaskAttemptState getTaskAttemptState();
+
+    /** @return the event type sent to the owning task */
+    TaskEventType getTaskEventType();
+  }
+
+  /**
+   * TerminatedTransitionHelper for failures. It reports FAILED internally and
+   * externally, and sends T_ATTEMPT_FAILED to the task.
+   */
+  protected static class FailedTransitionHelper implements
+      TerminatedTransitionHelper {
+    // @Override was missing here. It is added so the compiler checks this
+    // method against the interface, like the other two methods.
+    @Override
+    public TaskAttemptStateInternal getTaskAttemptStateInternal() {
+      return TaskAttemptStateInternal.FAILED;
+    }
+
+    @Override
+    public TaskAttemptState getTaskAttemptState() {
+      return TaskAttemptState.FAILED;
+    }
+
+    @Override
+    public TaskEventType getTaskEventType() {
+      return TaskEventType.T_ATTEMPT_FAILED;
+    }
+  }
+
+  /**
+   * TerminatedTransitionHelper for kills. It reports KILLED internally and
+   * externally, and sends T_ATTEMPT_KILLED to the task.
+   */
+  protected static class KilledTransitionHelper implements
+      TerminatedTransitionHelper {
+
+    @Override
+    public TaskAttemptStateInternal getTaskAttemptStateInternal() {
+      final TaskAttemptStateInternal internalState =
+          TaskAttemptStateInternal.KILLED;
+      return internalState;
+    }
+
+    @Override
+    public TaskAttemptState getTaskAttemptState() {
+      final TaskAttemptState externalState = TaskAttemptState.KILLED;
+      return externalState;
+    }
+
+    @Override
+    public TaskEventType getTaskEventType() {
+      final TaskEventType eventType = TaskEventType.T_ATTEMPT_KILLED;
+      return eventType;
+    }
+  }
+
+ @Override
+ public String toString() {
+ return getID().toString();
+ }
+
+  /**
+   * Returns the local-resource map held by this attempt. This is the same map
+   * instance passed in the scheduler launch request, not a copy.
+   */
+  @Override
+  public Map<String, LocalResource> getLocalResources() {
+    return localResources;
+  }
+
+  /**
+   * Returns the environment map held by this attempt. This is the same map
+   * instance passed in the scheduler launch request, not a copy.
+   */
+  @Override
+  public Map<String, String> getEnvironment() {
+    return environment;
+  }
+}
\ No newline at end of file
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImplHelpers.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImplHelpers.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImplHelpers.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImplHelpers.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.impl;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Host-name helpers used when building locality requests for task attempts.
+ */
+public class TaskAttemptImplHelpers {
+
+  private static final Log LOG = LogFactory.getLog(TaskAttemptImplHelpers.class);
+
+  /** One dotted-quad octet: 0-255, optionally zero-padded to three digits. */
+  private static final String IPV4_OCTET = "(?:25[0-5]|2[0-4]\\d|[01]?\\d?\\d)";
+
+  /**
+   * Matches a dotted-quad IPv4 literal whose four octets are each 0-255. The
+   * previous pattern (\d{1,3} per octet) also accepted strings such as
+   * 999.999.999.999, which then triggered a pointless name lookup.
+   */
+  private static final Pattern IP_PATTERN = Pattern.compile(
+      IPV4_OCTET + "\\." + IPV4_OCTET + "\\." + IPV4_OCTET + "\\." + IPV4_OCTET);
+
+  /**
+   * Returns a copy of {@code src} in which every IPv4 literal is replaced by
+   * the result of {@link #resolveHost(String)}. All other entries are copied
+   * unchanged.
+   *
+   * @param src host names and/or IPv4 literals; must not be null
+   * @return a new array of the same length as {@code src}
+   */
+  static String[] resolveHosts(String[] src) {
+    String[] result = new String[src.length];
+    for (int i = 0; i < src.length; i++) {
+      result[i] = isIP(src[i]) ? resolveHost(src[i]) : src[i];
+    }
+    return result;
+  }
+
+  /**
+   * Looks up {@code src} with {@link InetAddress#getByName(String)} and
+   * returns {@link InetAddress#getHostName()}. On UnknownHostException it
+   * logs a warning and returns {@code src} unchanged.
+   *
+   * @param src an address or host name
+   * @return the resolved host name, or {@code src} if resolution fails
+   */
+  static String resolveHost(String src) {
+    try {
+      return InetAddress.getByName(src).getHostName();
+    } catch (UnknownHostException e) {
+      // Best effort: keep using the original string.
+      LOG.warn("Failed to resolve address: " + src
+          + ". Continuing to use the same.");
+      return src;
+    }
+  }
+
+  /**
+   * Reports whether {@code src} is a dotted-quad IPv4 literal with every
+   * octet in 0-255.
+   *
+   * @param src the string to test, may be null
+   * @return true for a valid IPv4 literal, false otherwise (including null)
+   */
+  static boolean isIP(String src) {
+    return src != null && IP_PATTERN.matcher(src).matches();
+  }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImplHelpers.java
------------------------------------------------------------------------------
svn:eol-style = native