You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ac...@apache.org on 2013/03/15 22:26:48 UTC
svn commit: r1457129 [26/38] - in /incubator/tez: ./ tez-ampool/
tez-ampool/src/ tez-ampool/src/main/ tez-ampool/src/main/bin/
tez-ampool/src/main/conf/ tez-ampool/src/main/java/
tez-ampool/src/main/java/org/ tez-ampool/src/main/java/org/apache/ tez-am...
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventAssignTA.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventAssignTA.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventAssignTA.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventAssignTA.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm.container;
+
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.mapreduce.task.impl.MRTaskContext;
+
+/**
+ * Container event of type {@link AMContainerEventType#C_ASSIGN_TA}. It carries
+ * a task attempt id together with the {@link MRTaskContext} supplied for that
+ * attempt.
+ */
+public class AMContainerEventAssignTA extends AMContainerEvent {
+
+  // TODO Maybe have the TAL pull the remoteTask from the TaskAttempt itself ?
+  private final MRTaskContext remoteTaskContext;
+  private final TaskAttemptId attemptId;
+
+  /**
+   * @param containerId id forwarded to the {@link AMContainerEvent} constructor
+   * @param attemptId task attempt id carried by this event
+   * @param remoteTaskContext task context carried by this event
+   */
+  public AMContainerEventAssignTA(ContainerId containerId,
+      TaskAttemptId attemptId, MRTaskContext remoteTaskContext) {
+    super(containerId, AMContainerEventType.C_ASSIGN_TA);
+    this.remoteTaskContext = remoteTaskContext;
+    this.attemptId = attemptId;
+  }
+
+  /** @return the task context passed to the constructor. */
+  public MRTaskContext getRemoteTaskContext() {
+    return remoteTaskContext;
+  }
+
+  /** @return the task attempt id passed to the constructor. */
+  public TaskAttemptId getTaskAttemptId() {
+    return attemptId;
+  }
+
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventCompleted.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventCompleted.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventCompleted.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventCompleted.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,36 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.rm.container;
+
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+
+/**
+ * Container event of type {@link AMContainerEventType#C_COMPLETED} wrapping
+ * the {@link ContainerStatus} it was created from. The container id of the
+ * event is taken from that status.
+ */
+public class AMContainerEventCompleted extends AMContainerEvent {
+
+  private final ContainerStatus containerStatus;
+
+  /**
+   * @param containerStatus status carried by this event; its container id is
+   *          forwarded to the {@link AMContainerEvent} constructor, so it must
+   *          not be null
+   */
+  public AMContainerEventCompleted(ContainerStatus containerStatus) {
+    super(containerStatus.getContainerId(), AMContainerEventType.C_COMPLETED);
+    this.containerStatus = containerStatus;
+  }
+
+  /** @return the status passed to the constructor. */
+  public ContainerStatus getContainerStatus() {
+    return containerStatus;
+  }
+
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventLaunchFailed.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventLaunchFailed.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventLaunchFailed.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventLaunchFailed.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm.container;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+/**
+ * Container event of type {@link AMContainerEventType#C_LAUNCH_FAILED}
+ * carrying a free-form message.
+ */
+public class AMContainerEventLaunchFailed extends AMContainerEvent {
+
+  private final String message;
+
+  /**
+   * @param containerId id forwarded to the {@link AMContainerEvent} constructor
+   * @param message text carried by this event
+   */
+  public AMContainerEventLaunchFailed(ContainerId containerId,
+      String message) {
+    super(containerId, AMContainerEventType.C_LAUNCH_FAILED);
+    this.message = message;
+  }
+
+  /** @return the message passed to the constructor. */
+  public String getMessage() {
+    return message;
+  }
+
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventLaunchRequest.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventLaunchRequest.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventLaunchRequest.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventLaunchRequest.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm.container;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+/**
+ * Container event of type {@link AMContainerEventType#C_LAUNCH_REQUEST}. It is
+ * an immutable holder for the values handed to its constructor: job id, task
+ * type, job token, credentials, profiling flag and job configuration.
+ */
+public class AMContainerEventLaunchRequest extends AMContainerEvent {
+
+  private final JobConf jobConf;
+  private final JobId jobId;
+  private final TaskType taskTypeForContainer;
+  private final Token<JobTokenIdentifier> jobToken;
+  private final Credentials credentials;
+  private final boolean shouldProfile;
+
+  /**
+   * @param containerId id forwarded to the {@link AMContainerEvent} constructor
+   * @param jobId job id carried by this event
+   * @param taskType task type carried by this event
+   * @param jobToken job token carried by this event
+   * @param credentials credentials carried by this event
+   * @param shouldProfile profiling flag carried by this event
+   * @param jobConf job configuration carried by this event
+   */
+  public AMContainerEventLaunchRequest(ContainerId containerId, JobId jobId,
+      TaskType taskType, Token<JobTokenIdentifier> jobToken,
+      Credentials credentials, boolean shouldProfile, JobConf jobConf) {
+    super(containerId, AMContainerEventType.C_LAUNCH_REQUEST);
+    this.jobConf = jobConf;
+    this.jobId = jobId;
+    this.taskTypeForContainer = taskType;
+    this.jobToken = jobToken;
+    this.credentials = credentials;
+    this.shouldProfile = shouldProfile;
+  }
+
+  /** @return the job id passed to the constructor. */
+  public JobId getJobId() {
+    return jobId;
+  }
+
+  /** @return the task type passed to the constructor. */
+  public TaskType getTaskTypeForContainer() {
+    return taskTypeForContainer;
+  }
+
+  /** @return the job token passed to the constructor. */
+  public Token<JobTokenIdentifier> getJobToken() {
+    return jobToken;
+  }
+
+  /** @return the credentials passed to the constructor. */
+  public Credentials getCredentials() {
+    return credentials;
+  }
+
+  /** @return the profiling flag passed to the constructor. */
+  public boolean shouldProfile() {
+    return shouldProfile;
+  }
+
+  /** @return the job configuration passed to the constructor. */
+  public JobConf getJobConf() {
+    return jobConf;
+  }
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventLaunched.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventLaunched.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventLaunched.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventLaunched.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm.container;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+/**
+ * Container event of type {@link AMContainerEventType#C_LAUNCHED} carrying a
+ * shuffle port number.
+ */
+public class AMContainerEventLaunched extends AMContainerEvent {
+
+  private final int shufflePort;
+
+  /**
+   * @param containerId id forwarded to the {@link AMContainerEvent} constructor
+   * @param shufflePort port number carried by this event
+   */
+  public AMContainerEventLaunched(ContainerId containerId, int shufflePort) {
+    super(containerId, AMContainerEventType.C_LAUNCHED);
+    this.shufflePort = shufflePort;
+  }
+
+  /** @return the shuffle port passed to the constructor. */
+  public int getShufflePort() {
+    return shufflePort;
+  }
+
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventNodeFailed.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventNodeFailed.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventNodeFailed.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventNodeFailed.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm.container;
+
+import org.apache.hadoop.mapreduce.v2.common.DiagnosableEvent;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+public class AMContainerEventNodeFailed extends AMContainerEvent implements
+ DiagnosableEvent {
+
+ private final String message;
+
+ public AMContainerEventNodeFailed(ContainerId containerId, String message) {
+ super(containerId, AMContainerEventType.C_NODE_FAILED);
+ this.message = message;
+ }
+
+ @Override
+ public String getDiagnosticInfo() {
+ return message;
+ }
+
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventStopFailed.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventStopFailed.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventStopFailed.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventStopFailed.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm.container;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+/**
+ * Container event of type {@link AMContainerEventType#C_NM_STOP_FAILED}
+ * carrying a free-form message.
+ */
+public class AMContainerEventStopFailed extends AMContainerEvent {
+
+  private final String message;
+
+  /**
+   * @param containerId id forwarded to the {@link AMContainerEvent} constructor
+   * @param message text carried by this event
+   */
+  public AMContainerEventStopFailed(ContainerId containerId, String message) {
+    super(containerId, AMContainerEventType.C_NM_STOP_FAILED);
+    this.message = message;
+  }
+
+  /** @return the message passed to the constructor. */
+  public String getMessage() {
+    return message;
+  }
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventTASucceeded.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventTASucceeded.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventTASucceeded.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventTASucceeded.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm.container;
+
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+/**
+ * Container event of type {@link AMContainerEventType#C_TA_SUCCEEDED}
+ * carrying the id of a task attempt.
+ */
+public class AMContainerEventTASucceeded extends AMContainerEvent {
+
+  private final TaskAttemptId attemptId;
+
+  /**
+   * @param containerId id forwarded to the {@link AMContainerEvent} constructor
+   * @param attemptId task attempt id carried by this event
+   */
+  public AMContainerEventTASucceeded(ContainerId containerId,
+      TaskAttemptId attemptId) {
+    super(containerId, AMContainerEventType.C_TA_SUCCEEDED);
+    this.attemptId = attemptId;
+  }
+
+  /** @return the task attempt id passed to the constructor. */
+  public TaskAttemptId getTaskAttemptId() {
+    return attemptId;
+  }
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventType.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventType.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventType.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventType.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,56 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.rm.container;
+
+/**
+ * Event types used by {@link AMContainerEvent} subclasses. The constants are
+ * grouped by the component noted as their producer.
+ */
+public enum AMContainerEventType {
+
+  // ---- Producer: Scheduler ----
+  C_LAUNCH_REQUEST,
+  C_ASSIGN_TA,
+
+  // ---- Producer: NMCommunicator ----
+  C_LAUNCHED,
+  C_LAUNCH_FAILED,
+
+  // ---- Producer: TAL (PULL_TA is a sync call) ----
+  C_PULL_TA,
+
+  // ---- Producer: Scheduler via TA ----
+  // Maybe change this to C_TA_FINISHED with a status.
+  C_TA_SUCCEEDED,
+
+  // ---- Producer: RMCommunicator ----
+  C_COMPLETED,
+
+  // ---- Producer: RMCommunicator, AMNode ----
+  C_NODE_FAILED,
+
+  // TODO ZZZ CREUSE: Consider introducing a new event C_NODE_BLACKLISTED ->
+  // container can take a call on what to do if this event comes in.
+
+  // ---- Producers of a stop request ----
+  //   TA -> Scheduler -> Container (in case of failure etc)
+  //   Scheduler -> Container (in case of pre-emption etc)
+  //   Node -> Container (in case of Node blacklisted etc)
+  C_STOP_REQUEST,
+
+  // ---- Producer: NMCommunicator ----
+  C_NM_STOP_FAILED,
+  C_NM_STOP_SENT,
+
+  // ---- Producer: ContainerHeartbeatHandler ----
+  C_TIMED_OUT,
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerHelpers.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerHelpers.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerHelpers.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerHelpers.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,273 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.rm.container;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceChildJVM2;
+import org.apache.hadoop.mapred.ShuffleHandler;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app2.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.util.Apps;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Static helpers that build the {@link ContainerLaunchContext} for task
+ * containers.
+ *
+ * <p>A "common" launch context (local resources, base environment, service
+ * data and serialized credentials) is built on first use and cached in a
+ * static field. Each per-container context is then derived from that cached
+ * instance.
+ */
+public class AMContainerHelpers {
+
+  private static final Log LOG = LogFactory.getLog(AMContainerHelpers.class);
+
+  // Monitor objects are final: synchronizing on a reassignable field does not
+  // guarantee that all threads lock the same object.
+  private static final Object commonContainerSpecLock = new Object();
+  private static ContainerLaunchContext commonContainerSpec = null;
+  private static final Object classpathLock = new Object();
+  private static final AtomicBoolean initialClasspathFlag = new AtomicBoolean();
+  private static String initialClasspath = null;
+
+  /**
+   * Create a {@link LocalResource} record with all the given parameters.
+   *
+   * @param fc file system used to stat and resolve {@code file}
+   * @param file path of the resource
+   * @param type resource type recorded in the result
+   * @param visibility resource visibility recorded in the result
+   * @return a record holding the resolved URL, length and modification time
+   *         of {@code file}
+   * @throws IOException if the file status or resolved path cannot be obtained
+   */
+  private static LocalResource createLocalResource(FileSystem fc, Path file,
+      LocalResourceType type, LocalResourceVisibility visibility)
+      throws IOException {
+    FileStatus fstat = fc.getFileStatus(file);
+    URL resourceURL = ConverterUtils.getYarnUrlFromPath(fc.resolvePath(fstat
+        .getPath()));
+    long resourceSize = fstat.getLen();
+    long resourceModificationTime = fstat.getModificationTime();
+
+    return BuilderUtils.newLocalResource(resourceURL, type, visibility,
+        resourceSize, resourceModificationTime);
+  }
+
+  /**
+   * Lock this on initialClasspath so that there is only one fork in the AM for
+   * getting the initial class-path. TODO: We already construct a parent CLC and
+   * use it for all the containers, so this should go away once the
+   * mr-generated-classpath stuff is gone.
+   *
+   * @param conf configuration handed to {@code MRApps.setClasspath}
+   * @return the cached CLASSPATH value; computed on the first call only
+   * @throws IOException if computing the classpath fails
+   */
+  private static String getInitialClasspath(Configuration conf)
+      throws IOException {
+    synchronized (classpathLock) {
+      if (initialClasspathFlag.get()) {
+        return initialClasspath;
+      }
+      Map<String, String> env = new HashMap<String, String>();
+      MRApps.setClasspath(env, conf);
+      initialClasspath = env.get(Environment.CLASSPATH.name());
+      initialClasspathFlag.set(true);
+      return initialClasspath;
+    }
+  }
+
+  /**
+   * Create the common {@link ContainerLaunchContext} for all attempts.
+   *
+   * @param applicationACLs ACLs stored in the resulting context
+   * @param conf configuration used to locate the job jar, job conf, shell and
+   *          admin environment
+   * @param jobToken job token added to the task credentials and serialized as
+   *          shuffle service data
+   * @param oldJobId job id used to locate the job submit directory
+   * @param credentials credentials copied into the task credentials when
+   *          security is enabled
+   * @return a launch context whose per-container fields (container id,
+   *         resource, commands) are left null
+   * @throws YarnException wrapping any {@link IOException} raised while
+   *           preparing resources or credentials
+   */
+  private static ContainerLaunchContext createCommonContainerLaunchContext(
+      Map<ApplicationAccessType, String> applicationACLs, Configuration conf,
+      Token<JobTokenIdentifier> jobToken,
+      final org.apache.hadoop.mapred.JobID oldJobId, Credentials credentials) {
+
+    // Application resources
+    Map<String, LocalResource> localResources =
+        new HashMap<String, LocalResource>();
+
+    // Application environment
+    Map<String, String> environment = new HashMap<String, String>();
+
+    // Service data
+    Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
+
+    // Tokens
+    ByteBuffer taskCredentialsBuffer = ByteBuffer.wrap(new byte[] {});
+    try {
+      FileSystem remoteFS = FileSystem.get(conf);
+
+      // //////////// Set up JobJar to be localized properly on the remote NM.
+      String jobJar = conf.get(MRJobConfig.JAR);
+      if (jobJar != null) {
+        Path remoteJobJar = (new Path(jobJar)).makeQualified(remoteFS.getUri(),
+            remoteFS.getWorkingDirectory());
+        localResources.put(
+            MRJobConfig.JOB_JAR,
+            createLocalResource(remoteFS, remoteJobJar, LocalResourceType.FILE,
+                LocalResourceVisibility.APPLICATION));
+        LOG.info("The job-jar file on the remote FS is "
+            + remoteJobJar.toUri().toASCIIString());
+      } else {
+        // Job jar may be null. For e.g, for pipes, the job jar is the hadoop
+        // mapreduce jar itself which is already on the classpath.
+        LOG.info("Job jar is not present. "
+            + "Not adding any jar to the list of resources.");
+      }
+      // //////////// End of JobJar setup
+
+      // //////////// Set up JobConf to be localized properly on the remote NM.
+      Path path = MRApps.getStagingAreaDir(conf, UserGroupInformation
+          .getCurrentUser().getShortUserName());
+      Path remoteJobSubmitDir = new Path(path, oldJobId.toString());
+      Path remoteJobConfPath = new Path(remoteJobSubmitDir,
+          MRJobConfig.JOB_CONF_FILE);
+      localResources.put(
+          MRJobConfig.JOB_CONF_FILE,
+          createLocalResource(remoteFS, remoteJobConfPath,
+              LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
+      LOG.info("The job-conf file on the remote FS is "
+          + remoteJobConfPath.toUri().toASCIIString());
+      // //////////// End of JobConf setup
+
+      // Setup DistributedCache
+      MRApps.setupDistributedCache(conf, localResources);
+
+      // Setup up task credentials buffer
+      Credentials taskCredentials = new Credentials();
+
+      if (UserGroupInformation.isSecurityEnabled()) {
+        LOG.info("Adding #" + credentials.numberOfTokens() + " tokens and #"
+            + credentials.numberOfSecretKeys()
+            + " secret keys for NM use for launching container");
+        taskCredentials.addAll(credentials);
+      }
+
+      // LocalStorageToken is needed irrespective of whether security is enabled
+      // or not.
+      TokenCache.setJobToken(jobToken, taskCredentials);
+
+      DataOutputBuffer containerTokens_dob = new DataOutputBuffer();
+      LOG.info("Size of containertokens_dob is "
+          + taskCredentials.numberOfTokens());
+      taskCredentials.writeTokenStorageToStream(containerTokens_dob);
+      // Wrap only the bytes actually written, not the whole backing array.
+      taskCredentialsBuffer = ByteBuffer.wrap(containerTokens_dob.getData(), 0,
+          containerTokens_dob.getLength());
+
+      // Add shuffle token
+      LOG.info("Putting shuffle token in serviceData");
+      serviceData.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
+          ShuffleHandler.serializeServiceData(jobToken));
+
+      Apps.addToEnvironment(environment, Environment.CLASSPATH.name(),
+          getInitialClasspath(conf));
+    } catch (IOException e) {
+      throw new YarnException(e);
+    }
+
+    // Shell
+    environment.put(Environment.SHELL.name(), conf.get(
+        MRJobConfig.MAPRED_ADMIN_USER_SHELL, MRJobConfig.DEFAULT_SHELL));
+
+    // Add pwd to LD_LIBRARY_PATH, add this before adding anything else
+    Apps.addToEnvironment(environment, Environment.LD_LIBRARY_PATH.name(),
+        Environment.PWD.$());
+
+    // Add the env variables passed by the admin
+    Apps.setEnvFromInputString(environment, conf.get(
+        MRJobConfig.MAPRED_ADMIN_USER_ENV,
+        MRJobConfig.DEFAULT_MAPRED_ADMIN_USER_ENV));
+
+    // Construct the actual Container
+    // The null fields are per-container and will be constructed for each
+    // container separately.
+    ContainerLaunchContext container = BuilderUtils.newContainerLaunchContext(
+        null, conf.get(MRJobConfig.USER_NAME), null, localResources,
+        environment, null, serviceData, taskCredentialsBuffer, applicationACLs);
+
+    return container;
+  }
+
+  /**
+   * Build the launch context for a single container from the cached common
+   * context, creating the common context on the first call.
+   *
+   * <p>The common context is built only once, from the arguments of the first
+   * call. On later calls {@code jobToken}, {@code oldJobId} and
+   * {@code credentials} affect only the parts computed per container.
+   *
+   * @param applicationACLs ACLs stored in the resulting context
+   * @param containerID container id stored in the resulting context
+   * @param jobConf job configuration used for the VM environment and command
+   * @param taskType task type used for the VM environment and command
+   * @param jobToken job token used when the common context is first built
+   * @param oldJobId job id used for the launch command and the common context
+   * @param assignedCapability resource stored in the resulting context
+   * @param containerId container id passed to the launch-command builder.
+   *          NOTE(review): this duplicates {@code containerID}; callers
+   *          presumably pass the same id for both - confirm.
+   * @param taskAttemptListener supplies the address placed in the launch
+   *          command
+   * @param credentials credentials used when the common context is first
+   *          built
+   * @param shouldProfile profiling flag passed to the launch-command builder
+   * @return the launch context for this container
+   */
+  @VisibleForTesting
+  public static ContainerLaunchContext createContainerLaunchContext(
+      Map<ApplicationAccessType, String> applicationACLs,
+      ContainerId containerID, JobConf jobConf, TaskType taskType,
+      Token<JobTokenIdentifier> jobToken,
+      final org.apache.hadoop.mapred.JobID oldJobId,
+      Resource assignedCapability, ContainerId containerId,
+      TaskAttemptListener taskAttemptListener, Credentials credentials,
+      boolean shouldProfile) {
+
+    // Capture the shared spec while holding the lock so every later read in
+    // this method uses one safely published reference.
+    final ContainerLaunchContext commonSpec;
+    synchronized (commonContainerSpecLock) {
+      if (commonContainerSpec == null) {
+        commonContainerSpec = createCommonContainerLaunchContext(
+            applicationACLs, jobConf, jobToken, oldJobId, credentials);
+      }
+      commonSpec = commonContainerSpec;
+    }
+
+    // Fill in the fields needed per-container that are missing in the common
+    // spec.
+
+    // Setup environment by cloning from common env.
+    Map<String, String> myEnv =
+        new HashMap<String, String>(commonSpec.getEnvironment());
+    MapReduceChildJVM2.setVMEnv(myEnv, jobConf, taskType);
+
+    // Set up the launch command
+    List<String> commands = MapReduceChildJVM2.getVMCommand(
+        taskAttemptListener.getAddress(), jobConf, taskType, containerId,
+        oldJobId, shouldProfile);
+
+    // Duplicate the ByteBuffers for access by multiple containers.
+    Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();
+    for (Entry<String, ByteBuffer> entry : commonSpec.getServiceData()
+        .entrySet()) {
+      myServiceData.put(entry.getKey(), entry.getValue().duplicate());
+    }
+
+    // Construct the actual Container
+    ContainerLaunchContext container = BuilderUtils.newContainerLaunchContext(
+        containerID, commonSpec.getUser(), assignedCapability,
+        commonSpec.getLocalResources(), myEnv, commands,
+        myServiceData, commonSpec.getContainerTokens().duplicate(),
+        applicationACLs);
+
+    return container;
+  }
+
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerImpl.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerImpl.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerImpl.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,915 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.rm.container;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.ContainerHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventContainerTerminated;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventContainerTerminating;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventNodeFailed;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEventContainerCompleted;
+import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorLaunchRequestEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorStopRequestEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicatorContainerDeAllocateRequestEvent;
+import org.apache.hadoop.mapreduce.v2.common.DiagnosableEvent;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
+import org.apache.hadoop.yarn.state.MultipleArcTransition;
+import org.apache.hadoop.yarn.state.SingleArcTransition;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.tez.mapreduce.task.impl.MRTaskContext;
+
+@SuppressWarnings("rawtypes")
+public class AMContainerImpl implements AMContainer {
+
+ private static final Log LOG = LogFactory.getLog(AMContainerImpl.class);
+
+ private final ReadLock readLock;
+ private final WriteLock writeLock;
+ private final ContainerId containerId;
+ // Container to be used for getters on capability, locality etc.
+ private final Container container;
+ private final AppContext appContext;
+ private final ContainerHeartbeatHandler containerHeartbeatHandler;
+ private final TaskAttemptListener taskAttemptListener;
+ protected final EventHandler eventHandler;
+
+ private final List<TaskAttemptId> completedAttempts = new LinkedList<TaskAttemptId>();
+
+ // TODO Maybe this should be pulled from the TaskAttempts.
+ private final Map<TaskAttemptId, MRTaskContext> remoteTaskMap =
+ new HashMap<TaskAttemptId, MRTaskContext>();
+
+ // TODO ?? Convert to list and hash.
+
+ private int shufflePort;
+ private long idleTimeBetweenTasks = 0;
+ private long lastTaskFinishTime;
+
+ // An assign can happen even during wind down. e.g. NodeFailure caused the
+ // wind down, and an allocation was pending in the AMScheduler. This could
+ // be modelled as a separate state.
+ private boolean nodeFailed = false;
+ private String nodeFailedMessage;
+
+ private TaskAttemptId pendingAttempt;
+ private TaskAttemptId runningAttempt;
+ private List<TaskAttemptId> failedAssignments;
+ private TaskAttemptId pullAttempt;
+
+ private boolean inError = false;
+
+ private ContainerLaunchContext clc;
+
+ // TODO Consider registering with the TAL, instead of the TAL pulling.
+ // Possibly after splitting TAL and ContainerListener.
+
+ // TODO What should be done with pendingAttempts. Nullify when handled ?
+ // Add them to failed ta list ? Some historic information should be maintained.
+
+ // TODO Create a generic ERROR state. Container tries informing relevant components in this case.
+
+ private final StateMachine<AMContainerState, AMContainerEventType, AMContainerEvent> stateMachine;
+ private static final StateMachineFactory
+ <AMContainerImpl, AMContainerState, AMContainerEventType, AMContainerEvent>
+ stateMachineFactory =
+ new StateMachineFactory<AMContainerImpl, AMContainerState, AMContainerEventType, AMContainerEvent>(
+ AMContainerState.ALLOCATED)
+
+ .addTransition(AMContainerState.ALLOCATED, AMContainerState.LAUNCHING, AMContainerEventType.C_LAUNCH_REQUEST, new LaunchRequestTransition())
+ .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, AMContainerEventType.C_ASSIGN_TA, new AssignTaskAttemptAtAllocatedTransition())
+ .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtAllocatedTransition())
+ .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, AMContainerEventType.C_STOP_REQUEST, new StopRequestAtAllocatedTransition())
+ .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, AMContainerEventType.C_NODE_FAILED, new NodeFailedAtAllocatedTransition())
+ .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, EnumSet.of(AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_PULL_TA, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED, AMContainerEventType.C_TIMED_OUT), new ErrorTransition())
+
+ .addTransition(AMContainerState.LAUNCHING, EnumSet.of(AMContainerState.LAUNCHING, AMContainerState.STOP_REQUESTED), AMContainerEventType.C_ASSIGN_TA, new AssignTaskAttemptTransition())
+ .addTransition(AMContainerState.LAUNCHING, AMContainerState.IDLE, AMContainerEventType.C_LAUNCHED, new LaunchedTransition())
+ .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOPPING, AMContainerEventType.C_LAUNCH_FAILED, new LaunchFailedTransition())
+ // TODO CREUSE : Maybe, consider sending back an attempt if the container asks for one in this state. Waiting for a LAUNCHED event from the NMComm may delay the task allocation.
+ .addTransition(AMContainerState.LAUNCHING, AMContainerState.LAUNCHING, AMContainerEventType.C_PULL_TA) // Is assuming the pullAttempt will be null.
+ .addTransition(AMContainerState.LAUNCHING, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtLaunchingTransition())
+ .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_STOP_REQUEST, new StopRequestAtLaunchingTransition())
+ .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, new NodeFailedAtLaunchingTransition())
+ .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOP_REQUESTED, EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED, AMContainerEventType.C_TIMED_OUT), new ErrorAtLaunchingTransition())
+
+ .addTransition(AMContainerState.IDLE, EnumSet.of(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED), AMContainerEventType.C_ASSIGN_TA, new AssignTaskAttemptAtIdleTransition())
+ .addTransition(AMContainerState.IDLE, EnumSet.of(AMContainerState.RUNNING, AMContainerState.IDLE), AMContainerEventType.C_PULL_TA, new PullTAAtIdleTransition())
+ .addTransition(AMContainerState.IDLE, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtIdleTransition())
+ .addTransition(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_STOP_REQUEST, new StopRequestAtIdleTransition())
+ .addTransition(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_TIMED_OUT, new TimedOutAtIdleTransition())
+ .addTransition(AMContainerState.IDLE, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, new NodeFailedAtIdleTransition())
+ .addTransition(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED, EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED), new ErrorAtIdleTransition())
+
+ .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_ASSIGN_TA, new AssignTaskAttemptAtRunningTransition())
+ .addTransition(AMContainerState.RUNNING, AMContainerState.RUNNING, AMContainerEventType.C_PULL_TA)
+ .addTransition(AMContainerState.RUNNING, AMContainerState.IDLE, AMContainerEventType.C_TA_SUCCEEDED, new TASucceededAtRunningTransition())
+ .addTransition(AMContainerState.RUNNING, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtRunningTransition())
+ .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_STOP_REQUEST, new StopRequestAtRunningTransition())
+ .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_TIMED_OUT, new TimedOutAtRunningTransition())
+ .addTransition(AMContainerState.RUNNING, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, new NodeFailedAtRunningTransition())
+ .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED), new ErrorAtRunningTransition())
+
+ .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_ASSIGN_TA, new AssignTAAtWindDownTransition())
+ .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtWindDownTransition())
+ .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOPPING, AMContainerEventType.C_NM_STOP_SENT)
+ .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOPPING, AMContainerEventType.C_NM_STOP_FAILED, new NMStopRequestFailedTransition())
+ .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, new NodeFailedAtNMStopRequestedTransition())
+ .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOP_REQUESTED, EnumSet.of(AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_PULL_TA, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_TIMED_OUT))
+ .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_LAUNCH_REQUEST, new ErrorAtNMStopRequestedTransition())
+
+ .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_ASSIGN_TA, new AssignTAAtWindDownTransition())
+ .addTransition(AMContainerState.STOPPING, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtWindDownTransition())
+ .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, new NodeFailedBaseTransition())
+ .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, EnumSet.of(AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_PULL_TA, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED, AMContainerEventType.C_TIMED_OUT))
+ .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_LAUNCH_REQUEST, new ErrorAtStoppingTransition())
+
+ .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, AMContainerEventType.C_ASSIGN_TA, new AssignTAAtCompletedTransition())
+ .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, AMContainerEventType.C_NODE_FAILED, new NodeFailedBaseTransition())
+ .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_PULL_TA, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_COMPLETED, AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED, AMContainerEventType.C_TIMED_OUT))
+
+ .installTopology();
+
+ // Note: Containers will not reach their final state if the RM link is broken,
+ // AM shutdown should not wait for this.
+
+ // Attempting to use a container based purely on resources required, etc. needs
+ // additional changes - JvmID, YarnChild, etc. depend on TaskType.
+ public AMContainerImpl(Container container, ContainerHeartbeatHandler chh,
+ TaskAttemptListener tal, AppContext appContext) {
+ ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
+ this.readLock = rwLock.readLock();
+ this.writeLock = rwLock.writeLock();
+ this.container = container;
+ this.containerId = container.getId();
+ this.eventHandler = appContext.getEventHandler();
+ this.appContext = appContext;
+ this.containerHeartbeatHandler = chh;
+ this.taskAttemptListener = tal;
+ this.failedAssignments = new LinkedList<TaskAttemptId>();
+
+ this.stateMachine = stateMachineFactory.make(this);
+ }
+
+ /** Returns the state machine's current state, read while holding the read lock. */
+ @Override
+ public AMContainerState getState() {
+ readLock.lock();
+ try {
+ final AMContainerState current = stateMachine.getCurrentState();
+ return current;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /** Returns the id cached from the container at construction time. */
+ @Override
+ public ContainerId getContainerId() {
+ return containerId;
+ }
+
+ /** Returns the container this instance was constructed with. */
+ @Override
+ public Container getContainer() {
+ return container;
+ }
+
+ /**
+ * Returns a snapshot copy of the completed attempts, taken under the read
+ * lock; the returned list is independent of the internal one.
+ */
+ @Override
+ public List<TaskAttemptId> getCompletedTaskAttempts() {
+ readLock.lock();
+ try {
+ final List<TaskAttemptId> snapshot = new ArrayList<TaskAttemptId>();
+ snapshot.addAll(completedAttempts);
+ return snapshot;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /**
+ * Returns the completed attempts followed by the running attempt and then
+ * the pending attempt, where each of the latter two is only included when
+ * it is non-null. The list is a fresh copy built under the read lock.
+ */
+ @Override
+ public List<TaskAttemptId> getAllTaskAttempts() {
+ readLock.lock();
+ try {
+ final List<TaskAttemptId> attempts =
+ new ArrayList<TaskAttemptId>(completedAttempts);
+ final TaskAttemptId running = runningAttempt;
+ if (running != null) {
+ attempts.add(running);
+ }
+ final TaskAttemptId pending = pendingAttempt;
+ if (pending != null) {
+ attempts.add(pending);
+ }
+ return attempts;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /**
+ * Returns the attempts queued on this container. At most one attempt (the
+ * pending one) can be queued, so the result is either empty or a singleton.
+ *
+ * @return an immutable list holding the pending attempt, or an empty list
+ * when no attempt is pending (never a list containing null)
+ */
+ @Override
+ public List<TaskAttemptId> getQueuedTaskAttempts() {
+ readLock.lock();
+ try {
+ final TaskAttemptId queued = this.pendingAttempt;
+ if (queued == null) {
+ // Avoid handing callers a one-element list whose element is null.
+ return Collections.<TaskAttemptId>emptyList();
+ }
+ return Collections.singletonList(queued);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /** Returns the currently running attempt (may be null), read under the read lock. */
+ @Override
+ public TaskAttemptId getRunningTaskAttempt() {
+ readLock.lock();
+ try {
+ final TaskAttemptId running = runningAttempt;
+ return running;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /** Returns the shuffle port recorded for this container, read under the read lock. */
+ @Override
+ public int getShufflePort() {
+ readLock.lock();
+ try {
+ final int port = shufflePort;
+ return port;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /**
+ * Feeds an event into the container state machine while holding the write
+ * lock. An invalid transition is logged and flagged via inError rather than
+ * propagated to the caller.
+ *
+ * @param event the container event to process
+ */
+ @Override
+ public void handle(AMContainerEvent event) {
+ this.writeLock.lock();
+ try {
+ // Everything after lock() lives inside the try so that the lock is
+ // released even if logging or event access throws.
+ final AMContainerState oldState = getState();
+ LOG.info("DEBUG: Processing ContainerEvent: " + event.getContainerId() + " of type "
+ + event.getType() + " while in state: " + oldState);
+ try {
+ stateMachine.doTransition(event.getType(), event);
+ } catch (InvalidStateTransitonException e) {
+ LOG.error("Can't handle event " + event.getType()
+ + " at current state " + oldState + " for ContainerId "
+ + this.containerId, e);
+ inError = true;
+ // TODO Can't set state to COMPLETED. Add a default error state.
+ }
+ final AMContainerState newState = getState();
+ if (oldState != newState) {
+ LOG.info("AMContainer " + this.containerId + " transitioned from "
+ + oldState + " to " + newState);
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ /**
+ * Forwards an event to the event handler obtained from the AppContext.
+ * The unchecked suppression is needed because eventHandler is a raw type.
+ */
+ @SuppressWarnings("unchecked")
+ private void sendEvent(Event<?> event) {
+ this.eventHandler.handle(event);
+ }
+
+ // Push the TaskAttempt to the TAL, instead of the TAL pulling when a JVM asks
+ // for a TaskAttempt.
+ /**
+ * Raises a C_PULL_TA event on this container and, if that transition
+ * selected an attempt (pullAttempt), removes and returns its task context
+ * from remoteTaskMap. pullAttempt is always reset before the write lock is
+ * released.
+ *
+ * @return the task context for the pulled attempt, or null if none was set
+ */
+ public MRTaskContext pullTaskContext() {
+ writeLock.lock();
+ try {
+ handle(new AMContainerEvent(containerId, AMContainerEventType.C_PULL_TA));
+ if (pullAttempt == null) {
+ return null;
+ }
+ return remoteTaskMap.remove(pullAttempt);
+ } finally {
+ pullAttempt = null;
+ writeLock.unlock();
+ }
+ }
+
+
+ //////////////////////////////////////////////////////////////////////////////
+ // Start of Transition Classes //
+ //////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * Registered for C_LAUNCH_REQUEST in ALLOCATED (moves to LAUNCHING).
+ * Builds the ContainerLaunchContext from the launch-request event, registers
+ * with the TA listener, asks the NM to start the container, and then drops
+ * the launch context again.
+ */
+ protected static class LaunchRequestTransition implements
+ SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+ @Override
+ public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+ AMContainerEventLaunchRequest event = (AMContainerEventLaunchRequest) cEvent;
+
+ // The clc is held on the container only for the duration of this call.
+ container.clc = AMContainerHelpers.createContainerLaunchContext(
+ container.appContext.getApplicationACLs(),
+ container.getContainerId(), event.getJobConf(),
+ event.getTaskTypeForContainer(), event.getJobToken(),
+ TypeConverter.fromYarn(event.getJobId()),
+ container.getContainer().getResource(), container.containerId,
+ container.taskAttemptListener, event.getCredentials(),
+ event.shouldProfile());
+
+ container.registerWithTAListener();
+ container.sendStartRequestToNM();
+ LOG.info("Sending Launch Request for Container with id: " +
+ container.clc.getContainerId());
+ // Forget about the clc to save resources. At some point, part of the clc
+ // info may need to be exposed to the scheduler to figure out whether a
+ // container can be used for a specific TaskAttempt.
+ container.clc = null;
+ }
+ }
+
+ /**
+ * Registered for C_ASSIGN_TA in ALLOCATED (moves to COMPLETED).
+ * An assignment before launch is treated as a scheduler error: the container
+ * is flagged inError, the attempt is told it was terminated, the scheduler is
+ * told the container completed, and the container is de-allocated.
+ */
+ protected static class AssignTaskAttemptAtAllocatedTransition implements
+ SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+ @Override
+ public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+ AMContainerEventAssignTA event = (AMContainerEventAssignTA) cEvent;
+ container.inError = true;
+ container.maybeSendNodeFailureForFailedAssignment(event
+ .getTaskAttemptId());
+ container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(),
+ "AMScheduler Error: TaskAttempt allocated to unlaunched container: " +
+ container.getContainerId());
+ container.sendCompletedToScheduler();
+ container.deAllocate();
+ LOG.warn("Unexpected TA Assignment: TAId: " + event.getTaskAttemptId() +
+ " for ContainerId: " + container.getContainerId() +
+ " while in state: " + container.getState());
+ }
+ }
+
+ /**
+ * Registered for C_COMPLETED in ALLOCATED (moves to COMPLETED).
+ * Only notifies the scheduler that the container completed.
+ */
+ protected static class CompletedAtAllocatedTransition implements
+ SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+ @Override
+ public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+ container.sendCompletedToScheduler();
+ }
+ }
+
+ /**
+ * Registered for C_STOP_REQUEST in ALLOCATED (moves to COMPLETED).
+ * Notifies the scheduler of completion and de-allocates the container; no
+ * stop request is sent to the NM from this transition.
+ */
+ protected static class StopRequestAtAllocatedTransition implements
+ SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+ @Override
+ public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+ container.sendCompletedToScheduler();
+ container.deAllocate();
+ }
+ }
+
+ /**
+ * Registered for C_NODE_FAILED in ALLOCATED (moves to COMPLETED).
+ * Records the node failure (and its diagnostic message when the event
+ * carries one), notifies the scheduler of completion and de-allocates.
+ */
+ protected static class NodeFailedAtAllocatedTransition implements
+ SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+ @Override
+ public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+ container.nodeFailed = true;
+ // Diagnostics are optional; only DiagnosableEvent instances provide them.
+ if (cEvent instanceof DiagnosableEvent) {
+ container.nodeFailedMessage =
+ ((DiagnosableEvent) cEvent).getDiagnosticInfo();
+ }
+ container.sendCompletedToScheduler();
+ container.deAllocate();
+ }
+ }
+
+ /**
+ * Registered for the set of event types that are unexpected in ALLOCATED
+ * (moves to COMPLETED). Notifies the scheduler of completion, de-allocates
+ * the container and logs the unexpected event.
+ * NOTE(review): this override does not call super.transition(); the body of
+ * ErrorBaseTransition is not visible here - confirm that skipping it is
+ * intended.
+ */
+ protected static class ErrorTransition extends ErrorBaseTransition {
+ @Override
+ public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+ container.sendCompletedToScheduler();
+ container.deAllocate();
+ LOG.info(
+ "Unexpected event type: " + cEvent.getType() + " while in state: " +
+ container.getState() + ". Event: " + cEvent);
+
+ }
+ }
+
+ /**
+ * Registered for C_ASSIGN_TA in LAUNCHING. If an attempt is already pending,
+ * the extra assignment is handed to handleExtraTAAssign and the container
+ * moves to STOP_REQUESTED; otherwise the attempt becomes the pending attempt,
+ * its remote task context is stored, and the state is left unchanged.
+ */
+ protected static class AssignTaskAttemptTransition implements
+ MultipleArcTransition<AMContainerImpl, AMContainerEvent, AMContainerState> {
+
+ @Override
+ public AMContainerState transition(
+ AMContainerImpl container, AMContainerEvent cEvent) {
+ AMContainerEventAssignTA event = (AMContainerEventAssignTA) cEvent;
+ if (container.pendingAttempt != null) {
+ // This may include a couple of additional (harmless) unregister calls
+ // to the taskAttemptListener and containerHeartbeatHandler - in case
+ // of assign at any state prior to IDLE.
+ container.handleExtraTAAssign(event, container.pendingAttempt);
+ // TODO XXX: Verify that it's ok to send in a NM_STOP_REQUEST. The
+ // NMCommunicator should be able to handle this. The STOP_REQUEST would
+ // only go out after the START_REQUEST.
+ return AMContainerState.STOP_REQUESTED;
+ }
+ container.pendingAttempt = event.getTaskAttemptId();
+ LOG.info("DEBUG: AssignTA: attempt: " + event.getRemoteTaskContext());
+ container.remoteTaskMap
+ .put(event.getTaskAttemptId(), event.getRemoteTaskContext());
+ // Stay in whatever state the container is currently in.
+ return container.getState();
+ }
+ }
+
+ /**
+ * Registered for C_LAUNCHED in LAUNCHING (moves to IDLE).
+ * Records the shuffle port reported by the launched event and registers the
+ * container with the container listener.
+ */
+ protected static class LaunchedTransition implements
+ SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+ @Override
+ public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+ AMContainerEventLaunched event = (AMContainerEventLaunched) cEvent;
+ container.shufflePort = event.getShufflePort();
+ container.registerWithContainerListener();
+ }
+ }
+
+ /**
+ * Registered for C_LAUNCH_FAILED in LAUNCHING (moves to STOPPING).
+ * Tells a pending attempt, if any, that it is terminating (using the launch
+ * failure message), then unregisters from the TA listener and de-allocates.
+ */
+ protected static class LaunchFailedTransition implements
+ SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+ @Override
+ public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+ if (container.pendingAttempt != null) {
+ AMContainerEventLaunchFailed event = (AMContainerEventLaunchFailed) cEvent;
+ container.sendTerminatingToTA(container.pendingAttempt,
+ event.getMessage());
+ }
+ container.unregisterFromTAListener();
+ container.deAllocate();
+ }
+ }
+
+ /**
+ * Registered for C_COMPLETED in LAUNCHING (moves to COMPLETED).
+ * Tells a pending attempt, if any, that it was terminated, unregisters from
+ * the TA listener and notifies the scheduler of completion. Subclasses
+ * customise the diagnostic through getMessage().
+ */
+ protected static class CompletedAtLaunchingTransition
+ implements SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+ @Override
+ public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+ LOG.info(
+ "Container with id: " + container.getContainerId() + " Completed." +
+ " Previous state was: " + container.getState());
+ if (container.pendingAttempt != null) {
+ String errorMessage = getMessage(container, cEvent);
+ container.sendTerminatedToTaskAttempt(container.pendingAttempt,
+ errorMessage);
+ LOG.warn(errorMessage);
+ }
+ container.unregisterFromTAListener();
+ container.sendCompletedToScheduler();
+ }
+
+ /**
+ * Builds the diagnostic sent to the pending attempt.
+ *
+ * @param container the container that completed
+ * @param event the triggering event (unused here)
+ * @return the diagnostic message
+ */
+ public String getMessage(
+ AMContainerImpl container, AMContainerEvent event) {
+ return "Container " + container.getContainerId() +
+ " COMPLETED while trying to launch";
+ }
+ }
+
+ /**
+ * Registered for C_STOP_REQUEST in LAUNCHING (moves to STOP_REQUESTED).
+ * Tells a pending attempt, if any, that it is terminating, unregisters from
+ * the TA listener and sends a stop request to the NM. Subclasses customise
+ * the diagnostic through getMessage().
+ */
+ protected static class StopRequestAtLaunchingTransition
+ implements SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+
+ @Override
+ public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+ if (container.pendingAttempt != null) {
+ container.sendTerminatingToTA(container.pendingAttempt,
+ getMessage(container, cEvent));
+ }
+ container.unregisterFromTAListener();
+ container.sendStopRequestToNM();
+ }
+
+ // Diagnostic sent to the pending attempt; overridden by subclasses.
+ public String getMessage(
+ AMContainerImpl container, AMContainerEvent event) {
+ return "Container " + container.getContainerId() +
+ " received a STOP_REQUEST";
+ }
+ }
+
+ /**
+ * Common handling for C_NODE_FAILED; registered directly for STOPPING and
+ * COMPLETED and extended by the per-state node-failure transitions.
+ * Records the failure and its diagnostic (when the event carries one), then
+ * sends a node-failure notification to every failed assignment, completed
+ * attempt, and the pending and running attempts. The pending and running
+ * attempts are additionally told that they are terminating.
+ */
+ protected static class NodeFailedBaseTransition
+ implements SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+ @Override
+ public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+
+ container.nodeFailed = true;
+ // Stays null when the event carries no diagnostics.
+ String errorMessage = null;
+ if (cEvent instanceof DiagnosableEvent) {
+ errorMessage = ((DiagnosableEvent)cEvent).getDiagnosticInfo();
+ container.nodeFailedMessage = errorMessage;
+ }
+
+ for (TaskAttemptId taId : container.failedAssignments) {
+ container.sendNodeFailureToTA(taId, errorMessage);
+ }
+ for (TaskAttemptId taId : container.completedAttempts) {
+ container.sendNodeFailureToTA(taId, errorMessage);
+ }
+
+ if (container.pendingAttempt != null) {
+ container.sendNodeFailureToTA(container.pendingAttempt,
+ errorMessage);
+ container.sendTerminatingToTA(container.pendingAttempt, "Node failure");
+ }
+ if (container.runningAttempt != null) {
+ container.sendNodeFailureToTA(container.runningAttempt,
+ errorMessage);
+ container.sendTerminatingToTA(container.runningAttempt, "Node failure");
+ }
+ }
+ }
+
+ /**
+ * Registered for C_NODE_FAILED in LAUNCHING (moves to STOPPING).
+ * Runs the common node-failure handling, then unregisters from the TA
+ * listener and de-allocates the container.
+ */
+ protected static class NodeFailedAtLaunchingTransition
+ extends NodeFailedBaseTransition {
+ @Override
+ public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+ super.transition(container, cEvent);
+ container.unregisterFromTAListener();
+ container.deAllocate();
+ }
+ }
+
+ /**
+ * Registered for the set of event types that are unexpected in LAUNCHING
+ * (moves to STOP_REQUESTED). Runs ErrorTransition's handling, tells a
+ * pending attempt, if any, that it was terminated because of the invalid
+ * transition, and unregisters from the TA listener.
+ */
+ protected static class ErrorAtLaunchingTransition
+ extends ErrorTransition {
+ @Override
+ public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+ super.transition(container, cEvent);
+ if (container.pendingAttempt != null) {
+ container.sendTerminatedToTaskAttempt(container.pendingAttempt,
+ "Container " + container.getContainerId() +
+ " hit an invalid transition - " + cEvent.getType() + " at " +
+ container.getState());
+ }
+ container.unregisterFromTAListener();
+ }
+ }
+
+ /**
+ * Registered for C_ASSIGN_TA in IDLE. Logs the remote task context of the
+ * assignment and otherwise delegates to AssignTaskAttemptTransition.
+ */
+ protected static class AssignTaskAttemptAtIdleTransition
+ extends AssignTaskAttemptTransition {
+ @Override
+ public AMContainerState transition(
+ AMContainerImpl container, AMContainerEvent cEvent) {
+ LOG.info("DEBUG: AssignTAAtIdle: attempt: " +
+ ((AMContainerEventAssignTA) cEvent).getRemoteTaskContext());
+ return super.transition(container, cEvent);
+ }
+ }
+
+ /**
+ * Registered for C_PULL_TA in IDLE. When an attempt is pending it is
+ * promoted to the running attempt (and exposed through pullAttempt), the
+ * idle time since the previous task finished is accumulated, and the
+ * container moves to RUNNING. With nothing pending the container stays IDLE.
+ */
+ protected static class PullTAAtIdleTransition implements
+ MultipleArcTransition<AMContainerImpl, AMContainerEvent, AMContainerState> {
+
+ @Override
+ public AMContainerState transition(
+ AMContainerImpl container, AMContainerEvent cEvent) {
+ if (container.pendingAttempt != null) {
+ // This will be invoked as part of the PULL_REQUEST - so pullAttempt
+ // should ideally only end up being populated during the duration of this call,
+ // which is in a write lock. pullTaskContext() reads and then clears it.
+ container.pullAttempt = container.pendingAttempt;
+ container.runningAttempt = container.pendingAttempt;
+ container.pendingAttempt = null;
+ if (container.lastTaskFinishTime != 0) {
+ // Compute the delta once so the logged value matches what is added.
+ long idleTimeDiff =
+ System.currentTimeMillis() - container.lastTaskFinishTime;
+ LOG.info("DEBUG: Computing idle time for container: " +
+ container.getContainerId() + ", lastFinishTime: " +
+ container.lastTaskFinishTime + ", Incremented by: " +
+ idleTimeDiff);
+ container.idleTimeBetweenTasks += idleTimeDiff;
+ }
+ LOG.info("Assigned taskAttempt + [" + container.runningAttempt +
+ "] to container: [" + container.getContainerId() + "]");
+ return AMContainerState.RUNNING;
+ } else {
+ return AMContainerState.IDLE;
+ }
+ }
+ }
+
+ /**
+ * Registered for C_COMPLETED in IDLE (moves to COMPLETED).
+ * Performs the LAUNCHING-state completion handling and additionally
+ * unregisters from the container listener.
+ */
+ protected static class CompletedAtIdleTransition
+ extends CompletedAtLaunchingTransition {
+ @Override
+ public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+ super.transition(container, cEvent);
+ container.unregisterFromContainerListener();
+ }
+
+ /**
+ * Builds the diagnostic sent to affected attempts.
+ *
+ * @param container the container that completed
+ * @param event the triggering event (unused here)
+ * @return the diagnostic message
+ */
+ @Override
+ public String getMessage(
+ AMContainerImpl container, AMContainerEvent event) {
+ return "Container " + container.getContainerId() + " COMPLETED";
+ }
+ }
+
+ /**
+ * Registered for C_STOP_REQUEST in IDLE (moves to STOP_REQUESTED).
+ * Performs the LAUNCHING-state stop handling, logs the accumulated idle time
+ * and unregisters from the container listener.
+ */
+ protected static class StopRequestAtIdleTransition
+ extends StopRequestAtLaunchingTransition {
+
+ @Override
+ public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+ super.transition(container, cEvent);
+ LOG.info("DEBUG: IdleTimeBetweenTasks: " + container.idleTimeBetweenTasks);
+ container.unregisterFromContainerListener();
+ }
+ }
+
+ /**
+ * Registered for C_TIMED_OUT in IDLE (moves to STOP_REQUESTED).
+ * Behaves like the IDLE stop request but reports a timeout as the reason.
+ */
+ protected static class TimedOutAtIdleTransition
+ extends StopRequestAtIdleTransition {
+
+ /**
+ * @param container the container that timed out
+ * @param event the triggering event (unused here)
+ * @return the timeout diagnostic message
+ */
+ @Override
+ public String getMessage(
+ AMContainerImpl container, AMContainerEvent event) {
+ return "Container " + container.getContainerId() +
+ " timed out";
+ }
+ }
+
+ /**
+ * Registered for C_NODE_FAILED in IDLE (moves to STOPPING).
+ * Performs the LAUNCHING-state node-failure handling and additionally
+ * unregisters from the container listener.
+ */
+ protected static class NodeFailedAtIdleTransition
+ extends NodeFailedAtLaunchingTransition {
+
+ @Override
+ public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+ super.transition(container, cEvent);
+ container.unregisterFromContainerListener();
+ }
+ }
+
+ /**
+ * Registered for the set of event types that are unexpected in IDLE (moves
+ * to STOP_REQUESTED). Performs the LAUNCHING-state error handling and
+ * additionally unregisters from the container listener.
+ */
+ protected static class ErrorAtIdleTransition
+ extends ErrorAtLaunchingTransition {
+ @Override
+ public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+ super.transition(container, cEvent);
+ container.unregisterFromContainerListener();
+ }
+ }
+
+ /**
+ * Registered for C_ASSIGN_TA in RUNNING (moves to STOP_REQUESTED).
+ * An assignment while an attempt is already running is unexpected: the
+ * running attempt is unregistered from the listener and the situation is
+ * handed to handleExtraTAAssign together with the running attempt.
+ */
+ protected static class AssignTaskAttemptAtRunningTransition implements
+ SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+ @Override
+ public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+
+ AMContainerEventAssignTA event = (AMContainerEventAssignTA) cEvent;
+ container.unregisterAttemptFromListener(container.runningAttempt);
+ container.handleExtraTAAssign(event, container.runningAttempt);
+ }
+ }
+
+ /**
+ * Registered for C_TA_SUCCEEDED in RUNNING (moves to IDLE).
+ * Stamps the finish time (used later for idle-time accounting), moves the
+ * running attempt to the completed list, unregisters it from the listener
+ * and clears the running slot.
+ */
+ protected static class TASucceededAtRunningTransition
+ implements SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+ @Override
+ public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+ container.lastTaskFinishTime = System.currentTimeMillis();
+ container.completedAttempts.add(container.runningAttempt);
+ container.unregisterAttemptFromListener(container.runningAttempt);
+ container.runningAttempt = null;
+ }
+ }
+
+ /**
+ * Registered for C_COMPLETED in RUNNING (moves to COMPLETED).
+ * Tells the running attempt it was terminated and unregisters it from the
+ * listener before performing the IDLE-state completion handling.
+ */
+ protected static class CompletedAtRunningTransition
+ extends CompletedAtIdleTransition {
+ @Override
+ public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+ container.sendTerminatedToTaskAttempt(container.runningAttempt,
+ getMessage(container, cEvent));
+ container.unregisterAttemptFromListener(container.runningAttempt);
+ super.transition(container, cEvent);
+ }
+ }
+
+ /**
+ * Registered for C_STOP_REQUEST in RUNNING (moves to STOP_REQUESTED).
+ * Unregisters the running attempt from the listener and tells it that it is
+ * terminating, then performs the IDLE-state stop handling. The diagnostic
+ * comes from getMessage(), so subclasses that override it (such as the
+ * timed-out variant) report their own reason to the running attempt as well.
+ */
+ protected static class StopRequestAtRunningTransition
+ extends StopRequestAtIdleTransition {
+ @Override
+ public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+
+ container.unregisterAttemptFromListener(container.runningAttempt);
+ container.sendTerminatingToTA(container.runningAttempt,
+ getMessage(container, cEvent));
+ super.transition(container, cEvent);
+ }
+ }
+
+ /**
+ * Registered for C_TIMED_OUT in RUNNING (moves to STOP_REQUESTED).
+ * Reuses the RUNNING stop-request handling and overrides getMessage() to
+ * supply a timeout diagnostic.
+ */
+ protected static class TimedOutAtRunningTransition
+ extends StopRequestAtRunningTransition {
+ @Override
+ public String getMessage(
+ AMContainerImpl container, AMContainerEvent event) {
+ return "Container " + container.getContainerId() +
+ " timed out";
+ }
+ }
+
+ /**
+ * Registered for C_NODE_FAILED in RUNNING (moves to STOPPING).
+ * Performs the IDLE-state node-failure handling and then unregisters the
+ * running attempt from the listener.
+ */
+ protected static class NodeFailedAtRunningTransition
+ extends NodeFailedAtIdleTransition {
+
+ @Override
+ public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+ super.transition(container, cEvent);
+ container.unregisterAttemptFromListener(container.runningAttempt);
+ }
+ }
+
+ /**
+ * Registered for the set of event types that are unexpected in RUNNING
+ * (moves to STOP_REQUESTED). Performs the IDLE-state error handling, then
+ * unregisters the running attempt from the listener and tells it that it was
+ * terminated because of the invalid transition.
+ */
+ protected static class ErrorAtRunningTransition
+ extends ErrorAtIdleTransition {
+ @Override
+ public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+ super.transition(container, cEvent);
+ container.unregisterAttemptFromListener(container.runningAttempt);
+ container.sendTerminatedToTaskAttempt(container.runningAttempt,
+ "Container " + container.getContainerId() +
+ " hit an invalid transition - " + cEvent.getType() + " at " +
+ container.getState());
+ }
+ }
+
+ /**
+ * Registered for C_ASSIGN_TA in STOP_REQUESTED and STOPPING (state is
+ * unchanged). The container is winding down, so the assignment is rejected:
+ * the container is flagged inError, the attempt is told it is terminating,
+ * and it is recorded as a failed assignment.
+ */
+ protected static class AssignTAAtWindDownTransition
+ implements SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+ @Override
+ public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+ AMContainerEventAssignTA event = (AMContainerEventAssignTA) cEvent;
+ container.inError = true;
+ String errorMessage = "AttemptId: " + event.getTaskAttemptId() +
+ " cannot be allocated to container: " + container.getContainerId() +
+ " in " + container.getState() + " state";
+ container.maybeSendNodeFailureForFailedAssignment(event.getTaskAttemptId());
+ container.sendTerminatingToTA(event.getTaskAttemptId(), errorMessage);
+ container.registerFailedTAAssignment(event.getTaskAttemptId());
+ }
+ }
+
+ /**
+  * Sends a container-terminated event (with a null message) to the pending
+  * attempt and the running attempt when they are set, and to every attempt in
+  * {@code failedAssignments}; then reports completion to the scheduler.
+  */
+ protected static class CompletedAtWindDownTransition implements
+     SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+   @Override
+   public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+     terminateIfSet(container, container.pendingAttempt);
+     terminateIfSet(container, container.runningAttempt);
+     // Entries of failedAssignments are forwarded as-is, without a null check.
+     for (TaskAttemptId failed : container.failedAssignments) {
+       container.sendTerminatedToTaskAttempt(failed, null);
+     }
+     container.sendCompletedToScheduler();
+   }
+
+   /** Sends a terminated event for {@code attempt} unless it is null. */
+   private static void terminateIfSet(
+       AMContainerImpl container, TaskAttemptId attempt) {
+     if (attempt == null) {
+       return;
+     }
+     container.sendTerminatedToTaskAttempt(attempt, null);
+   }
+ }
+
+ protected static class NMStopRequestFailedTransition
+ implements SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+ @Override
+ public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+ container.deAllocate();
+ }
+ }
+
+ /**
+  * Runs the inherited {@link NodeFailedBaseTransition} handling and then
+  * calls {@code deAllocate()} on the container.
+  */
+ protected static class NodeFailedAtNMStopRequestedTransition
+     extends NodeFailedBaseTransition {
+   @Override
+   public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+     super.transition(container, cEvent);
+     container.deAllocate();
+   }
+ }
+
+ /**
+  * Runs the inherited {@link ErrorAtStoppingTransition} handling and then
+  * calls {@code deAllocate()} on the container.
+  */
+ protected static class ErrorAtNMStopRequestedTransition
+     extends ErrorAtStoppingTransition {
+   @Override
+   public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+     super.transition(container, cEvent);
+     container.deAllocate();
+   }
+ }
+
+ /**
+  * Flags the container as being in error (via {@link ErrorBaseTransition}),
+  * sends a container-terminated event with a null message to the pending
+  * attempt and the running attempt when they are set and to every attempt in
+  * {@code failedAssignments}, then reports completion to the scheduler.
+  */
+ protected static class ErrorAtStoppingTransition
+     extends ErrorBaseTransition {
+   @Override
+   public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+     super.transition(container, cEvent);
+     if (container.pendingAttempt != null) {
+       container.sendTerminatedToTaskAttempt(container.pendingAttempt, null);
+     }
+     if (container.runningAttempt != null) {
+       container.sendTerminatedToTaskAttempt(container.runningAttempt, null);
+     }
+     for (TaskAttemptId taId : container.failedAssignments) {
+       container.sendTerminatedToTaskAttempt(taId, null);
+     }
+     container.sendCompletedToScheduler();
+   }
+ }
+
+ protected static class ErrorBaseTransition implements
+ SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+
+ @Override
+ public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+ container.inError = true;
+ }
+ }
+
+ protected static class AssignTAAtCompletedTransition implements
+ SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+
+ @Override
+ public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+ // TODO CREUSE CRITICAL: This is completely incorrect. COMPLETED comes
+ // from RMComm directly to the container. Meanwhile, the scheduler may
+ // think the container is still around and assign a task to it. The task
+ // ends up getting a CONTAINER_KILLED message. Task could handle this by
+ // asking for a reschedule in this case. Will end up FAILING the task instead of KILLING it.
+ container.inError = true;
+ AMContainerEventAssignTA event = (AMContainerEventAssignTA) cEvent;
+ String errorMessage = "AttemptId: " + event.getTaskAttemptId()
+ + " cannot be allocated to container: " + container.getContainerId()
+ + " in COMPLETED state";
+ container.maybeSendNodeFailureForFailedAssignment(event.getTaskAttemptId());
+ container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(),
+ errorMessage);
+ container.registerFailedTAAssignment(event.getTaskAttemptId());
+ }
+ }
+
+
+ private void handleExtraTAAssign(
+ AMContainerEventAssignTA event, TaskAttemptId currentTaId) {
+ this.inError = true;
+ String errorMessage = "AMScheduler Error: Multiple simultaneous " +
+ "taskAttempt allocations to: " + this.getContainerId() +
+ ". Attempts: " + currentTaId + ", " + event.getTaskAttemptId() +
+ ". Current state: " + this.getState();
+ this.maybeSendNodeFailureForFailedAssignment(event.getTaskAttemptId());
+ this.sendTerminatingToTA(event.getTaskAttemptId(), errorMessage);
+ this.sendTerminatingToTA(currentTaId, errorMessage);
+ this.registerFailedTAAssignment(event.getTaskAttemptId());
+ LOG.warn(errorMessage);
+ this.sendStopRequestToNM();
+ this.unregisterFromTAListener();
+ this.unregisterFromContainerListener();
+ }
+
+
+ /**
+  * Adds {@code taId} to {@code failedAssignments}.
+  *
+  * @param taId the attempt whose assignment to this container failed
+  */
+ protected void registerFailedTAAssignment(TaskAttemptId taId) {
+   this.failedAssignments.add(taId);
+ }
+
+ protected void deAllocate() {
+ sendEvent(new RMCommunicatorContainerDeAllocateRequestEvent(containerId));
+ }
+
+ protected void sendCompletedToScheduler() {
+ sendEvent(new AMSchedulerEventContainerCompleted(containerId));
+ }
+
+ protected void sendTerminatedToTaskAttempt(
+ TaskAttemptId taId, String message) {
+ sendEvent(new TaskAttemptEventContainerTerminated(taId, message));
+ }
+
+ protected void sendTerminatingToTA(TaskAttemptId taId, String message) {
+ sendEvent(new TaskAttemptEventContainerTerminating(taId, message));
+ }
+
+ /**
+  * Sends a node-failure event with {@code nodeFailedMessage} to the attempt
+  * when the {@code nodeFailed} flag is set; does nothing otherwise.
+  *
+  * @param taId the attempt whose assignment failed
+  */
+ protected void maybeSendNodeFailureForFailedAssignment(TaskAttemptId taId) {
+   if (!this.nodeFailed) {
+     return;
+   }
+   sendNodeFailureToTA(taId, nodeFailedMessage);
+ }
+
+ protected void sendNodeFailureToTA(TaskAttemptId taId, String message) {
+ sendEvent(new TaskAttemptEventNodeFailed(taId, message));
+ }
+
+ protected void sendStartRequestToNM() {
+ sendEvent(new NMCommunicatorLaunchRequestEvent(clc, container));
+ }
+
+ protected void sendStopRequestToNM() {
+ sendEvent(new NMCommunicatorStopRequestEvent(containerId,
+ container.getNodeId(), container.getContainerToken()));
+ }
+
+ /**
+  * Unregisters a task attempt from the task attempt listener.
+  *
+  * @param attemptId the attempt to unregister
+  */
+ protected void unregisterAttemptFromListener(TaskAttemptId attemptId) {
+   this.taskAttemptListener.unregisterTaskAttempt(attemptId);
+ }
+
+ /** Registers this container's id as running with the task attempt listener. */
+ protected void registerWithTAListener() {
+   this.taskAttemptListener.registerRunningContainer(this.containerId);
+ }
+
+ /** Removes this container's id from the task attempt listener. */
+ protected void unregisterFromTAListener() {
+   taskAttemptListener.unregisterRunningContainer(this.containerId);
+ }
+
+
+ /** Registers this container's id with the container heartbeat handler. */
+ protected void registerWithContainerListener() {
+   containerHeartbeatHandler.register(containerId);
+ }
+
+ /** Removes this container's id from the container heartbeat handler. */
+ protected void unregisterFromContainerListener() {
+   containerHeartbeatHandler.unregister(containerId);
+ }
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerMap.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerMap.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerMap.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerMap.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm.container;
+
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.jobhistory.ContainerHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.TaskAttemptListener;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.AbstractService;
+
+/**
+ * Service holding the {@link AMContainer} instances keyed by
+ * {@link ContainerId}. It routes each {@link AMContainerEvent} to the
+ * container the event addresses.
+ */
+public class AMContainerMap extends AbstractService implements
+    EventHandler<AMContainerEvent> {
+
+  private static final Log LOG = LogFactory.getLog(AMContainerMap.class);
+
+  private final ContainerHeartbeatHandler chh;
+  private final TaskAttemptListener tal;
+  private final AppContext context;
+  private final ConcurrentHashMap<ContainerId, AMContainer> containerMap;
+
+  /**
+   * @param chh heartbeat handler handed to every container created here
+   * @param tal task attempt listener handed to every container created here
+   * @param context application context handed to every container created here
+   */
+  public AMContainerMap(ContainerHeartbeatHandler chh, TaskAttemptListener tal,
+      AppContext context) {
+    super("AMContainerMaps");
+    this.chh = chh;
+    this.tal = tal;
+    this.context = context;
+    this.containerMap = new ConcurrentHashMap<ContainerId, AMContainer>();
+  }
+
+  /**
+   * Forwards the event to the container it addresses. An event for a
+   * container id that was never added is logged and dropped instead of
+   * failing with a NullPointerException.
+   *
+   * @param event the container event to route
+   */
+  @Override
+  public void handle(AMContainerEvent event) {
+    ContainerId containerId = event.getContainerId();
+    AMContainer container = containerMap.get(containerId);
+    if (container == null) {
+      LOG.warn("Ignoring event " + event.getType()
+          + " for unknown container: " + containerId);
+      return;
+    }
+    container.handle(event);
+  }
+
+  /**
+   * Registers a new {@link AMContainerImpl} for the given container unless an
+   * entry for its id already exists; an existing entry is left untouched.
+   *
+   * @param container the container to track
+   */
+  public void addContainerIfNew(Container container) {
+    AMContainer amc = new AMContainerImpl(container, chh, tal, context);
+    containerMap.putIfAbsent(container.getId(), amc);
+  }
+
+  /**
+   * @param containerId id to look up
+   * @return the tracked container, or null when the id is unknown
+   */
+  public AMContainer get(ContainerId containerId) {
+    return containerMap.get(containerId);
+  }
+
+  /**
+   * @return the value view of the backing map, i.e. every tracked container
+   */
+  public Collection<AMContainer> values() {
+    return containerMap.values();
+  }
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerState.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerState.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerState.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerState.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm.container;
+
+public enum AMContainerState {
+ ALLOCATED,
+ LAUNCHING,
+ IDLE,
+ RUNNING,
+ // indicates a NM stop request has been attempted. This request could fail, in
+ // which case an RM stop request needs to be sent.
+ STOP_REQUESTED,
+
+ // A stop request has been registered with YARN
+ STOPPING,
+ COMPLETED,
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNode.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNode.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNode.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNode.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm.node;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.EventHandler;
+
+/**
+ * A node that handles {@link AMNodeEvent}s and exposes its id, state,
+ * containers and health/blacklist flags.
+ */
+public interface AMNode extends EventHandler<AMNodeEvent> {
+
+  /** @return the id of this node */
+  NodeId getNodeId();
+
+  /** @return the current state of this node */
+  AMNodeState getState();
+
+  /** @return the ids of the containers associated with this node */
+  List<ContainerId> getContainers();
+
+  /** @return whether this node is unhealthy */
+  boolean isUnhealthy();
+
+  /** @return whether this node is blacklisted */
+  boolean isBlacklisted();
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEvent.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEvent.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEvent.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm.node;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+/**
+ * Event of type {@link AMNodeEventType} that carries the {@link NodeId} it
+ * was created with.
+ */
+public class AMNodeEvent extends AbstractEvent<AMNodeEventType> {
+
+  private final NodeId nodeId;
+
+  /**
+   * @param nodeId id stored on the event; at least one subclass passes null
+   * @param type the event type handed to the base class
+   */
+  public AMNodeEvent(NodeId nodeId, AMNodeEventType type) {
+    super(type);
+    this.nodeId = nodeId;
+  }
+
+  /** @return the node id given at construction */
+  public NodeId getNodeId() {
+    return nodeId;
+  }
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventContainerAllocated.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventContainerAllocated.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventContainerAllocated.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventContainerAllocated.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm.node;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+/**
+ * {@link AMNodeEvent} of type {@code N_CONTAINER_ALLOCATED} that additionally
+ * carries a {@link ContainerId}.
+ */
+public class AMNodeEventContainerAllocated extends AMNodeEvent {
+
+  private final ContainerId containerId;
+
+  /**
+   * @param nodeId id of the node the event is addressed to
+   * @param containerId id of the container stored on the event
+   */
+  public AMNodeEventContainerAllocated(NodeId nodeId, ContainerId containerId) {
+    super(nodeId, AMNodeEventType.N_CONTAINER_ALLOCATED);
+    this.containerId = containerId;
+  }
+
+  /** @return the container id given at construction */
+  public ContainerId getContainerId() {
+    return containerId;
+  }
+
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventNodeCountUpdated.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventNodeCountUpdated.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventNodeCountUpdated.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventNodeCountUpdated.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm.node;
+
+/**
+ * {@link AMNodeEvent} of type {@code N_NODE_COUNT_UPDATED} carrying a node
+ * count. It is created with a null node id.
+ */
+public class AMNodeEventNodeCountUpdated extends AMNodeEvent {
+
+  private final int count;
+
+  /**
+   * @param nodeCount the node count stored on the event
+   */
+  public AMNodeEventNodeCountUpdated(int nodeCount) {
+    // No node id is supplied for this event type.
+    super(null, AMNodeEventType.N_NODE_COUNT_UPDATED);
+    this.count = nodeCount;
+  }
+
+  /** @return the node count given at construction */
+  public int getNodeCount() {
+    return count;
+  }
+}