You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2016/12/09 15:51:59 UTC
[33/33] hadoop git commit: YARN-5292. NM Container lifecycle and
state transitions to support for PAUSED container state. (Hitesh Sharma via
asuresh)
YARN-5292. NM Container lifecycle and state transitions to support for PAUSED container state. (Hitesh Sharma via asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8752f537
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8752f537
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8752f537
Branch: refs/heads/YARN-5972
Commit: 8752f5376d47c715780cc49a0f7bb18a1050e472
Parents: 80b8023
Author: Arun Suresh <as...@apache.org>
Authored: Fri Dec 9 07:51:03 2016 -0800
Committer: Arun Suresh <as...@apache.org>
Committed: Fri Dec 9 07:51:03 2016 -0800
----------------------------------------------------------------------
.../hadoop/yarn/api/records/ContainerState.java | 7 +-
.../src/main/proto/yarn_protos.proto | 1 +
.../server/nodemanager/ContainerExecutor.java | 22 +++
.../container/ContainerEventType.java | 6 +-
.../container/ContainerImpl.java | 170 ++++++++++++++++++-
.../container/ContainerPauseEvent.java | 40 +++++
.../container/ContainerResumeEvent.java | 39 +++++
.../container/ContainerState.java | 3 +-
.../launcher/ContainerLaunch.java | 90 +++++++++-
.../launcher/ContainersLauncher.java | 32 ++++
.../launcher/ContainersLauncherEventType.java | 3 +
.../scheduler/ContainerSchedulerEventType.java | 1 +
.../container/TestContainer.java | 51 ++++++
13 files changed, 454 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8752f537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java
index 4efd8c1..2d83cfd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java
@@ -32,10 +32,13 @@ public enum ContainerState {
/** Running container */
RUNNING,
-
+
/** Completed container */
COMPLETE,
/** Scheduled (awaiting resources) at the NM. */
- SCHEDULED
+ SCHEDULED,
+
+ /** Paused at the NM. */
+ PAUSED
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8752f537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 5a70298..9cd6348 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -83,6 +83,7 @@ enum ContainerStateProto {
C_RUNNING = 2;
C_COMPLETE = 3;
C_SCHEDULED = 4;
+ C_PAUSED = 5;
}
message ContainerProto {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8752f537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
index f880506..ffa125a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
@@ -684,6 +684,28 @@ public abstract class ContainerExecutor implements Configurable {
}
/**
+ * Pause the container. The default implementation logs a warning and throws
+ * UnsupportedOperationException; executors that can pause override this.
+ * @param container the Container to pause
+ */
+ public void pauseContainer(Container container) {
+ String message = container.getContainerId() + " doesn't support pausing.";
+ LOG.warn(message);
+ throw new UnsupportedOperationException(message);
+ }
+
+ /**
+ * Resume the container from pause state. The default implementation logs a
+ * warning and throws UnsupportedOperationException; override to support it.
+ * @param container the Container to resume
+ */
+ public void resumeContainer(Container container) {
+ String message = container.getContainerId() + " doesn't support resume.";
+ LOG.warn(message);
+ throw new UnsupportedOperationException(message);
+ }
+
+ /**
* Get the process-identifier for the container.
*
* @param containerID the container ID
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8752f537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
index afea0e6..1475435 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
@@ -27,6 +27,8 @@ public enum ContainerEventType {
CONTAINER_DONE,
REINITIALIZE_CONTAINER,
ROLLBACK_REINIT,
+ PAUSE_CONTAINER,
+ RESUME_CONTAINER,
// DownloadManager
CONTAINER_INITED,
@@ -38,5 +40,7 @@ public enum ContainerEventType {
CONTAINER_LAUNCHED,
CONTAINER_EXITED_WITH_SUCCESS,
CONTAINER_EXITED_WITH_FAILURE,
- CONTAINER_KILLED_ON_REQUEST
+ CONTAINER_KILLED_ON_REQUEST,
+ CONTAINER_PAUSED,
+ CONTAINER_RESUMED
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8752f537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index 4a6be32..ec7ee49 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -298,6 +298,8 @@ public class ContainerImpl implements Container {
UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.NEW, ContainerState.DONE,
ContainerEventType.KILL_CONTAINER, new KillOnNewTransition())
+ .addTransition(ContainerState.NEW, ContainerState.DONE,
+ ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
// From LOCALIZING State
.addTransition(ContainerState.LOCALIZING,
@@ -313,6 +315,8 @@ public class ContainerImpl implements Container {
.addTransition(ContainerState.LOCALIZING, ContainerState.KILLING,
ContainerEventType.KILL_CONTAINER,
new KillBeforeRunningTransition())
+ .addTransition(ContainerState.LOCALIZING, ContainerState.KILLING,
+ ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
// From LOCALIZATION_FAILED State
.addTransition(ContainerState.LOCALIZATION_FAILED,
@@ -326,7 +330,8 @@ public class ContainerImpl implements Container {
// container not launched so kill is a no-op
.addTransition(ContainerState.LOCALIZATION_FAILED,
ContainerState.LOCALIZATION_FAILED,
- ContainerEventType.KILL_CONTAINER)
+ EnumSet.of(ContainerEventType.KILL_CONTAINER,
+ ContainerEventType.PAUSE_CONTAINER))
// container cleanup triggers a release of all resources
// regardless of whether they were localized or not
// LocalizedResource handles release event in all states
@@ -382,6 +387,76 @@ public class ContainerImpl implements Container {
ContainerState.EXITED_WITH_FAILURE,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
new KilledExternallyTransition())
+ .addTransition(ContainerState.RUNNING, ContainerState.PAUSING,
+ ContainerEventType.PAUSE_CONTAINER, new PauseContainerTransition())
+
+ // From PAUSING State
+ .addTransition(ContainerState.PAUSING, ContainerState.KILLING,
+ ContainerEventType.KILL_CONTAINER, new KillTransition())
+ .addTransition(ContainerState.PAUSING, ContainerState.PAUSING,
+ ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+ UPDATE_DIAGNOSTICS_TRANSITION)
+ .addTransition(ContainerState.PAUSING, ContainerState.PAUSED,
+ ContainerEventType.CONTAINER_PAUSED, new PausedContainerTransition())
+ // In case something goes wrong then container will exit from the
+ // PAUSING state
+ .addTransition(ContainerState.PAUSING,
+ ContainerState.EXITED_WITH_SUCCESS,
+ ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS)
+ .addTransition(ContainerState.PAUSING,
+ ContainerState.EXITED_WITH_FAILURE,
+ ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
+ new ExitedWithFailureTransition(true))
+ .addTransition(ContainerState.PAUSING, ContainerState.EXITED_WITH_FAILURE,
+ ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
+ new KilledExternallyTransition())
+
+ // From PAUSED State
+ .addTransition(ContainerState.PAUSED, ContainerState.KILLING,
+ ContainerEventType.KILL_CONTAINER, new KillTransition())
+ .addTransition(ContainerState.PAUSED, ContainerState.PAUSED,
+ ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+ UPDATE_DIAGNOSTICS_TRANSITION)
+ .addTransition(ContainerState.PAUSED, ContainerState.PAUSED,
+ ContainerEventType.PAUSE_CONTAINER)
+ .addTransition(ContainerState.PAUSED, ContainerState.RESUMING,
+ ContainerEventType.RESUME_CONTAINER, new ResumeContainerTransition())
+ // In case something goes wrong then container will exit from the
+ // PAUSED state
+ .addTransition(ContainerState.PAUSED,
+ ContainerState.EXITED_WITH_FAILURE,
+ ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
+ new ExitedWithFailureTransition(true))
+ .addTransition(ContainerState.PAUSED, ContainerState.EXITED_WITH_FAILURE,
+ ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
+ new KilledExternallyTransition())
+ .addTransition(ContainerState.PAUSED,
+ ContainerState.EXITED_WITH_SUCCESS,
+ ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
+ new ExitedWithSuccessTransition(true))
+
+ // From RESUMING State
+ .addTransition(ContainerState.RESUMING, ContainerState.KILLING,
+ ContainerEventType.KILL_CONTAINER, new KillTransition())
+ .addTransition(ContainerState.RESUMING, ContainerState.RUNNING,
+ ContainerEventType.CONTAINER_RESUMED)
+ .addTransition(ContainerState.RESUMING, ContainerState.RESUMING,
+ ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+ UPDATE_DIAGNOSTICS_TRANSITION)
+ // In case something goes wrong then container will exit from the
+ // RESUMING state
+ .addTransition(ContainerState.RESUMING,
+ ContainerState.EXITED_WITH_FAILURE,
+ ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
+ new ExitedWithFailureTransition(true))
+ .addTransition(ContainerState.RESUMING,
+ ContainerState.EXITED_WITH_FAILURE,
+ ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
+ new KilledExternallyTransition())
+ .addTransition(ContainerState.RESUMING,
+ ContainerState.EXITED_WITH_SUCCESS,
+ ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
+ new ExitedWithSuccessTransition(true))
// From REINITIALIZING State
.addTransition(ContainerState.REINITIALIZING,
@@ -405,6 +480,8 @@ public class ContainerImpl implements Container {
UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.REINITIALIZING, ContainerState.KILLING,
ContainerEventType.KILL_CONTAINER, new KillTransition())
+ .addTransition(ContainerState.REINITIALIZING, ContainerState.KILLING,
+ ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
.addTransition(ContainerState.REINITIALIZING,
ContainerState.SCHEDULED,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
@@ -422,6 +499,8 @@ public class ContainerImpl implements Container {
UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.RELAUNCHING, ContainerState.KILLING,
ContainerEventType.KILL_CONTAINER, new KillTransition())
+ .addTransition(ContainerState.RELAUNCHING, ContainerState.KILLING,
+ ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
// From CONTAINER_EXITED_WITH_SUCCESS State
.addTransition(ContainerState.EXITED_WITH_SUCCESS, ContainerState.DONE,
@@ -433,7 +512,8 @@ public class ContainerImpl implements Container {
UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.EXITED_WITH_SUCCESS,
ContainerState.EXITED_WITH_SUCCESS,
- ContainerEventType.KILL_CONTAINER)
+ EnumSet.of(ContainerEventType.KILL_CONTAINER,
+ ContainerEventType.PAUSE_CONTAINER))
// From EXITED_WITH_FAILURE State
.addTransition(ContainerState.EXITED_WITH_FAILURE, ContainerState.DONE,
@@ -445,7 +525,8 @@ public class ContainerImpl implements Container {
UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.EXITED_WITH_FAILURE,
ContainerState.EXITED_WITH_FAILURE,
- ContainerEventType.KILL_CONTAINER)
+ EnumSet.of(ContainerEventType.KILL_CONTAINER,
+ ContainerEventType.PAUSE_CONTAINER))
// From KILLING State.
.addTransition(ContainerState.KILLING,
@@ -479,7 +560,8 @@ public class ContainerImpl implements Container {
// in the container launcher
.addTransition(ContainerState.KILLING,
ContainerState.KILLING,
- ContainerEventType.CONTAINER_LAUNCHED)
+ EnumSet.of(ContainerEventType.CONTAINER_LAUNCHED,
+ ContainerEventType.PAUSE_CONTAINER))
// From CONTAINER_CLEANEDUP_AFTER_KILL State.
.addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
@@ -495,11 +577,13 @@ public class ContainerImpl implements Container {
EnumSet.of(ContainerEventType.KILL_CONTAINER,
ContainerEventType.RESOURCE_FAILED,
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
- ContainerEventType.CONTAINER_EXITED_WITH_FAILURE))
+ ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
+ ContainerEventType.PAUSE_CONTAINER))
// From DONE
.addTransition(ContainerState.DONE, ContainerState.DONE,
- ContainerEventType.KILL_CONTAINER)
+ EnumSet.of(ContainerEventType.KILL_CONTAINER,
+ ContainerEventType.PAUSE_CONTAINER))
.addTransition(ContainerState.DONE, ContainerState.DONE,
ContainerEventType.INIT_CONTAINER)
.addTransition(ContainerState.DONE, ContainerState.DONE,
@@ -525,6 +609,8 @@ public class ContainerImpl implements Container {
case LOCALIZING:
case LOCALIZATION_FAILED:
case SCHEDULED:
+ case PAUSED:
+ case RESUMING:
return org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED;
case RUNNING:
case RELAUNCHING:
@@ -534,6 +620,7 @@ public class ContainerImpl implements Container {
case KILLING:
case CONTAINER_CLEANEDUP_AFTER_KILL:
case CONTAINER_RESOURCES_CLEANINGUP:
+ case PAUSING:
return org.apache.hadoop.yarn.api.records.ContainerState.RUNNING;
case DONE:
default:
@@ -1473,6 +1560,26 @@ public class ContainerImpl implements Container {
}
/**
+ * Handles PAUSE_CONTAINER by cleaning the container up instead of pausing
+ * it. Registered for: NEW -> DONE, and
+ * LOCALIZING / REINITIALIZING / RELAUNCHING -> KILLING.
+ */
+ @SuppressWarnings("unchecked") // dispatcher not typed
+ static class KillOnPauseTransition implements
+ SingleArcTransition<ContainerImpl, ContainerEvent> {
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void transition(ContainerImpl container, ContainerEvent event) {
+ // Clear the re-init flag and kill the process/process-grp via cleanup.
+ container.setIsReInitializing(false);
+ container.dispatcher.getEventHandler().handle(
+ new ContainersLauncherEvent(container,
+ ContainersLauncherEventType.CLEANUP_CONTAINER));
+ }
+ }
+
+ /**
* Transition from KILLING to CONTAINER_CLEANEDUP_AFTER_KILL
* upon receiving CONTAINER_KILLED_ON_REQUEST.
*/
@@ -1661,6 +1768,57 @@ public class ContainerImpl implements Container {
}
}
+ /**
+ * Transitions upon receiving PAUSE_CONTAINER.
+ * - RUNNING -> PAUSING; asks the launcher to pause the process.
+ */
+ @SuppressWarnings("unchecked") // dispatcher not typed
+ static class PauseContainerTransition implements
+ SingleArcTransition<ContainerImpl, ContainerEvent> {
+ @Override
+ public void transition(ContainerImpl container, ContainerEvent event) {
+ // Pause the process/process-grp if it is supported by the executor
+ container.dispatcher.getEventHandler().handle(
+ new ContainersLauncherEvent(container,
+ ContainersLauncherEventType.PAUSE_CONTAINER));
+ ContainerPauseEvent pauseEvent = (ContainerPauseEvent) event; // sender must use ContainerPauseEvent
+ container.addDiagnostics(pauseEvent.getDiagnostic(), "\n");
+ }
+ }
+
+ /**
+ * Transitions upon receiving CONTAINER_PAUSED (PAUSING -> PAUSED).
+ */
+ @SuppressWarnings("unchecked") // dispatcher not typed
+ static class PausedContainerTransition implements
+ SingleArcTransition<ContainerImpl, ContainerEvent> {
+ @Override
+ public void transition(ContainerImpl container, ContainerEvent event) {
+ // Container was PAUSED so tell the scheduler
+ container.dispatcher.getEventHandler().handle(
+ new ContainerSchedulerEvent(container,
+ ContainerSchedulerEventType.CONTAINER_PAUSED));
+ }
+ }
+
+ /**
+ * Transitions upon receiving RESUME_CONTAINER.
+ * - PAUSED -> RESUMING; asks the launcher to resume the process.
+ */
+ @SuppressWarnings("unchecked") // dispatcher not typed
+ static class ResumeContainerTransition implements
+ SingleArcTransition<ContainerImpl, ContainerEvent> {
+ @Override
+ public void transition(ContainerImpl container, ContainerEvent event) {
+ // Resume the process/process-grp if it is supported by the executor
+ container.dispatcher.getEventHandler().handle(
+ new ContainersLauncherEvent(container,
+ ContainersLauncherEventType.RESUME_CONTAINER));
+ ContainerResumeEvent resumeEvent = (ContainerResumeEvent) event; // sender must use ContainerResumeEvent
+ container.addDiagnostics(resumeEvent.getDiagnostic(), "\n");
+ }
+ }
+
@Override
public void handle(ContainerEvent event) {
try {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8752f537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerPauseEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerPauseEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerPauseEvent.java
new file mode 100644
index 0000000..898304e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerPauseEvent.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+/**
+ * PAUSE_CONTAINER {@link ContainerEvent} that additionally carries a
+ * free-text diagnostic message for the container being paused.
+ */
+public class ContainerPauseEvent extends ContainerEvent {
+ private final String diagnostic;
+
+ public ContainerPauseEvent(ContainerId containerId, String diagnostic) {
+ super(containerId, ContainerEventType.PAUSE_CONTAINER);
+ this.diagnostic = diagnostic;
+ }
+
+ /** @return the diagnostic message supplied at construction. */
+ public String getDiagnostic() {
+ return diagnostic;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8752f537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerResumeEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerResumeEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerResumeEvent.java
new file mode 100644
index 0000000..d7c9e9a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerResumeEvent.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+/**
+ * RESUME_CONTAINER {@link ContainerEvent} that additionally carries a
+ * free-text diagnostic message for the container being resumed.
+ */
+public class ContainerResumeEvent extends ContainerEvent {
+ private final String diagnostic;
+
+ public ContainerResumeEvent(ContainerId containerId, String diagnostic) {
+ super(containerId, ContainerEventType.RESUME_CONTAINER);
+ this.diagnostic = diagnostic;
+ }
+
+ /** @return the diagnostic message supplied at construction. */
+ public String getDiagnostic() {
+ return diagnostic;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8752f537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
index 91d1356..7c3fea8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
@@ -21,5 +21,6 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
public enum ContainerState {
NEW, LOCALIZING, LOCALIZATION_FAILED, SCHEDULED, RUNNING, RELAUNCHING,
REINITIALIZING, EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, KILLING,
- CONTAINER_CLEANEDUP_AFTER_KILL, CONTAINER_RESOURCES_CLEANINGUP, DONE
+ CONTAINER_CLEANEDUP_AFTER_KILL, CONTAINER_RESOURCES_CLEANINGUP, DONE,
+ PAUSING, PAUSED, RESUMING
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8752f537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
index 823457f..2516e11 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
@@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
@@ -84,6 +85,7 @@ import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.util.ConverterUtils;
public class ContainerLaunch implements Callable<Integer> {
@@ -103,8 +105,10 @@ public class ContainerLaunch implements Callable<Integer> {
private final Configuration conf;
private final Context context;
private final ContainerManagerImpl containerManager;
-
+
protected AtomicBoolean containerAlreadyLaunched = new AtomicBoolean(false);
+ protected AtomicBoolean shouldPauseContainer = new AtomicBoolean(false);
+
protected AtomicBoolean completed = new AtomicBoolean(false);
private volatile boolean killedBeforeStart = false;
@@ -746,6 +750,90 @@ public class ContainerLaunch implements Callable<Integer> {
}
/**
+ * Pause the container by asking the executor to pause it. On success a
+ * CONTAINER_PAUSED event is dispatched; on any executor failure the
+ * container is killed with exit status PREEMPTED instead.
+ * @throws IOException declared for callers; failures are handled here.
+ */
+ @SuppressWarnings("unchecked") // dispatcher not typed
+ public void pauseContainer() throws IOException {
+ ContainerId containerId = container.getContainerId();
+ String containerIdStr = containerId.toString();
+ LOG.info("Pausing the container " + containerIdStr);
+
+ // The launcher PAUSE event is only dispatched from the RUNNING state, so
+ // there is no need to check the launched flag (containerAlreadyLaunched).
+ // Only the first caller to set the pause flag goes on to pause.
+
+ if (!shouldPauseContainer.compareAndSet(false, true)) {
+ LOG.info("Container " + containerId + " not paused as "
+ + "resume already called");
+ return;
+ }
+
+ try {
+ // Pause the container
+ exec.pauseContainer(container);
+
+ // pauseContainer is a blocking call; reaching here means the executor
+ // returned without error, so report the container as paused.
+ dispatcher.getEventHandler().handle(new ContainerEvent(
+ containerId,
+ ContainerEventType.CONTAINER_PAUSED));
+ } catch (Exception e) {
+ String message =
+ "Exception when trying to pause container " + containerIdStr
+ + ": " + StringUtils.stringifyException(e);
+ LOG.warn(message);
+ container.handle(new ContainerKillEvent(container.getContainerId(),
+ ContainerExitStatus.PREEMPTED, "Container preempted as there was"
+ + " an exception in pausing it."));
+ }
+ }
+
+ /**
+ * Resume a paused container by asking the executor to resume it. On success
+ * a CONTAINER_RESUMED event is dispatched; on any executor failure the
+ * container is killed with exit status PREEMPTED instead.
+ * @throws IOException declared for callers; failures are handled here.
+ */
+ @SuppressWarnings("unchecked") // dispatcher not typed
+ public void resumeContainer() throws IOException {
+ ContainerId containerId = container.getContainerId();
+ String containerIdStr = containerId.toString();
+ LOG.info("Resuming the container " + containerIdStr);
+
+ // The resume event is only handled if the container is in a paused state
+ // so we don't check for the launched flag here.
+
+ // Atomically clear the pause flag so that a later pause can proceed.
+ boolean alreadyPaused = shouldPauseContainer.compareAndSet(true, false);
+ if (!alreadyPaused) {
+ LOG.info("Container " + containerIdStr + " not paused."
+ + " No resume necessary");
+ return;
+ }
+
+ // The container was paused: ask the executor to resume it.
+ try {
+ exec.resumeContainer(container);
+ // resumeContainer is a blocking call; reaching here means the executor
+ // returned without error, so report the container as resumed.
+ dispatcher.getEventHandler().handle(new ContainerEvent(
+ containerId,
+ ContainerEventType.CONTAINER_RESUMED));
+ } catch (Exception e) {
+ String message =
+ "Exception when trying to resume container " + containerIdStr
+ + ": " + StringUtils.stringifyException(e);
+ LOG.warn(message);
+ container.handle(new ContainerKillEvent(container.getContainerId(),
+ ContainerExitStatus.PREEMPTED, "Container preempted as there was"
+ + " an exception in resuming it."));
+ }
+ }
+
+ /**
* Loop through for a time-bounded interval waiting to
* read the process id from a file generated by a running process.
* @param pidFilePath File from which to read the process id
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8752f537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
index d4a7bfd..eb6eaf5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.Dispatcher;
@@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import com.google.common.annotations.VisibleForTesting;
@@ -170,6 +172,36 @@ public class ContainersLauncher extends AbstractService
+ " with command " + signalEvent.getCommand());
}
break;
+ case PAUSE_CONTAINER:
+ ContainerLaunch launchedContainer = running.get(containerId);
+ if (launchedContainer == null) {
+ // Container not launched. So nothing needs to be done.
+ return;
+ }
+
+ // Pause the container
+ try {
+ launchedContainer.pauseContainer();
+ } catch (Exception e) {
+ LOG.info("Got exception while pausing container: " +
+ StringUtils.stringifyException(e));
+ }
+ break;
+ case RESUME_CONTAINER:
+ ContainerLaunch launchCont = running.get(containerId);
+ if (launchCont == null) {
+ // Container not launched. So nothing needs to be done.
+ return;
+ }
+
+ // Resume the container.
+ try {
+ launchCont.resumeContainer();
+ } catch (Exception e) {
+ LOG.info("Got exception while resuming container: " +
+ StringUtils.stringifyException(e));
+ }
+ break;
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8752f537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
index 380a032..1054e06 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
@@ -25,4 +25,7 @@ public enum ContainersLauncherEventType {
CLEANUP_CONTAINER, // The process(grp) itself.
CLEANUP_CONTAINER_FOR_REINIT, // The process(grp) itself.
SIGNAL_CONTAINER,
+ PAUSE_CONTAINER,
+ RESUME_CONTAINER
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8752f537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
index 086cb9b..9ff731f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
@@ -26,4 +26,5 @@ public enum ContainerSchedulerEventType {
CONTAINER_COMPLETED,
// Producer: Node HB response - RM has asked to shed the queue
SHED_QUEUED_CONTAINERS,
+ CONTAINER_PAUSED
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8752f537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
index 33f4609..8909088 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
@@ -103,6 +103,7 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
+import org.mockito.Mockito;
public class TestContainer {
@@ -207,6 +208,42 @@ public class TestContainer {
@Test
@SuppressWarnings("unchecked") // mocked generic
+ public void testContainerPauseAndResume() throws Exception {
+ // Drives a launched container RUNNING -> PAUSED -> RUNNING, then kills it
+ // and checks the final state plus the running/failed container metrics.
+ WrappedContainer container = null;
+ try {
+ container = new WrappedContainer(13, 314159265358979L, 4344, "yak");
+ container.initContainer();
+ container.localizeResources();
+ // Snapshot the metric before launch so the delta can be asserted.
+ final int runningBefore = metrics.getRunningContainers();
+ container.launchContainer();
+ assertEquals(runningBefore + 1, metrics.getRunningContainers());
+ reset(container.localizerBus);
+
+ // Pause, then resume, checking the container state after each step.
+ container.pauseContainer();
+ assertEquals(ContainerState.PAUSED, container.c.getContainerState());
+ container.resumeContainer();
+ assertEquals(ContainerState.RUNNING, container.c.getContainerState());
+
+ // Kill the resumed container and verify it is cleaned up.
+ container.containerKilledOnRequest();
+ assertEquals(ContainerState.EXITED_WITH_FAILURE,
+ container.c.getContainerState());
+ assertNull(container.c.getLocalizedResources());
+ verifyCleanupCall(container);
+ // Snapshot failures before resource cleanup, which moves it to DONE.
+ final int failedBefore = metrics.getFailedContainers();
+ container.containerResourcesCleanup();
+ assertEquals(ContainerState.DONE, container.c.getContainerState());
+ assertEquals(failedBefore + 1, metrics.getFailedContainers());
+ assertEquals(runningBefore, metrics.getRunningContainers());
+ } finally {
+ if (container != null) {
+ container.finished();
+ }
+ }
+ }
+
+ @Test
+ @SuppressWarnings("unchecked") // mocked generic
public void testCleanupOnFailure() throws Exception {
WrappedContainer wc = null;
try {
@@ -955,6 +992,8 @@ public class TestContainer {
NodeStatusUpdater nodeStatusUpdater = mock(NodeStatusUpdater.class);
when(context.getNodeStatusUpdater()).thenReturn(nodeStatusUpdater);
ContainerExecutor executor = mock(ContainerExecutor.class);
+ Mockito.doNothing().when(executor).pauseContainer(any(Container.class));
+ Mockito.doNothing().when(executor).resumeContainer(any(Container.class));
launcher =
new ContainersLauncher(context, dispatcher, executor, null, null);
// create a mock ExecutorService, which will not really launch
@@ -1143,6 +1182,18 @@ public class TestContainer {
drainDispatcherEvents();
}
+ /** Delivers a pause request to the wrapped container, then drains the dispatcher. */
+ public void pauseContainer() {
+ ContainerPauseEvent pauseEvent = new ContainerPauseEvent(cId, "PauseRequest");
+ c.handle(pauseEvent);
+ drainDispatcherEvents();
+ }
+
+ /** Delivers a resume request to the wrapped container, then drains the dispatcher. */
+ public void resumeContainer() {
+ ContainerResumeEvent resumeEvent = new ContainerResumeEvent(cId, "ResumeRequest");
+ c.handle(resumeEvent);
+ drainDispatcherEvents();
+ }
+
public void containerKilledOnRequest() {
int exitCode = ContainerExitStatus.KILLED_BY_RESOURCEMANAGER;
String diagnosticMsg = "Container completed with exit code " + exitCode;
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org