You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/04/19 01:54:28 UTC
svn commit: r1469642 [18/36] - in /incubator/tez/branches/TEZ-1: ./
example_jobs/ example_jobs/sampleInput/ example_jobs/wc_mr_6m_1r/
example_jobs/wc_mrr_6m_3r_3r/ ljr_helper/ tez-common/ tez-common/src/
tez-common/src/main/ tez-common/src/main/java/ t...
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventType.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventType.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventType.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventType.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,56 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.rm.container;
+
+/**
+ * Event types consumed by the AMContainer state machine (AMContainerImpl).
+ * Constants are grouped by the component that produces them, per the
+ * "Producer:" comments below.
+ *
+ * NOTE(review): declaration order fixes ordinal() values; prefer appending
+ * new constants over reordering unless it is confirmed nothing depends on
+ * ordinals.
+ */
+public enum AMContainerEventType {
+
+ //Producer: Scheduler
+ // C_LAUNCH_REQUEST moves an ALLOCATED container to LAUNCHING.
+ C_LAUNCH_REQUEST,
+ // C_ASSIGN_TA hands a task attempt to the container.
+ C_ASSIGN_TA,
+
+ //Producer: NMCommunicator
+ // Outcome of the start request sent to the NodeManager.
+ C_LAUNCHED,
+ C_LAUNCH_FAILED,
+
+ //Producer: TAL: PULL_TA is a sync call.
+ // (Dispatched from AMContainerImpl.pullTaskContext() while it holds the
+ // container's write lock, so the result is available when it returns.)
+ C_PULL_TA,
+
+ //Producer: Scheduler via TA
+ C_TA_SUCCEEDED, // maybe change this to C_TA_FINISHED with a status.
+
+ //Producer: RMCommunicator
+ // The container has finished; every non-terminal state moves to COMPLETED.
+ C_COMPLETED,
+
+ //Producer: RMCommunicator, AMNode
+ C_NODE_FAILED,
+
+ //TODO ZZZ CREUSE: Consider introducing a new event C_NODE_BLACKLISTED -> container can take a call on what to do if this event comes in.
+
+ //Producer: TA-> Scheduler -> Container (in case of failure etc)
+ // Scheduler -> Container (in case of pre-emption etc)
+ // Node -> Container (in case of Node blacklisted etc)
+ C_STOP_REQUEST,
+
+ //Producer: NMCommunicator
+ // Outcome of the stop request sent to the NodeManager.
+ C_NM_STOP_FAILED,
+ C_NM_STOP_SENT,
+
+ //Producer: ContainerHeartbeatHandler
+ // Heartbeat timeout; IDLE and RUNNING containers move to STOP_REQUESTED.
+ C_TIMED_OUT,
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventType.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,205 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.rm.container;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceChildJVM2;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.engine.common.security.JobTokenIdentifier;
+import org.apache.tez.engine.common.shuffle.server.ShuffleHandler;
+import org.apache.tez.engine.records.TezVertexID;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Static helpers used to build the {@link ContainerLaunchContext} that is
+ * handed to the NodeManager when an AM container is launched.
+ */
+public class AMContainerHelpers {
+
+  private static final Log LOG = LogFactory.getLog(AMContainerHelpers.class);
+
+  /** Guards lazy initialisation of {@link #commonContainerSpec}. */
+  private static final Object commonContainerSpecLock = new Object();
+
+  /**
+   * Launch-context fields shared by every container (user, container tokens,
+   * service data). It is built on the first call to
+   * {@link #createContainerLaunchContext} and reused for every later call.
+   *
+   * NOTE(review): the cache is process-wide and never invalidated. The job
+   * token, credentials and user name passed on later calls are therefore
+   * ignored in favour of those from the first call. Confirm this is intended.
+   */
+  private static ContainerLaunchContext commonContainerSpec = null;
+
+  /**
+   * Create a {@link LocalResource} record describing {@code file}, using the
+   * length and modification time reported by {@code fc}.
+   *
+   * @param fc file system that holds {@code file}
+   * @param file path of the resource; resolved through {@code fc} before use
+   * @param type resource type
+   * @param visibility resource visibility
+   * @return the new record
+   * @throws IOException if the file status cannot be read or the path cannot
+   *         be resolved
+   */
+  public static LocalResource createLocalResource(FileSystem fc, Path file,
+      LocalResourceType type, LocalResourceVisibility visibility)
+      throws IOException {
+    FileStatus fstat = fc.getFileStatus(file);
+    URL resourceURL =
+        ConverterUtils.getYarnUrlFromPath(fc.resolvePath(fstat.getPath()));
+    long resourceSize = fstat.getLen();
+    long resourceModificationTime = fstat.getModificationTime();
+
+    return BuilderUtils.newLocalResource(resourceURL, type, visibility,
+        resourceSize, resourceModificationTime);
+  }
+
+  /**
+   * Create the common {@link ContainerLaunchContext} shared by all attempts.
+   * Its local resources and environment are empty. The command is left null
+   * and filled in per container.
+   *
+   * @param applicationACLs ACLs recorded on the common context
+   * @param conf source of the user name ({@code MRJobConfig.USER_NAME})
+   * @param jobToken job token; stored in the credentials and used as the
+   *        shuffle service data
+   * @param appId currently unused
+   * @param vertexId currently unused
+   * @param credentials tokens and secret keys copied into the container
+   *        credentials when security is enabled
+   * @throws YarnException wrapping any IOException raised while serialising
+   *         the credentials or the shuffle service data
+   */
+  private static ContainerLaunchContext createCommonContainerLaunchContext(
+      Map<ApplicationAccessType, String> applicationACLs, Configuration conf,
+      Token<JobTokenIdentifier> jobToken,
+      ApplicationId appId, TezVertexID vertexId, Credentials credentials) {
+
+    // Application resources
+    Map<String, LocalResource> localResources =
+        new HashMap<String, LocalResource>();
+
+    // Application environment
+    Map<String, String> environment = new HashMap<String, String>();
+
+    // Service data
+    Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
+
+    // Tokens
+    ByteBuffer taskCredentialsBuffer = ByteBuffer.wrap(new byte[] {});
+    try {
+      // Setup up task credentials buffer
+      Credentials taskCredentials = new Credentials();
+
+      if (UserGroupInformation.isSecurityEnabled()) {
+        LOG.info("Adding #" + credentials.numberOfTokens() + " tokens and #"
+            + credentials.numberOfSecretKeys()
+            + " secret keys for NM use for launching container");
+        taskCredentials.addAll(credentials);
+      }
+
+      // LocalStorageToken is needed irrespective of whether security is enabled
+      // or not.
+      TokenCache.setJobToken(jobToken, taskCredentials);
+
+      DataOutputBuffer containerTokens_dob = new DataOutputBuffer();
+      // Note: this logs the number of tokens, not a byte size.
+      LOG.info("Size of containertokens_dob is "
+          + taskCredentials.numberOfTokens());
+      taskCredentials.writeTokenStorageToStream(containerTokens_dob);
+      taskCredentialsBuffer = ByteBuffer.wrap(containerTokens_dob.getData(), 0,
+          containerTokens_dob.getLength());
+
+      // Add shuffle token
+      LOG.info("Putting shuffle token in serviceData");
+      serviceData.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
+          ShuffleHandler.serializeServiceData(jobToken));
+
+    } catch (IOException e) {
+      throw new YarnException(e);
+    }
+
+    // Construct the actual Container
+    // The null fields are per-container and will be constructed for each
+    // container separately.
+    ContainerLaunchContext container = BuilderUtils.newContainerLaunchContext(
+        conf.get(MRJobConfig.USER_NAME), localResources,
+        environment, null, serviceData, taskCredentialsBuffer, applicationACLs);
+
+    return container;
+  }
+
+  /**
+   * Build the launch context for one container. The cached common spec is
+   * combined with this container's local resources, environment and launch
+   * command.
+   *
+   * @param applicationACLs ACLs set on the returned context (also used to
+   *        build the common spec on the first call)
+   * @param containerId id of the container being launched
+   * @param jobConf configuration used for the JVM environment and command
+   * @param vertexId vertex whose task will run in the container
+   * @param jobToken job token (only used when building the common spec)
+   * @param assignedCapability currently unused
+   * @param localResources per-vertex local resources; may be null. Entries
+   *        override common resources with the same key.
+   * @param vertexEnv per-vertex environment; may be null
+   * @param taskAttemptListener supplies the address passed to the child JVM
+   * @param credentials credentials (only used when building the common spec)
+   * @param shouldProfile whether the child JVM command enables profiling
+   * @return a new launch context owned by the caller
+   */
+  // FIXME does CLC need to work based off DAG id or App Id?
+  @VisibleForTesting
+  public static ContainerLaunchContext createContainerLaunchContext(
+      Map<ApplicationAccessType, String> applicationACLs,
+      ContainerId containerId, JobConf jobConf, TezVertexID vertexId,
+      Token<JobTokenIdentifier> jobToken,
+      Resource assignedCapability, Map<String, LocalResource> localResources,
+      Map<String, String> vertexEnv,
+      TaskAttemptListener taskAttemptListener, Credentials credentials,
+      boolean shouldProfile) {
+
+    ApplicationId appId = vertexId.getDAGId().getApplicationId();
+
+    // Capture the shared spec into a local while holding the lock. The rest of
+    // the method then works on the instance this thread initialised or observed.
+    final ContainerLaunchContext commonSpec;
+    synchronized (commonContainerSpecLock) {
+      if (commonContainerSpec == null) {
+        commonContainerSpec = createCommonContainerLaunchContext(
+            applicationACLs, jobConf, jobToken, appId, vertexId, credentials);
+      }
+      commonSpec = commonContainerSpec;
+    }
+
+    // Fill in the fields needed per-container that are missing in the common
+    // spec. Vertex resources are added last so they win on key collisions.
+    Map<String, LocalResource> lResources =
+        new TreeMap<String, LocalResource>();
+    lResources.putAll(commonSpec.getLocalResources());
+    if (localResources != null) {
+      lResources.putAll(localResources);
+    }
+
+    // Setup environment by cloning from common env.
+    // FIXME common env is empty
+    // MRChildJVM2.setEnv should become a no-op
+    Map<String, String> env = commonSpec.getEnvironment();
+    Map<String, String> myEnv = new HashMap<String, String>(env.size());
+    myEnv.putAll(env);
+    if (vertexEnv != null) {
+      myEnv.putAll(vertexEnv);
+    }
+    MapReduceChildJVM2.setVMEnv(myEnv, jobConf, vertexId);
+
+    // Set up the launch command
+    List<String> commands = MapReduceChildJVM2.getVMCommand(
+        taskAttemptListener.getAddress(), jobConf, vertexId, containerId,
+        appId, shouldProfile);
+
+    // Duplicate the ByteBuffers for access by multiple containers: duplicates
+    // share content but have independent position/limit.
+    Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();
+    for (Entry<String, ByteBuffer> entry : commonSpec.getServiceData()
+        .entrySet()) {
+      myServiceData.put(entry.getKey(), entry.getValue().duplicate());
+    }
+
+    // Construct the actual Container
+    ContainerLaunchContext container = BuilderUtils.newContainerLaunchContext(
+        commonSpec.getUser(), lResources, myEnv, commands,
+        myServiceData, commonSpec.getContainerTokens().duplicate(),
+        applicationACLs);
+
+    return container;
+  }
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,964 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.rm.container;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
+import org.apache.hadoop.yarn.state.MultipleArcTransition;
+import org.apache.hadoop.yarn.state.SingleArcTransition;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.tez.common.TezTask;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.ContainerHeartbeatHandler;
+import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.dag.event.DiagnosableEvent;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminating;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventNodeFailed;
+import org.apache.tez.dag.app.rm.AMSchedulerEventContainerCompleted;
+import org.apache.tez.dag.app.rm.AMSchedulerEventDeallocateContainer;
+import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
+import org.apache.tez.dag.app.rm.NMCommunicatorStopRequestEvent;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+
+@SuppressWarnings("rawtypes")
+public class AMContainerImpl implements AMContainer {
+
+ private static final Log LOG = LogFactory.getLog(AMContainerImpl.class);
+
+ // Read/write views of a single ReentrantReadWriteLock created in the
+ // constructor. handle() and pullTaskContext() take the write lock; the
+ // getters take the read lock.
+ private final ReadLock readLock;
+ private final WriteLock writeLock;
+ private final ContainerId containerId;
+ // Container to be used for getters on capability, locality etc.
+ private final Container container;
+ private final AppContext appContext;
+ private final ContainerHeartbeatHandler containerHeartbeatHandler;
+ private final TaskAttemptListener taskAttemptListener;
+ // The AppContext's event handler; used by sendEvent().
+ protected final EventHandler eventHandler;
+
+ // Returned (as a copy) by getCompletedTaskAttempts().
+ private final List<TezTaskAttemptID> completedAttempts = new LinkedList<TezTaskAttemptID>();
+
+ // TODO Maybe this should be pulled from the TaskAttempts.
+ // Task payloads keyed by attempt id; pullTaskContext() removes the entry
+ // for the attempt it hands out.
+ private final Map<TezTaskAttemptID, TezTask> remoteTaskMap =
+ new HashMap<TezTaskAttemptID, TezTask>();
+
+ // TODO ?? Convert to list and hash.
+
+ // Reported by getShufflePort().
+ private int shufflePort;
+ private long idleTimeBetweenTasks = 0;
+ private long lastTaskFinishTime;
+
+ // An assign can happen even during wind down. e.g. NodeFailure caused the
+ // wind down, and an allocation was pending in the AMScheduler. This could
+ // be modelled as a separate state.
+ private boolean nodeFailed = false;
+ private String nodeFailedMessage;
+
+ // Attempt queued on this container; exposed via getQueuedTaskAttempts().
+ private TezTaskAttemptID pendingAttempt;
+ // Exposed via getRunningTaskAttempt().
+ private TezTaskAttemptID runningAttempt;
+ private List<TezTaskAttemptID> failedAssignments;
+ // Attempt selected while handling C_PULL_TA; read and then cleared by
+ // pullTaskContext(). NOTE(review): presumably set by PullTAAtIdleTransition
+ // (not visible in this chunk) - confirm.
+ private TezTaskAttemptID pullAttempt;
+
+ // Returned by pullTaskContext() when no attempt was selected; initialised
+ // to WAIT_TASK in the constructor.
+ private AMContainerTask noAllocationContainerTask;
+
+ private static final AMContainerTask NO_MORE_TASKS = new AMContainerTask(
+ true, null);
+ private static final AMContainerTask WAIT_TASK = new AMContainerTask(false,
+ null);
+
+ // Set by handle() on an invalid transition, and by
+ // AssignTaskAttemptAtAllocatedTransition on an assignment before launch.
+ private boolean inError = false;
+
+ // Launch context; in the visible code it is set and then cleared within
+ // LaunchRequestTransition.
+ private ContainerLaunchContext clc;
+
+ // TODO Consider registering with the TAL, instead of the TAL pulling.
+ // Possibly after splitting TAL and ContainerListener.
+
+ // TODO What should be done with pendingAttempts. Nullify when handled ?
+ // Add them to failed ta list ? Some historic information should be maintained.
+
+ // TODO Create a generic ERROR state. Container tries informing relevant components in this case.
+
+
+ // Per-instance state machine, created from stateMachineFactory in the
+ // constructor.
+ private final StateMachine<AMContainerState, AMContainerEventType, AMContainerEvent> stateMachine;
+ // Transition table shared by all AMContainerImpl instances. Initial state:
+ // ALLOCATED. Every AMContainerEventType has an arc out of every state in the
+ // table below. Arcs registered without a hook object only change (or keep)
+ // the state and perform no other action.
+ private static final StateMachineFactory
+ <AMContainerImpl, AMContainerState, AMContainerEventType, AMContainerEvent>
+ stateMachineFactory =
+ new StateMachineFactory<AMContainerImpl, AMContainerState, AMContainerEventType, AMContainerEvent>(
+ AMContainerState.ALLOCATED)
+
+ // ALLOCATED: only C_LAUNCH_REQUEST keeps the container alive (-> LAUNCHING);
+ // every other event ends in COMPLETED.
+ .addTransition(AMContainerState.ALLOCATED, AMContainerState.LAUNCHING, AMContainerEventType.C_LAUNCH_REQUEST, new LaunchRequestTransition())
+ .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, AMContainerEventType.C_ASSIGN_TA, new AssignTaskAttemptAtAllocatedTransition())
+ .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtAllocatedTransition())
+ .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, AMContainerEventType.C_STOP_REQUEST, new StopRequestAtAllocatedTransition())
+ .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, AMContainerEventType.C_NODE_FAILED, new NodeFailedAtAllocatedTransition())
+ .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, EnumSet.of(AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_PULL_TA, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED, AMContainerEventType.C_TIMED_OUT), new ErrorTransition())
+
+ // LAUNCHING: start request sent to the NM; waiting for C_LAUNCHED or
+ // C_LAUNCH_FAILED. An assignment may be accepted while waiting.
+ .addTransition(AMContainerState.LAUNCHING, EnumSet.of(AMContainerState.LAUNCHING, AMContainerState.STOP_REQUESTED), AMContainerEventType.C_ASSIGN_TA, new AssignTaskAttemptTransition())
+ .addTransition(AMContainerState.LAUNCHING, AMContainerState.IDLE, AMContainerEventType.C_LAUNCHED, new LaunchedTransition())
+ .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOPPING, AMContainerEventType.C_LAUNCH_FAILED, new LaunchFailedTransition())
+ // TODO CREUSE : Maybe, consider sending back an attempt if the container asks for one in this state. Waiting for a LAUNCHED event from the NMComm may delay the task allocation.
+ .addTransition(AMContainerState.LAUNCHING, AMContainerState.LAUNCHING, AMContainerEventType.C_PULL_TA) // Is assuming the pullAttempt will be null.
+ .addTransition(AMContainerState.LAUNCHING, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtLaunchingTransition())
+ .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_STOP_REQUEST, new StopRequestAtLaunchingTransition())
+ .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, new NodeFailedAtLaunchingTransition())
+ .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOP_REQUESTED, EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED, AMContainerEventType.C_TIMED_OUT), new ErrorAtLaunchingTransition())
+
+ // IDLE: launched with no running attempt; C_PULL_TA may move to RUNNING.
+ .addTransition(AMContainerState.IDLE, EnumSet.of(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED), AMContainerEventType.C_ASSIGN_TA, new AssignTaskAttemptAtIdleTransition())
+ .addTransition(AMContainerState.IDLE, EnumSet.of(AMContainerState.RUNNING, AMContainerState.IDLE), AMContainerEventType.C_PULL_TA, new PullTAAtIdleTransition())
+ .addTransition(AMContainerState.IDLE, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtIdleTransition())
+ .addTransition(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_STOP_REQUEST, new StopRequestAtIdleTransition())
+ .addTransition(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_TIMED_OUT, new TimedOutAtIdleTransition())
+ .addTransition(AMContainerState.IDLE, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, new NodeFailedAtIdleTransition())
+ .addTransition(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED, EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED), new ErrorAtIdleTransition())
+
+ // RUNNING: an attempt is running; C_TA_SUCCEEDED returns to IDLE.
+ .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_ASSIGN_TA, new AssignTaskAttemptAtRunningTransition())
+ .addTransition(AMContainerState.RUNNING, AMContainerState.RUNNING, AMContainerEventType.C_PULL_TA)
+ .addTransition(AMContainerState.RUNNING, AMContainerState.IDLE, AMContainerEventType.C_TA_SUCCEEDED, new TASucceededAtRunningTransition())
+ .addTransition(AMContainerState.RUNNING, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtRunningTransition())
+ .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_STOP_REQUEST, new StopRequestAtRunningTransition())
+ .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_TIMED_OUT, new TimedOutAtRunningTransition())
+ .addTransition(AMContainerState.RUNNING, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, new NodeFailedAtRunningTransition())
+ .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED), new ErrorAtRunningTransition())
+
+ // STOP_REQUESTED: waiting for the NM stop to be sent (-> STOPPING) or for
+ // C_COMPLETED; most other events are ignored.
+ .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_ASSIGN_TA, new AssignTAAtWindDownTransition())
+ .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_PULL_TA, new PullTAAfterStopTransition())
+ .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtWindDownTransition())
+ .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOPPING, AMContainerEventType.C_NM_STOP_SENT)
+ .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOPPING, AMContainerEventType.C_NM_STOP_FAILED, new NMStopRequestFailedTransition())
+ .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, new NodeFailedAtNMStopRequestedTransition())
+ .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOP_REQUESTED, EnumSet.of(AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_TIMED_OUT))
+ .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_LAUNCH_REQUEST, new ErrorAtNMStopRequestedTransition())
+
+ // STOPPING: waiting for C_COMPLETED (produced by the RMCommunicator).
+ .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_ASSIGN_TA, new AssignTAAtWindDownTransition())
+ .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_PULL_TA, new PullTAAfterStopTransition())
+ .addTransition(AMContainerState.STOPPING, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtWindDownTransition())
+ .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, new NodeFailedBaseTransition())
+ .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, EnumSet.of(AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED, AMContainerEventType.C_TIMED_OUT))
+ .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_LAUNCH_REQUEST, new ErrorAtStoppingTransition())
+
+ // COMPLETED: terminal. Late assignments, pulls and node failures still
+ // run hooks; every other event is ignored.
+ .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, AMContainerEventType.C_ASSIGN_TA, new AssignTAAtCompletedTransition())
+ .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, AMContainerEventType.C_PULL_TA, new PullTAAfterStopTransition())
+ .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, AMContainerEventType.C_NODE_FAILED, new NodeFailedBaseTransition())
+ .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_COMPLETED, AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED, AMContainerEventType.C_TIMED_OUT))
+
+ .installTopology();
+
+ // Note: Containers will not reach their final state if the RM link is broken,
+ // AM shutdown should not wait for this.
+
+ // Attempting to use a container based purely on resources required, etc. needs
+ // additional change - JvmID, YarnChild, etc depend on TaskType.
+ public AMContainerImpl(Container container, ContainerHeartbeatHandler chh,
+     TaskAttemptListener tal, AppContext appContext) {
+   // Both views come from one lock instance, so readers and writers exclude
+   // each other.
+   ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+   readLock = lock.readLock();
+   writeLock = lock.writeLock();
+
+   this.container = container;
+   containerId = container.getId();
+   this.appContext = appContext;
+   eventHandler = appContext.getEventHandler();
+   containerHeartbeatHandler = chh;
+   taskAttemptListener = tal;
+   failedAssignments = new LinkedList<TezTaskAttemptID>();
+
+   // A pull that finds no selected attempt is answered with WAIT_TASK.
+   noAllocationContainerTask = WAIT_TASK;
+   stateMachine = stateMachineFactory.make(this);
+ }
+
+ /**
+  * Current state of the container's state machine. Blocks while another
+  * thread holds the write lock (e.g. inside handle()).
+  */
+ @Override
+ public AMContainerState getState() {
+   final ReadLock lock = readLock;
+   lock.lock();
+   try {
+     return stateMachine.getCurrentState();
+   } finally {
+     lock.unlock();
+   }
+ }
+
+ /** Id of the wrapped YARN container, fixed at construction. */
+ @Override
+ public ContainerId getContainerId() {
+   return containerId;
+ }
+
+ /** The wrapped YARN container, used for capability/locality lookups. */
+ @Override
+ public Container getContainer() {
+   return container;
+ }
+
+ /** Snapshot of the attempts completed on this container, taken under the read lock. */
+ @Override
+ public List<TezTaskAttemptID> getCompletedTaskAttempts() {
+   readLock.lock();
+   try {
+     List<TezTaskAttemptID> snapshot =
+         new ArrayList<TezTaskAttemptID>(completedAttempts.size());
+     snapshot.addAll(completedAttempts);
+     return snapshot;
+   } finally {
+     readLock.unlock();
+   }
+ }
+
+ /**
+  * Attempt queued on this container.
+  *
+  * @return an immutable list holding the pending attempt, or an immutable
+  *         empty list when nothing is pending (never a list containing null)
+  */
+ @Override
+ public List<TezTaskAttemptID> getQueuedTaskAttempts() {
+   readLock.lock();
+   try {
+     // singletonList(null) would hand callers a one-element list holding null.
+     if (this.pendingAttempt == null) {
+       return Collections.<TezTaskAttemptID>emptyList();
+     }
+     return Collections.singletonList(this.pendingAttempt);
+   } finally {
+     readLock.unlock();
+   }
+ }
+
+ @Override
+ public TezTaskAttemptID getRunningTaskAttempt() {
+ readLock.lock();
+ try {
+ return this.runningAttempt;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /** Shuffle port recorded for this container, read under the read lock. */
+ @Override
+ public int getShufflePort() {
+   int port;
+   readLock.lock();
+   try {
+     port = shufflePort;
+   } finally {
+     readLock.unlock();
+   }
+   return port;
+ }
+
+ /**
+  * Apply {@code event} to the container's state machine while holding the
+  * write lock. An event that is invalid in the current state is logged and
+  * sets {@code inError}; the exception is not propagated.
+  *
+  * @param event event to apply
+  */
+ @Override
+ public void handle(AMContainerEvent event) {
+   this.writeLock.lock();
+   try {
+     // Everything, including logging, runs inside the try so that a failure
+     // while building a log message cannot leave the write lock held.
+     // getState() takes the read lock, which the write-lock holder may acquire.
+     final AMContainerState oldState = getState();
+     if (LOG.isDebugEnabled()) {
+       LOG.debug("DEBUG: Processing AMContainerEvent " + event.getContainerId()
+           + " of type " + event.getType() + " while in state: " + oldState
+           + ". Event: " + event);
+     }
+     try {
+       stateMachine.doTransition(event.getType(), event);
+     } catch (InvalidStateTransitonException e) {
+       LOG.error("Can't handle event " + event.getType()
+           + " at current state " + oldState + " for ContainerId "
+           + this.containerId, e);
+       inError = true;
+       // TODO Can't set state to COMPLETED. Add a default error state.
+     }
+     final AMContainerState newState = getState();
+     if (oldState != newState) {
+       LOG.info("AMContainer " + this.containerId + " transitioned from "
+           + oldState + " to " + newState);
+     }
+   } finally {
+     writeLock.unlock();
+   }
+ }
+
+ @SuppressWarnings("unchecked")
+ private void sendEvent(Event<?> event) {
+ this.eventHandler.handle(event);
+ }
+
+ /**
+  * Synchronous entry point for C_PULL_TA (the event enum lists the TAL as
+  * its producer). It dispatches the event under the write lock, then returns
+  * the task for the attempt selected by that transition. When no attempt was
+  * selected it returns {@code noAllocationContainerTask}. The selection is
+  * always cleared before the lock is released.
+  *
+  * TODO Consider pushing the TaskAttempt to the TAL instead of having the TAL
+  * pull when a JVM asks for a TaskAttempt.
+  */
+ public AMContainerTask pullTaskContext() {
+   writeLock.lock();
+   try {
+     handle(new AMContainerEvent(containerId, AMContainerEventType.C_PULL_TA));
+     final TezTaskAttemptID selected = pullAttempt;
+     return selected == null
+         ? noAllocationContainerTask
+         : new AMContainerTask(false, remoteTaskMap.remove(selected));
+   } finally {
+     pullAttempt = null;
+     writeLock.unlock();
+   }
+ }
+
+ //////////////////////////////////////////////////////////////////////////////
+ // Start of Transition Classes //
+ //////////////////////////////////////////////////////////////////////////////
+
+ protected static class LaunchRequestTransition implements
+     SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+   /**
+    * Build the launch context, register with the TaskAttemptListener and ask
+    * the NM to start the container.
+    */
+   @Override
+   public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+     final AMContainerEventLaunchRequest request =
+         (AMContainerEventLaunchRequest) cEvent;
+     final JobConf conf = new JobConf(request.getConf());
+
+     container.clc = AMContainerHelpers.createContainerLaunchContext(
+         container.appContext.getApplicationACLs(),
+         container.getContainerId(), conf, request.getVertexId(),
+         request.getJobToken(), container.getContainer().getResource(),
+         request.getLocalResources(), request.getEnvironment(),
+         container.taskAttemptListener, request.getCredentials(),
+         request.shouldProfile());
+
+     container.registerWithTAListener();
+     container.sendStartRequestToNM();
+     LOG.info("Sending Launch Request for Container with id: " +
+         container.container.getId());
+
+     // The launch context is no longer needed once the start request is out,
+     // so drop it to free resources. Parts of it may later need to be exposed
+     // to the scheduler to decide whether a container fits a TaskAttempt.
+     container.clc = null;
+   }
+ }
+
+ protected static class AssignTaskAttemptAtAllocatedTransition implements
+     SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+   /**
+    * An attempt was assigned before the container was launched, which is a
+    * scheduler error. This flags the container, notifies the attempt that it
+    * was terminated, and releases the container.
+    */
+   @Override
+   public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+     final TezTaskAttemptID attemptId =
+         ((AMContainerEventAssignTA) cEvent).getTaskAttemptId();
+
+     container.inError = true;
+     container.maybeSendNodeFailureForFailedAssignment(attemptId);
+     container.sendTerminatedToTaskAttempt(attemptId,
+         "AMScheduler Error: TaskAttempt allocated to unlaunched container: " +
+             container.getContainerId());
+     container.sendCompletedToScheduler();
+     container.deAllocate();
+     LOG.warn("Unexpected TA Assignment: TAId: " + attemptId +
+         " for ContainerId: " + container.getContainerId() +
+         " while in state: " + container.getState());
+   }
+ }
+
+ protected static class CompletedAtAllocatedTransition implements
+     SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+   /**
+    * The container finished before launch. This notifies the scheduler,
+    * publishes diagnostics, and logs any non-empty exit diagnostics.
+    */
+   @Override
+   public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+     final AMContainerEventCompleted completed =
+         (AMContainerEventCompleted) cEvent;
+     container.sendCompletedToScheduler();
+     container.sendDiagUpdateOnContainerComplete(completed);
+
+     final String diagnostics =
+         completed.getContainerStatus().getDiagnostics();
+     if (diagnostics != null && !diagnostics.isEmpty()) {
+       LOG.info("Container " + container.getContainerId()
+           + " exited with diagnostics set to " + diagnostics);
+     }
+   }
+ }
+
+  /**
+   * Handles a stop request for a container that is still ALLOCATED (never
+   * launched). Nothing runs on the container, so it is simply reported
+   * completed to the scheduler and de-allocated.
+   *
+   * A stop request is not a node failure, so this transition deliberately
+   * does not set {@code nodeFailed}/{@code nodeFailedMessage}. Setting them
+   * here would make maybeSendNodeFailureForFailedAssignment() report a
+   * spurious node failure for any later erroneous assignment to this
+   * container.
+   */
+  protected static class StopRequestAtAllocatedTransition implements
+      SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      // TODO why are these sent. no need to send these now.
+      container.sendCompletedToScheduler();
+      container.deAllocate();
+    }
+  }
+
+  /**
+   * Handles a node failure for a container that is still ALLOCATED.
+   *
+   * The container is marked as being on a failed node, recording the event's
+   * diagnostics when the event is a {@link DiagnosableEvent}. Any subsequent
+   * erroneous TaskAttempt assignment is then also reported to that attempt as
+   * a node failure (see maybeSendNodeFailureForFailedAssignment()). Finally
+   * the container is reported completed and de-allocated.
+   */
+  protected static class NodeFailedAtAllocatedTransition implements
+      SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      container.nodeFailed = true;
+      if (cEvent instanceof DiagnosableEvent) {
+        container.nodeFailedMessage =
+            ((DiagnosableEvent) cEvent).getDiagnosticInfo();
+      }
+      container.sendCompletedToScheduler();
+      container.deAllocate();
+    }
+  }
+
+  /**
+   * Generic handler for an event that is invalid in the current state.
+   *
+   * Marks the container as in error (via {@link ErrorBaseTransition}),
+   * reports it completed to the scheduler, de-allocates it, and logs the
+   * unexpected event. The ErrorAt*Transition subclasses build on this.
+   */
+  protected static class ErrorTransition extends ErrorBaseTransition {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      // Delegate to ErrorBaseTransition so inError is actually set; without
+      // this call neither this transition nor its subclasses flag the error.
+      super.transition(container, cEvent);
+      container.sendCompletedToScheduler();
+      container.deAllocate();
+      LOG.warn(
+          "Unexpected event type: " + cEvent.getType() + " while in state: " +
+          container.getState() + ". Event: " + cEvent);
+    }
+  }
+
+  /**
+   * Records a TaskAttempt assignment as the container's pending attempt.
+   *
+   * If no attempt is pending yet, the attempt and its remote task context are
+   * stored and the container stays in its current state. If an attempt is
+   * already pending, this is a double assignment: it is handed to
+   * handleExtraTAAssign() and the container moves to STOP_REQUESTED.
+   */
+  protected static class AssignTaskAttemptTransition implements
+      MultipleArcTransition<AMContainerImpl, AMContainerEvent, AMContainerState> {
+
+    @Override
+    public AMContainerState transition(
+        AMContainerImpl container, AMContainerEvent cEvent) {
+      AMContainerEventAssignTA assignEvent = (AMContainerEventAssignTA) cEvent;
+      if (container.pendingAttempt == null) {
+        container.pendingAttempt = assignEvent.getTaskAttemptId();
+        LOG.info("DEBUG: AssignTA: attempt: "
+            + assignEvent.getRemoteTaskContext());
+        container.remoteTaskMap.put(assignEvent.getTaskAttemptId(),
+            assignEvent.getRemoteTaskContext());
+        return container.getState();
+      }
+      // This may include a couple of additional (harmless) unregister calls
+      // to the taskAttemptListener and containerHeartbeatHandler - in case
+      // of assign at any state prior to IDLE.
+      container.handleExtraTAAssign(assignEvent, container.pendingAttempt);
+      // TODO XXX: Verify that it's ok to send in a NM_STOP_REQUEST. The
+      // NMCommunicator should be able to handle this. The STOP_REQUEST would
+      // only go out after the START_REQUEST.
+      return AMContainerState.STOP_REQUESTED;
+    }
+  }
+
+  /**
+   * Handles the launched notification: stores the shuffle port carried by the
+   * event and registers the container with the container heartbeat handler.
+   */
+  protected static class LaunchedTransition implements
+      SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      container.shufflePort =
+          ((AMContainerEventLaunched) cEvent).getShufflePort();
+      container.registerWithContainerListener();
+    }
+  }
+
+ protected static class LaunchFailedTransition implements
+ SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+ @Override
+ public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+ if (container.pendingAttempt != null) {
+ AMContainerEventLaunchFailed event = (AMContainerEventLaunchFailed) cEvent;
+ container.sendTerminatingToTA(container.pendingAttempt,
+ event.getMessage());
+ }
+ container.unregisterFromTAListener();
+ container.deAllocate();
+ }
+ }
+
+  /**
+   * Handles a COMPLETED notification for a container that is being launched.
+   *
+   * Any pending attempt is told the container terminated, using getMessage(),
+   * and the message is logged. Diagnostics are forwarded to the attempts, the
+   * container is unregistered from the TaskAttemptListener, the scheduler is
+   * informed, and non-empty diagnostics are logged.
+   */
+  protected static class CompletedAtLaunchingTransition
+      implements SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      AMContainerEventCompleted event = (AMContainerEventCompleted) cEvent;
+      if (container.pendingAttempt != null) {
+        String errorMessage = getMessage(container, event);
+        container.sendTerminatedToTaskAttempt(container.pendingAttempt,
+            errorMessage);
+        LOG.warn(errorMessage);
+      }
+      container.sendDiagUpdateOnContainerComplete(event);
+      container.unregisterFromTAListener();
+      container.sendCompletedToScheduler();
+      String diag = event.getContainerStatus().getDiagnostics();
+      if (!(diag == null || diag.equals(""))) {
+        LOG.info("Container " + container.getContainerId()
+            + " exited with diagnostics set to " + diag);
+      }
+    }
+
+    /**
+     * Builds the message sent to the pending attempt when the container
+     * completes. Subclasses override this to describe their own state.
+     */
+    public String getMessage(AMContainerImpl container,
+        AMContainerEventCompleted event) {
+      // A space was missing between "Container" and the id.
+      return "Container " + container.getContainerId()
+          + " COMPLETED while trying to launch. Diagnostics: ["
+          + event.getContainerStatus().getDiagnostics() + "]";
+    }
+  }
+
+ protected static class StopRequestAtLaunchingTransition
+ implements SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+
+ @Override
+ public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+ if (container.pendingAttempt != null) {
+ container.sendTerminatingToTA(container.pendingAttempt,
+ getMessage(container, cEvent));
+ }
+ container.unregisterFromTAListener();
+ container.sendStopRequestToNM();
+ }
+
+ public String getMessage(
+ AMContainerImpl container, AMContainerEvent event) {
+ return "Container " + container.getContainerId() +
+ " received a STOP_REQUEST";
+ }
+ }
+
+  /**
+   * Common node-failure handling.
+   *
+   * Marks the container's node as failed. A node-failure event, carrying the
+   * event's diagnostics when it is a {@link DiagnosableEvent}, is sent to
+   * every failed-assignment attempt and every completed attempt. The pending
+   * and running attempts, if present, additionally get a terminating event.
+   */
+  protected static class NodeFailedBaseTransition
+      implements SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      container.nodeFailed = true;
+      String errorMessage = (cEvent instanceof DiagnosableEvent)
+          ? ((DiagnosableEvent) cEvent).getDiagnosticInfo()
+          : null;
+
+      for (TezTaskAttemptID failedId : container.failedAssignments) {
+        container.sendNodeFailureToTA(failedId, errorMessage);
+      }
+      for (TezTaskAttemptID completedId : container.completedAttempts) {
+        // TODO XXX: Make sure TaskAttempt knows how to handle kills to REDUCEs.
+        container.sendNodeFailureToTA(completedId, errorMessage);
+      }
+
+      failActiveAttempt(container, container.pendingAttempt, errorMessage);
+      failActiveAttempt(container, container.runningAttempt, errorMessage);
+    }
+
+    /** Sends node-failure then terminating events to an attempt, if any. */
+    private static void failActiveAttempt(AMContainerImpl container,
+        TezTaskAttemptID attemptId, String errorMessage) {
+      if (attemptId == null) {
+        return;
+      }
+      container.sendNodeFailureToTA(attemptId, errorMessage);
+      container.sendTerminatingToTA(attemptId, "Node failure");
+    }
+  }
+
+  /**
+   * Node failure while LAUNCHING: runs the common node-failure handling, then
+   * unregisters the container from the TaskAttemptListener and de-allocates
+   * it.
+   */
+  protected static class NodeFailedAtLaunchingTransition
+      extends NodeFailedBaseTransition {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      super.transition(container, cEvent);
+      container.unregisterFromTAListener();
+      container.deAllocate();
+    }
+  }
+
+ protected static class ErrorAtLaunchingTransition
+ extends ErrorTransition {
+ @Override
+ public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+ super.transition(container, cEvent);
+ if (container.pendingAttempt != null) {
+ container.sendTerminatedToTaskAttempt(container.pendingAttempt,
+ "Container " + container.getContainerId() +
+ " hit an invalid transition - " + cEvent.getType() + " at " +
+ container.getState());
+ }
+ container.unregisterFromTAListener();
+ }
+ }
+
+ protected static class AssignTaskAttemptAtIdleTransition
+ extends AssignTaskAttemptTransition {
+ @Override
+ public AMContainerState transition(
+ AMContainerImpl container, AMContainerEvent cEvent) {
+ LOG.info("DEBUG: AssignTAAtIdle: attempt: " +
+ ((AMContainerEventAssignTA) cEvent).getRemoteTaskContext());
+ return super.transition(container, cEvent);
+ }
+ }
+
+  /**
+   * Handles a task pull while IDLE.
+   *
+   * If an attempt is pending, it becomes both the pull attempt and the
+   * running attempt, and the pending slot is cleared. If a previous task had
+   * finished, the idle gap since then is added to idleTimeBetweenTasks. The
+   * container then moves to RUNNING. With nothing pending it stays IDLE.
+   */
+  protected static class PullTAAtIdleTransition implements
+      MultipleArcTransition<AMContainerImpl, AMContainerEvent, AMContainerState> {
+
+    @Override
+    public AMContainerState transition(
+        AMContainerImpl container, AMContainerEvent cEvent) {
+      if (container.pendingAttempt == null) {
+        return AMContainerState.IDLE;
+      }
+      // This will be invoked as part of the PULL_REQUEST - so pullAttempt
+      // should ideally only end up being populated during the duration of
+      // this call, which is in a write lock. pullRequest() should move this
+      // to the running state.
+      container.pullAttempt = container.pendingAttempt;
+      container.runningAttempt = container.pendingAttempt;
+      container.pendingAttempt = null;
+      if (container.lastTaskFinishTime != 0) {
+        // Read the clock once so the logged increment is exactly what gets
+        // added to idleTimeBetweenTasks.
+        long idleTimeDiff =
+            System.currentTimeMillis() - container.lastTaskFinishTime;
+        LOG.info("DEBUG: Computing idle time for container: " +
+            container.getContainerId() + ", lastFinishTime: " +
+            container.lastTaskFinishTime + ", Incremented by: " +
+            idleTimeDiff);
+        container.idleTimeBetweenTasks += idleTimeDiff;
+      }
+      LOG.info("Assigned taskAttempt [" + container.runningAttempt +
+          "] to container: [" + container.getContainerId() + "]");
+      return AMContainerState.RUNNING;
+    }
+  }
+
+  /**
+   * Handles a COMPLETED notification while IDLE.
+   *
+   * Performs the LAUNCHING-state completion handling and then unregisters the
+   * container from the heartbeat handler. The LAUNCHING-state handling
+   * already forwards the diagnostics to the attempts.
+   */
+  protected static class CompletedAtIdleTransition
+      extends CompletedAtLaunchingTransition {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      // super.transition() already calls sendDiagUpdateOnContainerComplete().
+      // It must not be repeated here, or each diagnostics update is
+      // delivered to the attempts twice.
+      super.transition(container, cEvent);
+      container.unregisterFromContainerListener();
+    }
+
+    @Override
+    public String getMessage(
+        AMContainerImpl container, AMContainerEventCompleted event) {
+      return "Container " + container.getContainerId() + " COMPLETED"
+          + " with diagnostics set to ["
+          + event.getContainerStatus().getDiagnostics() + "]";
+    }
+  }
+
+  /**
+   * Stop request while IDLE. Runs the LAUNCHING-state stop handling, logs the
+   * accumulated idle time, and unregisters the container from the heartbeat
+   * handler.
+   */
+  protected static class StopRequestAtIdleTransition
+      extends StopRequestAtLaunchingTransition {
+
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      super.transition(container, cEvent);
+      LOG.info("DEBUG: IdleTimeBetweenTasks: "
+          + container.idleTimeBetweenTasks);
+      container.unregisterFromContainerListener();
+    }
+  }
+
+  /**
+   * Timeout while IDLE. It is handled exactly like a stop request, except that
+   * the reason reported to any pending attempt is a timeout.
+   */
+  protected static class TimedOutAtIdleTransition
+      extends StopRequestAtIdleTransition {
+
+    @Override
+    public String getMessage(
+        AMContainerImpl container, AMContainerEvent event) {
+      return "Container " + container.getContainerId() +
+          " timed out";
+    }
+  }
+
+  /**
+   * Node failure while IDLE: the LAUNCHING-state node-failure handling, plus
+   * unregistration from the heartbeat handler.
+   */
+  protected static class NodeFailedAtIdleTransition
+      extends NodeFailedAtLaunchingTransition {
+
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      super.transition(container, cEvent);
+      container.unregisterFromContainerListener();
+    }
+  }
+
+  /**
+   * Invalid event while IDLE: the LAUNCHING-state error handling, plus
+   * unregistration from the heartbeat handler.
+   */
+  protected static class ErrorAtIdleTransition
+      extends ErrorAtLaunchingTransition {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      super.transition(container, cEvent);
+      container.unregisterFromContainerListener();
+    }
+  }
+
+  /**
+   * Assignment while RUNNING is a double assignment. The running attempt is
+   * unregistered from the TaskAttemptListener, and both attempts are handled
+   * by handleExtraTAAssign().
+   */
+  protected static class AssignTaskAttemptAtRunningTransition implements
+      SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      AMContainerEventAssignTA assignEvent = (AMContainerEventAssignTA) cEvent;
+      container.unregisterAttemptFromListener(container.runningAttempt);
+      container.handleExtraTAAssign(assignEvent, container.runningAttempt);
+    }
+  }
+
+ protected static class TASucceededAtRunningTransition
+ implements SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+ @Override
+ public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+ container.lastTaskFinishTime = System.currentTimeMillis();
+ container.completedAttempts.add(container.runningAttempt);
+ container.unregisterAttemptFromListener(container.runningAttempt);
+ container.runningAttempt = null;
+ }
+ }
+
+  /**
+   * COMPLETED while RUNNING. The running attempt is told the container
+   * terminated and is unregistered from the TaskAttemptListener. Then the
+   * IDLE-state completion handling runs.
+   */
+  protected static class CompletedAtRunningTransition
+      extends CompletedAtIdleTransition {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      AMContainerEventCompleted completedEvent =
+          (AMContainerEventCompleted) cEvent;
+      container.sendTerminatedToTaskAttempt(container.runningAttempt,
+          getMessage(container, completedEvent));
+      container.unregisterAttemptFromListener(container.runningAttempt);
+      super.transition(container, cEvent);
+    }
+  }
+
+  /**
+   * Stop request while RUNNING; TimedOutAtRunningTransition also reuses it.
+   * The running attempt is unregistered from the TaskAttemptListener and told
+   * the container is terminating. Then the IDLE-state stop handling runs.
+   */
+  protected static class StopRequestAtRunningTransition
+      extends StopRequestAtIdleTransition {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      container.unregisterAttemptFromListener(container.runningAttempt);
+      // Use getMessage() so subclasses such as TimedOutAtRunningTransition
+      // report their own reason to the running attempt, rather than always
+      // claiming a STOP_REQUEST was received.
+      container.sendTerminatingToTA(container.runningAttempt,
+          getMessage(container, cEvent));
+      super.transition(container, cEvent);
+    }
+  }
+
+  /**
+   * Timeout while RUNNING. It is handled like a stop request, but the reported
+   * reason is a timeout.
+   */
+  protected static class TimedOutAtRunningTransition
+      extends StopRequestAtRunningTransition {
+    @Override
+    public String getMessage(
+        AMContainerImpl container, AMContainerEvent event) {
+      return "Container " + container.getContainerId()
+          + " timed out";
+    }
+  }
+
+  /**
+   * Node failure while RUNNING: the IDLE-state node-failure handling, then
+   * the running attempt is unregistered from the TaskAttemptListener.
+   */
+  protected static class NodeFailedAtRunningTransition
+      extends NodeFailedAtIdleTransition {
+
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      super.transition(container, cEvent);
+      container.unregisterAttemptFromListener(container.runningAttempt);
+    }
+  }
+
+ protected static class ErrorAtRunningTransition
+ extends ErrorAtIdleTransition {
+ @Override
+ public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+ super.transition(container, cEvent);
+ container.unregisterAttemptFromListener(container.runningAttempt);
+ container.sendTerminatedToTaskAttempt(container.runningAttempt,
+ "Container " + container.getContainerId() +
+ " hit an invalid transition - " + cEvent.getType() + " at " +
+ container.getState());
+ }
+ }
+
+  /**
+   * Assignment while the container is winding down. The container is flagged
+   * as in error, and the attempt is told the container is terminating (plus a
+   * node failure if one was recorded). The attempt is also remembered as a
+   * failed assignment.
+   */
+  protected static class AssignTAAtWindDownTransition
+      implements SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      AMContainerEventAssignTA assignEvent = (AMContainerEventAssignTA) cEvent;
+      TezTaskAttemptID rejected = assignEvent.getTaskAttemptId();
+      container.inError = true;
+      String errorMessage = "AttemptId: " + rejected
+          + " cannot be allocated to container: " + container.getContainerId()
+          + " in " + container.getState() + " state";
+      container.maybeSendNodeFailureForFailedAssignment(rejected);
+      container.sendTerminatingToTA(rejected, errorMessage);
+      container.registerFailedTAAssignment(rejected);
+    }
+  }
+
+  /**
+   * Answers a task pull on a container that has been asked to stop by handing
+   * out NO_MORE_TASKS.
+   *
+   * This is something of a hack: the assignment should happen on entry to one
+   * of the post-running states, instead of being a transition on the
+   * post-stop states.
+   */
+  protected static class PullTAAfterStopTransition
+      implements SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      container.noAllocationContainerTask = NO_MORE_TASKS;
+    }
+  }
+
+  /**
+   * COMPLETED while winding down. The pending attempt, the running attempt
+   * and every failed-assignment attempt are told the container terminated,
+   * using the container diagnostics as the message. Non-empty diagnostics are
+   * logged, and the scheduler is informed.
+   */
+  protected static class CompletedAtWindDownTransition implements
+      SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      AMContainerEventCompleted completedEvent =
+          (AMContainerEventCompleted) cEvent;
+      String diagnostics =
+          completedEvent.getContainerStatus().getDiagnostics();
+      if (container.pendingAttempt != null) {
+        container.sendTerminatedToTaskAttempt(container.pendingAttempt,
+            diagnostics);
+      }
+      if (container.runningAttempt != null) {
+        container.sendTerminatedToTaskAttempt(container.runningAttempt,
+            diagnostics);
+      }
+      for (TezTaskAttemptID failedId : container.failedAssignments) {
+        container.sendTerminatedToTaskAttempt(failedId, diagnostics);
+      }
+      if (diagnostics != null && !diagnostics.isEmpty()) {
+        LOG.info("Container " + container.getContainerId()
+            + " exited with diagnostics set to " + diagnostics);
+      }
+      container.sendCompletedToScheduler();
+    }
+  }
+
+  /**
+   * The NM stop request failed, so the container is de-allocated through the
+   * scheduler instead.
+   */
+  protected static class NMStopRequestFailedTransition
+      implements SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      container.deAllocate();
+    }
+  }
+
+  /**
+   * Node failure after an NM stop was requested: the common node-failure
+   * handling, followed by de-allocation.
+   */
+  protected static class NodeFailedAtNMStopRequestedTransition
+      extends NodeFailedBaseTransition {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      super.transition(container, cEvent);
+      container.deAllocate();
+    }
+  }
+
+  /**
+   * Invalid event after an NM stop was requested: the STOPPING-state error
+   * handling, followed by de-allocation.
+   */
+  protected static class ErrorAtNMStopRequestedTransition
+      extends ErrorAtStoppingTransition {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      super.transition(container, cEvent);
+      container.deAllocate();
+    }
+  }
+
+ protected static class ErrorAtStoppingTransition
+ extends ErrorBaseTransition {
+ public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+ super.transition(container, cEvent);
+ if (container.pendingAttempt != null) {
+ container.sendTerminatedToTaskAttempt(container.pendingAttempt, null);
+ }
+ if (container.runningAttempt != null) {
+ container.sendTerminatedToTaskAttempt(container.runningAttempt, null);
+ }
+ for (TezTaskAttemptID taId : container.failedAssignments) {
+ container.sendTerminatedToTaskAttempt(taId, null);
+ }
+ container.sendCompletedToScheduler();
+ }
+ }
+
+  /**
+   * Root of the error transitions: it only flags the container as having
+   * seen an invalid event.
+   */
+  protected static class ErrorBaseTransition implements
+      SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      container.inError = true;
+    }
+  }
+
+  /**
+   * Assignment to a container that has already COMPLETED. The container is
+   * flagged as in error, and the attempt is told the container terminated
+   * (plus a node failure if one was recorded). The attempt is also
+   * remembered as a failed assignment.
+   */
+  protected static class AssignTAAtCompletedTransition implements
+      SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      // TODO CREUSE CRITICAL: This is completely incorrect. COMPLETED comes
+      // from RMComm directly to the container. Meanwhile, the scheduler may
+      // think the container is still around and assign a task to it. The task
+      // ends up getting a CONTAINER_KILLED message. Task could handle this by
+      // asking for a reschedule in this case. Will end up FAILING the task instead of KILLING it.
+      container.inError = true;
+      TezTaskAttemptID rejected =
+          ((AMContainerEventAssignTA) cEvent).getTaskAttemptId();
+      String errorMessage = "AttemptId: " + rejected
+          + " cannot be allocated to container: " + container.getContainerId()
+          + " in COMPLETED state";
+      container.maybeSendNodeFailureForFailedAssignment(rejected);
+      container.sendTerminatedToTaskAttempt(rejected, errorMessage);
+      container.registerFailedTAAssignment(rejected);
+    }
+  }
+
+
+ private void handleExtraTAAssign(
+ AMContainerEventAssignTA event, TezTaskAttemptID currentTaId) {
+ this.inError = true;
+ String errorMessage = "AMScheduler Error: Multiple simultaneous " +
+ "taskAttempt allocations to: " + this.getContainerId() +
+ ". Attempts: " + currentTaId + ", " + event.getTaskAttemptId() +
+ ". Current state: " + this.getState();
+ this.maybeSendNodeFailureForFailedAssignment(event.getTaskAttemptId());
+ this.sendTerminatingToTA(event.getTaskAttemptId(), errorMessage);
+ this.sendTerminatingToTA(currentTaId, errorMessage);
+ this.registerFailedTAAssignment(event.getTaskAttemptId());
+ LOG.warn(errorMessage);
+ this.sendStopRequestToNM();
+ this.unregisterFromTAListener();
+ this.unregisterFromContainerListener();
+ }
+
+
+  /** Remembers an attempt whose assignment to this container was rejected. */
+  protected void registerFailedTAAssignment(TezTaskAttemptID taId) {
+    this.failedAssignments.add(taId);
+  }
+
+  /** Sends a de-allocate event for this container to the scheduler. */
+  protected void deAllocate() {
+    AMSchedulerEventDeallocateContainer deallocateEvent =
+        new AMSchedulerEventDeallocateContainer(this.containerId);
+    sendEvent(deallocateEvent);
+  }
+
+  /** Tells the scheduler that this container has completed. */
+  protected void sendCompletedToScheduler() {
+    AMSchedulerEventContainerCompleted completedEvent =
+        new AMSchedulerEventContainerCompleted(this.containerId);
+    sendEvent(completedEvent);
+  }
+
+  /**
+   * Forwards the completed container's diagnostics to the pending and running
+   * attempts, skipping whichever of them is not set.
+   */
+  protected void sendDiagUpdateOnContainerComplete(
+      AMContainerEventCompleted cEvent) {
+    String diagnostics = cEvent.getContainerStatus().getDiagnostics();
+    if (this.pendingAttempt != null) {
+      sendEvent(
+          new TaskAttemptEventDiagnosticsUpdate(this.pendingAttempt, diagnostics));
+    }
+    if (this.runningAttempt != null) {
+      sendEvent(
+          new TaskAttemptEventDiagnosticsUpdate(this.runningAttempt, diagnostics));
+    }
+  }
+
+  /** Tells the given attempt that this container has terminated. */
+  protected void sendTerminatedToTaskAttempt(
+      TezTaskAttemptID taId, String message) {
+    TaskAttemptEventContainerTerminated terminatedEvent =
+        new TaskAttemptEventContainerTerminated(taId, message);
+    sendEvent(terminatedEvent);
+  }
+
+  /** Tells the given attempt that this container is terminating. */
+  protected void sendTerminatingToTA(TezTaskAttemptID taId, String message) {
+    TaskAttemptEventContainerTerminating terminatingEvent =
+        new TaskAttemptEventContainerTerminating(taId, message);
+    sendEvent(terminatingEvent);
+  }
+
+  /**
+   * If this container's node has been marked failed, reports that node
+   * failure (with the recorded message) to the given attempt.
+   */
+  protected void maybeSendNodeFailureForFailedAssignment(TezTaskAttemptID taId) {
+    if (!nodeFailed) {
+      return;
+    }
+    sendNodeFailureToTA(taId, nodeFailedMessage);
+  }
+
+  /** Tells the given attempt that the node it ran (or was placed) on failed. */
+  protected void sendNodeFailureToTA(TezTaskAttemptID taId, String message) {
+    TaskAttemptEventNodeFailed nodeFailedEvent =
+        new TaskAttemptEventNodeFailed(taId, message);
+    sendEvent(nodeFailedEvent);
+  }
+
+  /** Sends a launch request for this container, using the current clc. */
+  protected void sendStartRequestToNM() {
+    NMCommunicatorLaunchRequestEvent launchEvent =
+        new NMCommunicatorLaunchRequestEvent(this.clc, this.container);
+    sendEvent(launchEvent);
+  }
+
+  /**
+   * Sends a stop request for this container, including its node id and
+   * container token.
+   */
+  protected void sendStopRequestToNM() {
+    NMCommunicatorStopRequestEvent stopEvent =
+        new NMCommunicatorStopRequestEvent(this.containerId,
+            this.container.getNodeId(), this.container.getContainerToken());
+    sendEvent(stopEvent);
+  }
+
+  /** Removes the given attempt from the TaskAttemptListener. */
+  protected void unregisterAttemptFromListener(TezTaskAttemptID attemptId) {
+    this.taskAttemptListener.unregisterTaskAttempt(attemptId);
+  }
+
+  /** Registers this container as running with the TaskAttemptListener. */
+  protected void registerWithTAListener() {
+    this.taskAttemptListener.registerRunningContainer(this.containerId);
+  }
+
+  /** Removes this container's running registration from the TaskAttemptListener. */
+  protected void unregisterFromTAListener() {
+    taskAttemptListener.unregisterRunningContainer(containerId);
+  }
+
+
+  /** Registers this container with the container heartbeat handler. */
+  protected void registerWithContainerListener() {
+    containerHeartbeatHandler.register(containerId);
+  }
+
+  /** Removes this container from the container heartbeat handler. */
+  protected void unregisterFromContainerListener() {
+    containerHeartbeatHandler.unregister(containerId);
+  }
+
+
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm.container;
+
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.ContainerHeartbeatHandler;
+import org.apache.tez.dag.app.TaskAttemptListener;
+
+/**
+ * Registry of the {@link AMContainer}s known to the AM, keyed by
+ * {@link ContainerId}. As an {@link EventHandler} for
+ * {@link AMContainerEvent}s, it also routes each event to the container the
+ * event targets.
+ */
+public class AMContainerMap extends AbstractService implements
+    EventHandler<AMContainerEvent> {
+
+  private static final Log LOG = LogFactory.getLog(AMContainerMap.class);
+
+  private final ContainerHeartbeatHandler chh;
+  private final TaskAttemptListener tal;
+  private final AppContext context;
+  private final ConcurrentHashMap<ContainerId, AMContainer> containerMap;
+
+  public AMContainerMap(ContainerHeartbeatHandler chh, TaskAttemptListener tal,
+      AppContext context) {
+    super("AMContainerMaps");
+    this.chh = chh;
+    this.tal = tal;
+    this.context = context;
+    this.containerMap = new ConcurrentHashMap<ContainerId, AMContainer>();
+  }
+
+  /**
+   * Routes an event to the container it targets. Events for containers that
+   * are not in the map are logged and dropped.
+   */
+  @Override
+  public void handle(AMContainerEvent event) {
+    AMContainer container = containerMap.get(event.getContainerId());
+    if (container != null) {
+      container.handle(event);
+    } else {
+      LOG.info("Event for unknown container: " + event.getContainerId());
+    }
+  }
+
+  /**
+   * Adds an {@link AMContainer} for the given YARN container unless one is
+   * already registered under the same id. An existing entry is never
+   * replaced.
+   */
+  public void addContainerIfNew(Container container) {
+    ContainerId containerId = container.getId();
+    // Avoid building a new AMContainerImpl when the container is already
+    // known. putIfAbsent still guards the race where two callers add the same
+    // container concurrently.
+    if (!containerMap.containsKey(containerId)) {
+      containerMap.putIfAbsent(containerId,
+          new AMContainerImpl(container, chh, tal, context));
+    }
+  }
+
+  /** @return the container registered under the id, or null if unknown. */
+  public AMContainer get(ContainerId containerId) {
+    return containerMap.get(containerId);
+  }
+
+  /** @return a live view of all registered containers. */
+  public Collection<AMContainer> values() {
+    return containerMap.values();
+  }
+}
\ No newline at end of file
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerState.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerState.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerState.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerState.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.dag.app.rm.container;
+
+public enum AMContainerState {
+ ALLOCATED,
+ LAUNCHING,
+ IDLE,
+ RUNNING,
+ // indicates a NM stop request has been attempted. This request could fail, in
+ // which case an RM stop request needs to be sent.
+ STOP_REQUESTED,
+
+ // A stop request has been registered with YARN
+ STOPPING,
+ COMPLETED,
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerState.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,39 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.rm.container;
+
+import org.apache.tez.common.TezTask;
+
+/**
+ * Immutable pairing of a {@link TezTask} with a flag telling the container
+ * whether it should die. Presumably this is the AM's reply to a container's
+ * request for work; verify against the callers.
+ */
+public class AMContainerTask {
+  private final boolean shouldDie;
+  private final TezTask tezTask;
+
+  /**
+   * @param shouldDie whether the container should shut down
+   * @param tezTask the task handed to the container
+   */
+  public AMContainerTask(boolean shouldDie, TezTask tezTask) {
+    this.tezTask = tezTask;
+    this.shouldDie = shouldDie;
+  }
+
+  /** @return true if the container should shut down. */
+  public boolean shouldDie() {
+    return shouldDie;
+  }
+
+  /** @return the task handed to the container. */
+  public TezTask getTask() {
+    return tezTask;
+  }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNode.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNode.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNode.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNode.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm.node;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.EventHandler;
+
+/**
+ * AM-side view of a cluster node. It is also the {@link EventHandler} for
+ * {@link AMNodeEvent}s addressed to that node.
+ */
+public interface AMNode extends EventHandler<AMNodeEvent> {
+
+  /** @return the YARN id of this node. */
+  NodeId getNodeId();
+
+  /** @return the current state of this node. */
+  AMNodeState getState();
+
+  /** @return ids of the containers associated with this node. */
+  List<ContainerId> getContainers();
+
+  boolean isUnhealthy();
+
+  boolean isBlacklisted();
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNode.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEvent.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEvent.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEvent.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm.node;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+/**
+ * Base class for events dispatched to an {@link AMNode}: an
+ * {@link AMNodeEventType} plus the {@link NodeId} the event refers to.
+ */
+public class AMNodeEvent extends AbstractEvent<AMNodeEventType> {
+
+  // Target node. Subclasses not tied to a single node pass null here
+  // (e.g. AMNodeEventNodeCountUpdated).
+  private final NodeId nodeId;
+
+  /**
+   * @param nodeId node the event refers to; may be {@code null}
+   * @param type   kind of event
+   */
+  public AMNodeEvent(NodeId nodeId, AMNodeEventType type) {
+    super(type);
+    this.nodeId = nodeId;
+  }
+
+  /** @return the node this event refers to, possibly {@code null}. */
+  public NodeId getNodeId() {
+    return nodeId;
+  }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEvent.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventContainerAllocated.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventContainerAllocated.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventContainerAllocated.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventContainerAllocated.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.dag.app.rm.node;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+/**
+ * Node event of type {@link AMNodeEventType#N_CONTAINER_ALLOCATED}, carrying
+ * the id of the container that was allocated.
+ */
+public class AMNodeEventContainerAllocated extends AMNodeEvent {
+
+  // The allocated container.
+  private final ContainerId containerId;
+
+  /**
+   * @param nodeId      node associated with the allocation
+   * @param containerId the allocated container
+   */
+  public AMNodeEventContainerAllocated(NodeId nodeId, ContainerId containerId) {
+    super(nodeId, AMNodeEventType.N_CONTAINER_ALLOCATED);
+    this.containerId = containerId;
+  }
+
+  /** @return the id of the allocated container. */
+  public ContainerId getContainerId() {
+    return containerId;
+  }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventContainerAllocated.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventNodeCountUpdated.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventNodeCountUpdated.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventNodeCountUpdated.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventNodeCountUpdated.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm.node;
+
+/**
+ * Node event of type {@link AMNodeEventType#N_NODE_COUNT_UPDATED}, carrying an
+ * updated node count. It is not addressed to a specific node, so
+ * {@link #getNodeId()} returns {@code null}.
+ */
+public class AMNodeEventNodeCountUpdated extends AMNodeEvent {
+
+  private final int nodeCount;
+
+  /** @param nodeCount the updated number of nodes */
+  public AMNodeEventNodeCountUpdated(int nodeCount) {
+    // No single target node for this event.
+    super(null, AMNodeEventType.N_NODE_COUNT_UPDATED);
+    this.nodeCount = nodeCount;
+  }
+
+  /** @return the node count carried by this event. */
+  public int getNodeCount() {
+    return nodeCount;
+  }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventNodeCountUpdated.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventStateChanged.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventStateChanged.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventStateChanged.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventStateChanged.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.dag.app.rm.node;
+
+import org.apache.hadoop.yarn.api.records.NodeReport;
+
+/**
+ * Node event built from a {@link NodeReport}. The event type is derived from
+ * the report's health status: {@link AMNodeEventType#N_TURNED_HEALTHY} when the
+ * report says the node is healthy, {@link AMNodeEventType#N_TURNED_UNHEALTHY}
+ * otherwise.
+ */
+public class AMNodeEventStateChanged extends AMNodeEvent {
+
+  // final, like the payload fields of the other AMNodeEvent subclasses: the
+  // report is fixed at construction time and never reassigned.
+  private final NodeReport nodeReport;
+
+  /**
+   * @param nodeReport report describing the node; must not be {@code null},
+   *                   since it is dereferenced for the node id and health status
+   */
+  public AMNodeEventStateChanged(NodeReport nodeReport) {
+    super(nodeReport.getNodeId(), eventTypeFor(nodeReport));
+    this.nodeReport = nodeReport;
+  }
+
+  /** Maps the report's health flag to the corresponding node event type. */
+  private static AMNodeEventType eventTypeFor(NodeReport report) {
+    return report.getNodeHealthStatus().getIsNodeHealthy()
+        ? AMNodeEventType.N_TURNED_HEALTHY
+        : AMNodeEventType.N_TURNED_UNHEALTHY;
+  }
+
+  /** @return the report this event was built from. */
+  public NodeReport getNodeReport() {
+    return nodeReport;
+  }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventStateChanged.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptEnded.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptEnded.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptEnded.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptEnded.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.dag.app.rm.node;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+
+/**
+ * Node event of type {@link AMNodeEventType#N_TA_ENDED}, reporting that a task
+ * attempt ended as either failed or killed. {@link #killed()} is simply the
+ * negation of {@link #failed()}.
+ */
+public class AMNodeEventTaskAttemptEnded extends AMNodeEvent {
+
+  private final boolean failed;
+  private final ContainerId containerId;
+  private final TezTaskAttemptID taskAttemptId;
+
+  /**
+   * @param nodeId        node associated with the attempt
+   * @param containerId   container associated with the attempt
+   * @param taskAttemptId the attempt that ended
+   * @param failed        {@code true} if the attempt failed, {@code false} if
+   *                      it was killed
+   */
+  public AMNodeEventTaskAttemptEnded(NodeId nodeId, ContainerId containerId,
+      TezTaskAttemptID taskAttemptId, boolean failed) {
+    super(nodeId, AMNodeEventType.N_TA_ENDED);
+    this.containerId = containerId;
+    this.taskAttemptId = taskAttemptId;
+    this.failed = failed;
+  }
+
+  /** @return {@code true} if the attempt failed. */
+  public boolean failed() {
+    return failed;
+  }
+
+  /** @return {@code true} if the attempt was killed, i.e. did not fail. */
+  public boolean killed() {
+    return !failed;
+  }
+
+  /** @return the container associated with the attempt. */
+  public ContainerId getContainerId() {
+    return containerId;
+  }
+
+  /** @return the id of the attempt that ended. */
+  public TezTaskAttemptID getTaskAttemptId() {
+    return taskAttemptId;
+  }
+}
\ No newline at end of file
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptEnded.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptSucceeded.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptSucceeded.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptSucceeded.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptSucceeded.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.dag.app.rm.node;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+
+/**
+ * Node event of type {@link AMNodeEventType#N_TA_SUCCEEDED}, reporting that a
+ * task attempt succeeded.
+ */
+public class AMNodeEventTaskAttemptSucceeded extends AMNodeEvent {
+
+  // TODO The container and attempt ids are not actually needed by this event.
+  private final ContainerId containerId;
+  private final TezTaskAttemptID taskAttemptId;
+
+  /**
+   * @param nodeId        node associated with the attempt
+   * @param containerId   container associated with the attempt
+   * @param taskAttemptId the attempt that succeeded
+   */
+  public AMNodeEventTaskAttemptSucceeded(NodeId nodeId,
+      ContainerId containerId, TezTaskAttemptID taskAttemptId) {
+    super(nodeId, AMNodeEventType.N_TA_SUCCEEDED);
+    this.containerId = containerId;
+    this.taskAttemptId = taskAttemptId;
+  }
+
+  /** @return the container associated with the attempt. */
+  public ContainerId getContainerId() {
+    return containerId;
+  }
+
+  /** @return the id of the attempt that succeeded. */
+  public TezTaskAttemptID getTaskAttemptId() {
+    return taskAttemptId;
+  }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptSucceeded.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventType.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventType.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventType.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventType.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,41 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.rm.node;
+
+/**
+ * Kinds of {@link AMNodeEvent}. Constants are grouped by the component that
+ * produces them, as noted in the "Producer" comments below.
+ */
+public enum AMNodeEventType {
+ //Producer: Scheduler
+ N_CONTAINER_ALLOCATED,
+
+ //Producer: TaskAttempt
+ N_TA_SUCCEEDED,
+ // Attempt ended without success; the event class records failed vs. killed.
+ N_TA_ENDED,
+
+ //Producer: RMCommunicator
+ // The two health events below are chosen from a NodeReport's health flag.
+ N_TURNED_UNHEALTHY,
+ N_TURNED_HEALTHY,
+ N_NODE_COUNT_UPDATED, // for blacklisting. Sent with a null node id.
+
+ //Producer: AMNodeManager
+ N_IGNORE_BLACKLISTING_ENABLED,
+ N_IGNORE_BLACKLISTING_DISABLED,
+
+ // Producer: AMNode - Will not reach AMNodeImpl. Used to compute whether
+ // blacklisting should be ignored.
+ N_NODE_WAS_BLACKLISTED
+}
\ No newline at end of file
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventType.java
------------------------------------------------------------------------------
svn:eol-style = native