You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2017/09/14 17:05:14 UTC
[1/3] hadoop git commit: YARN-5292. NM Container lifecycle and state
transitions to support for PAUSED container state. (Hitesh Sharma via
asuresh)
Repository: hadoop
Updated Branches:
refs/heads/trunk b9465bb87 -> 66ca0a654
YARN-5292. NM Container lifecycle and state transitions to support for PAUSED container state. (Hitesh Sharma via asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/864fbacd
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/864fbacd
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/864fbacd
Branch: refs/heads/trunk
Commit: 864fbacd4548004b1de8b0812627976acd22aff5
Parents: b9465bb
Author: Arun Suresh <as...@apache.org>
Authored: Fri Dec 9 07:51:03 2016 -0800
Committer: Arun Suresh <as...@apache.org>
Committed: Thu Sep 14 08:48:39 2017 -0700
----------------------------------------------------------------------
.../hadoop/yarn/api/records/ContainerState.java | 7 +-
.../src/main/proto/yarn_protos.proto | 1 +
.../server/nodemanager/ContainerExecutor.java | 22 +++
.../container/ContainerEventType.java | 6 +-
.../container/ContainerImpl.java | 170 ++++++++++++++++++-
.../container/ContainerPauseEvent.java | 40 +++++
.../container/ContainerResumeEvent.java | 39 +++++
.../container/ContainerState.java | 3 +-
.../launcher/ContainerLaunch.java | 90 +++++++++-
.../launcher/ContainersLauncher.java | 32 ++++
.../launcher/ContainersLauncherEventType.java | 3 +
.../scheduler/ContainerSchedulerEventType.java | 1 +
.../container/TestContainer.java | 51 ++++++
13 files changed, 454 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/864fbacd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java
index 696fe06..45e5bd4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java
@@ -33,11 +33,14 @@ public enum ContainerState {
/** Running container */
RUNNING,
-
+
/** Completed container */
COMPLETE,
/** Scheduled (awaiting resources) at the NM. */
@InterfaceStability.Unstable
- SCHEDULED
+ SCHEDULED,
+
+ /** Paused at the NM. */
+ PAUSED
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/864fbacd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 9933e9e..066441c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -110,6 +110,7 @@ enum ContainerStateProto {
C_RUNNING = 2;
C_COMPLETE = 3;
C_SCHEDULED = 4;
+ C_PAUSED = 5;
}
message ContainerProto {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/864fbacd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
index 072cca7..da50d7a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
@@ -700,6 +700,28 @@ public abstract class ContainerExecutor implements Configurable {
}
/**
+ * Pause the container. The default implementation logs a warning and throws
+ * UnsupportedOperationException. Executor implementations can override it.
+ * @param container the Container
+ * @throws UnsupportedOperationException always, in this default implementation
+ */
+ public void pauseContainer(Container container) {
+ LOG.warn(container.getContainerId() + " doesn't support pausing.");
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Resume the container from pause state. The default implementation logs a
+ * warning and throws UnsupportedOperationException. Overridable by executors.
+ * @param container the Container
+ * @throws UnsupportedOperationException always, in this default implementation
+ */
+ public void resumeContainer(Container container) {
+ LOG.warn(container.getContainerId() + " doesn't support resume.");
+ throw new UnsupportedOperationException();
+ }
+
+ /**
* Get the process-identifier for the container.
*
* @param containerID the container ID
http://git-wip-us.apache.org/repos/asf/hadoop/blob/864fbacd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
index afea0e6..1475435 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
@@ -27,6 +27,8 @@ public enum ContainerEventType {
CONTAINER_DONE,
REINITIALIZE_CONTAINER,
ROLLBACK_REINIT,
+ PAUSE_CONTAINER,
+ RESUME_CONTAINER,
// DownloadManager
CONTAINER_INITED,
@@ -38,5 +40,7 @@ public enum ContainerEventType {
CONTAINER_LAUNCHED,
CONTAINER_EXITED_WITH_SUCCESS,
CONTAINER_EXITED_WITH_FAILURE,
- CONTAINER_KILLED_ON_REQUEST
+ CONTAINER_KILLED_ON_REQUEST,
+ CONTAINER_PAUSED,
+ CONTAINER_RESUMED
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/864fbacd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index 1a48b12..7a12371 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -307,6 +307,8 @@ public class ContainerImpl implements Container {
UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.NEW, ContainerState.DONE,
ContainerEventType.KILL_CONTAINER, new KillOnNewTransition())
+ .addTransition(ContainerState.NEW, ContainerState.DONE,
+ ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
// From LOCALIZING State
.addTransition(ContainerState.LOCALIZING,
@@ -322,6 +324,8 @@ public class ContainerImpl implements Container {
.addTransition(ContainerState.LOCALIZING, ContainerState.KILLING,
ContainerEventType.KILL_CONTAINER,
new KillBeforeRunningTransition())
+ .addTransition(ContainerState.LOCALIZING, ContainerState.KILLING,
+ ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
// From LOCALIZATION_FAILED State
.addTransition(ContainerState.LOCALIZATION_FAILED,
@@ -335,7 +339,8 @@ public class ContainerImpl implements Container {
// container not launched so kill is a no-op
.addTransition(ContainerState.LOCALIZATION_FAILED,
ContainerState.LOCALIZATION_FAILED,
- ContainerEventType.KILL_CONTAINER)
+ EnumSet.of(ContainerEventType.KILL_CONTAINER,
+ ContainerEventType.PAUSE_CONTAINER))
// container cleanup triggers a release of all resources
// regardless of whether they were localized or not
// LocalizedResource handles release event in all states
@@ -391,6 +396,76 @@ public class ContainerImpl implements Container {
ContainerState.EXITED_WITH_FAILURE,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
new KilledExternallyTransition())
+ .addTransition(ContainerState.RUNNING, ContainerState.PAUSING,
+ ContainerEventType.PAUSE_CONTAINER, new PauseContainerTransition())
+
+ // From PAUSING State
+ .addTransition(ContainerState.PAUSING, ContainerState.KILLING,
+ ContainerEventType.KILL_CONTAINER, new KillTransition())
+ .addTransition(ContainerState.PAUSING, ContainerState.PAUSING,
+ ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+ UPDATE_DIAGNOSTICS_TRANSITION)
+ .addTransition(ContainerState.PAUSING, ContainerState.PAUSED,
+ ContainerEventType.CONTAINER_PAUSED, new PausedContainerTransition())
+ // In case something goes wrong then container will exit from the
+ // PAUSING state
+ .addTransition(ContainerState.PAUSING,
+ ContainerState.EXITED_WITH_SUCCESS,
+ ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS)
+ .addTransition(ContainerState.PAUSING,
+ ContainerState.EXITED_WITH_FAILURE,
+ ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
+ new ExitedWithFailureTransition(true))
+ .addTransition(ContainerState.PAUSING, ContainerState.EXITED_WITH_FAILURE,
+ ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
+ new KilledExternallyTransition())
+
+ // From PAUSED State
+ .addTransition(ContainerState.PAUSED, ContainerState.KILLING,
+ ContainerEventType.KILL_CONTAINER, new KillTransition())
+ .addTransition(ContainerState.PAUSED, ContainerState.PAUSED,
+ ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+ UPDATE_DIAGNOSTICS_TRANSITION)
+ .addTransition(ContainerState.PAUSED, ContainerState.PAUSED,
+ ContainerEventType.PAUSE_CONTAINER)
+ .addTransition(ContainerState.PAUSED, ContainerState.RESUMING,
+ ContainerEventType.RESUME_CONTAINER, new ResumeContainerTransition())
+ // In case something goes wrong then container will exit from the
+ // PAUSED state
+ .addTransition(ContainerState.PAUSED,
+ ContainerState.EXITED_WITH_FAILURE,
+ ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
+ new ExitedWithFailureTransition(true))
+ .addTransition(ContainerState.PAUSED, ContainerState.EXITED_WITH_FAILURE,
+ ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
+ new KilledExternallyTransition())
+ .addTransition(ContainerState.PAUSED,
+ ContainerState.EXITED_WITH_SUCCESS,
+ ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
+ new ExitedWithSuccessTransition(true))
+
+ // From RESUMING State
+ .addTransition(ContainerState.RESUMING, ContainerState.KILLING,
+ ContainerEventType.KILL_CONTAINER, new KillTransition())
+ .addTransition(ContainerState.RESUMING, ContainerState.RUNNING,
+ ContainerEventType.CONTAINER_RESUMED)
+ .addTransition(ContainerState.RESUMING, ContainerState.RESUMING,
+ ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+ UPDATE_DIAGNOSTICS_TRANSITION)
+ // In case something goes wrong then container will exit from the
+ // RESUMING state
+ .addTransition(ContainerState.RESUMING,
+ ContainerState.EXITED_WITH_FAILURE,
+ ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
+ new ExitedWithFailureTransition(true))
+ .addTransition(ContainerState.RESUMING,
+ ContainerState.EXITED_WITH_FAILURE,
+ ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
+ new KilledExternallyTransition())
+ .addTransition(ContainerState.RESUMING,
+ ContainerState.EXITED_WITH_SUCCESS,
+ ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
+ new ExitedWithSuccessTransition(true))
// From REINITIALIZING State
.addTransition(ContainerState.REINITIALIZING,
@@ -414,6 +489,8 @@ public class ContainerImpl implements Container {
UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.REINITIALIZING, ContainerState.KILLING,
ContainerEventType.KILL_CONTAINER, new KillTransition())
+ .addTransition(ContainerState.REINITIALIZING, ContainerState.KILLING,
+ ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
.addTransition(ContainerState.REINITIALIZING,
ContainerState.SCHEDULED,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
@@ -431,6 +508,8 @@ public class ContainerImpl implements Container {
UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.RELAUNCHING, ContainerState.KILLING,
ContainerEventType.KILL_CONTAINER, new KillTransition())
+ .addTransition(ContainerState.RELAUNCHING, ContainerState.KILLING,
+ ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
// From CONTAINER_EXITED_WITH_SUCCESS State
.addTransition(ContainerState.EXITED_WITH_SUCCESS, ContainerState.DONE,
@@ -442,7 +521,8 @@ public class ContainerImpl implements Container {
UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.EXITED_WITH_SUCCESS,
ContainerState.EXITED_WITH_SUCCESS,
- ContainerEventType.KILL_CONTAINER)
+ EnumSet.of(ContainerEventType.KILL_CONTAINER,
+ ContainerEventType.PAUSE_CONTAINER))
// From EXITED_WITH_FAILURE State
.addTransition(ContainerState.EXITED_WITH_FAILURE, ContainerState.DONE,
@@ -454,7 +534,8 @@ public class ContainerImpl implements Container {
UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.EXITED_WITH_FAILURE,
ContainerState.EXITED_WITH_FAILURE,
- ContainerEventType.KILL_CONTAINER)
+ EnumSet.of(ContainerEventType.KILL_CONTAINER,
+ ContainerEventType.PAUSE_CONTAINER))
// From KILLING State.
.addTransition(ContainerState.KILLING,
@@ -488,7 +569,8 @@ public class ContainerImpl implements Container {
// in the container launcher
.addTransition(ContainerState.KILLING,
ContainerState.KILLING,
- ContainerEventType.CONTAINER_LAUNCHED)
+ EnumSet.of(ContainerEventType.CONTAINER_LAUNCHED,
+ ContainerEventType.PAUSE_CONTAINER))
// From CONTAINER_CLEANEDUP_AFTER_KILL State.
.addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
@@ -504,11 +586,13 @@ public class ContainerImpl implements Container {
EnumSet.of(ContainerEventType.KILL_CONTAINER,
ContainerEventType.RESOURCE_FAILED,
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
- ContainerEventType.CONTAINER_EXITED_WITH_FAILURE))
+ ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
+ ContainerEventType.PAUSE_CONTAINER))
// From DONE
.addTransition(ContainerState.DONE, ContainerState.DONE,
- ContainerEventType.KILL_CONTAINER)
+ EnumSet.of(ContainerEventType.KILL_CONTAINER,
+ ContainerEventType.PAUSE_CONTAINER))
.addTransition(ContainerState.DONE, ContainerState.DONE,
ContainerEventType.INIT_CONTAINER)
.addTransition(ContainerState.DONE, ContainerState.DONE,
@@ -534,6 +618,8 @@ public class ContainerImpl implements Container {
case LOCALIZING:
case LOCALIZATION_FAILED:
case SCHEDULED:
+ case PAUSED:
+ case RESUMING:
return org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED;
case RUNNING:
case RELAUNCHING:
@@ -543,6 +629,7 @@ public class ContainerImpl implements Container {
case KILLING:
case CONTAINER_CLEANEDUP_AFTER_KILL:
case CONTAINER_RESOURCES_CLEANINGUP:
+ case PAUSING:
return org.apache.hadoop.yarn.api.records.ContainerState.RUNNING;
case DONE:
default:
@@ -1501,6 +1588,26 @@ public class ContainerImpl implements Container {
}
/**
+ * Transition taken when PAUSE_CONTAINER is answered by killing the container:
+ * - NEW -> DONE, LOCALIZING -> KILLING.
+ * - REINITIALIZING -> KILLING, RELAUNCHING -> KILLING.
+ */
+ @SuppressWarnings("unchecked") // dispatcher not typed
+ static class KillOnPauseTransition implements
+ SingleArcTransition<ContainerImpl, ContainerEvent> {
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void transition(ContainerImpl container, ContainerEvent event) {
+ // Clear the re-initializing flag and ask the launcher to clean up
+ container.setIsReInitializing(false);
+ container.dispatcher.getEventHandler().handle(
+ new ContainersLauncherEvent(container,
+ ContainersLauncherEventType.CLEANUP_CONTAINER));
+ }
+ }
+
+ /**
* Transition from KILLING to CONTAINER_CLEANEDUP_AFTER_KILL
* upon receiving CONTAINER_KILLED_ON_REQUEST.
*/
@@ -1690,6 +1797,57 @@ public class ContainerImpl implements Container {
}
}
+ /**
+ * Transitions upon receiving PAUSE_CONTAINER.
+ * - RUNNING -> PAUSING
+ */
+ @SuppressWarnings("unchecked") // dispatcher not typed
+ static class PauseContainerTransition implements
+ SingleArcTransition<ContainerImpl, ContainerEvent> {
+ @Override
+ public void transition(ContainerImpl container, ContainerEvent event) {
+ // Ask the launcher to pause the container, then record the diagnostic
+ container.dispatcher.getEventHandler().handle(
+ new ContainersLauncherEvent(container,
+ ContainersLauncherEventType.PAUSE_CONTAINER));
+ ContainerPauseEvent pauseEvent = (ContainerPauseEvent) event;
+ container.addDiagnostics(pauseEvent.getDiagnostic(), "\n");
+ }
+ }
+
+ /**
+ * Transition upon receiving CONTAINER_PAUSED (PAUSING -> PAUSED).
+ */
+ @SuppressWarnings("unchecked") // dispatcher not typed
+ static class PausedContainerTransition implements
+ SingleArcTransition<ContainerImpl, ContainerEvent> {
+ @Override
+ public void transition(ContainerImpl container, ContainerEvent event) {
+ // Container was PAUSED so tell the scheduler
+ container.dispatcher.getEventHandler().handle(
+ new ContainerSchedulerEvent(container,
+ ContainerSchedulerEventType.CONTAINER_PAUSED));
+ }
+ }
+
+ /**
+ * Transitions upon receiving RESUME_CONTAINER.
+ * - PAUSED -> RESUMING
+ */
+ @SuppressWarnings("unchecked") // dispatcher not typed
+ static class ResumeContainerTransition implements
+ SingleArcTransition<ContainerImpl, ContainerEvent> {
+ @Override
+ public void transition(ContainerImpl container, ContainerEvent event) {
+ // Ask the launcher to resume the container, then record the diagnostic
+ container.dispatcher.getEventHandler().handle(
+ new ContainersLauncherEvent(container,
+ ContainersLauncherEventType.RESUME_CONTAINER));
+ ContainerResumeEvent resumeEvent = (ContainerResumeEvent) event;
+ container.addDiagnostics(resumeEvent.getDiagnostic(), "\n");
+ }
+ }
+
@Override
public void handle(ContainerEvent event) {
try {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/864fbacd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerPauseEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerPauseEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerPauseEvent.java
new file mode 100644
index 0000000..898304e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerPauseEvent.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+/**
+ * ContainerEvent of type PAUSE_CONTAINER that carries a diagnostic message.
+ */
+public class ContainerPauseEvent extends ContainerEvent {
+
+ private final String diagnostic;
+
+ public ContainerPauseEvent(ContainerId cId,
+ String diagnostic) {
+ super(cId, ContainerEventType.PAUSE_CONTAINER);
+ this.diagnostic = diagnostic;
+ }
+
+ public String getDiagnostic() {
+ return this.diagnostic;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/hadoop/blob/864fbacd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerResumeEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerResumeEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerResumeEvent.java
new file mode 100644
index 0000000..d7c9e9a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerResumeEvent.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+/**
+ * ContainerEvent of type RESUME_CONTAINER that carries a diagnostic message.
+ */
+public class ContainerResumeEvent extends ContainerEvent {
+
+ private final String diagnostic;
+
+ public ContainerResumeEvent(ContainerId cId,
+ String diagnostic) {
+ super(cId, ContainerEventType.RESUME_CONTAINER);
+ this.diagnostic = diagnostic;
+ }
+
+ public String getDiagnostic() {
+ return this.diagnostic;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/864fbacd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
index 91d1356..7c3fea8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
@@ -21,5 +21,6 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
public enum ContainerState {
NEW, LOCALIZING, LOCALIZATION_FAILED, SCHEDULED, RUNNING, RELAUNCHING,
REINITIALIZING, EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, KILLING,
- CONTAINER_CLEANEDUP_AFTER_KILL, CONTAINER_RESOURCES_CLEANINGUP, DONE
+ CONTAINER_CLEANEDUP_AFTER_KILL, CONTAINER_RESOURCES_CLEANINGUP, DONE,
+ PAUSING, PAUSED, RESUMING
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/864fbacd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
index d0ce787..89dfdd1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
@@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
@@ -86,6 +87,7 @@ import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.util.ConverterUtils;
public class ContainerLaunch implements Callable<Integer> {
@@ -106,8 +108,10 @@ public class ContainerLaunch implements Callable<Integer> {
private final Configuration conf;
private final Context context;
private final ContainerManagerImpl containerManager;
-
+
protected AtomicBoolean containerAlreadyLaunched = new AtomicBoolean(false);
+ protected AtomicBoolean shouldPauseContainer = new AtomicBoolean(false);
+
protected AtomicBoolean completed = new AtomicBoolean(false);
private volatile boolean killedBeforeStart = false;
@@ -803,6 +807,90 @@ public class ContainerLaunch implements Callable<Integer> {
}
/**
+ * Pause the container.
+ * Asks the executor to pause it and reports CONTAINER_PAUSED on success; if
+ * pausing throws, the container is killed with exit status PREEMPTED.
+ * @throws IOException declared, but executor exceptions are caught here.
+ */
+ @SuppressWarnings("unchecked") // dispatcher not typed
+ public void pauseContainer() throws IOException {
+ ContainerId containerId = container.getContainerId();
+ String containerIdStr = containerId.toString();
+ LOG.info("Pausing the container " + containerIdStr);
+
+ // The pause event is only handled if the container is in the running state
+ // (the container state machine), so we don't check for
+ // shouldLaunchContainer over here
+
+ if (!shouldPauseContainer.compareAndSet(false, true)) {
+ LOG.info("Container " + containerId + " not paused as "
+ + "resume already called");
+ return;
+ }
+
+ try {
+ // Pause the container
+ exec.pauseContainer(container);
+
+ // pauseContainer is a blocking call, so reaching this point means the
+ // container is paused; send out the event.
+ dispatcher.getEventHandler().handle(new ContainerEvent(
+ containerId,
+ ContainerEventType.CONTAINER_PAUSED));
+ } catch (Exception e) {
+ String message =
+ "Exception when trying to pause container " + containerIdStr
+ + ": " + StringUtils.stringifyException(e);
+ LOG.info(message);
+ container.handle(new ContainerKillEvent(container.getContainerId(),
+ ContainerExitStatus.PREEMPTED, "Container preempted as there was "
+ + " an exception in pausing it."));
+ }
+ }
+
+ /**
+ * Resume the container.
+ * Asks the executor to resume a paused container and reports
+ * CONTAINER_RESUMED on success; if resuming throws, the container is killed.
+ * @throws IOException in case of error.
+ */
+ @SuppressWarnings("unchecked") // dispatcher not typed
+ public void resumeContainer() throws IOException {
+ ContainerId containerId = container.getContainerId();
+ String containerIdStr = containerId.toString();
+ LOG.info("Resuming the container " + containerIdStr);
+
+ // The resume event is only handled if the container is in a paused state
+ // so we don't check for the launched flag here.
+
+ // Clear the paused flag; the swap fails when no pause is outstanding.
+ boolean alreadyPaused = shouldPauseContainer.compareAndSet(true, false);
+ if (!alreadyPaused) {
+ LOG.info("Container " + containerIdStr + " not paused."
+ + " No resume necessary");
+ return;
+ }
+
+ // The flag was set by a pause request, so ask the executor to resume
+ try {
+ exec.resumeContainer(container);
+ // resumeContainer is a blocking call, so reaching this point means the
+ // container is resumed; send out the event.
+ dispatcher.getEventHandler().handle(new ContainerEvent(
+ containerId,
+ ContainerEventType.CONTAINER_RESUMED));
+ } catch (Exception e) {
+ String message =
+ "Exception when trying to resume container " + containerIdStr
+ + ": " + StringUtils.stringifyException(e);
+ LOG.info(message);
+ container.handle(new ContainerKillEvent(container.getContainerId(),
+ ContainerExitStatus.PREEMPTED, "Container preempted as there was "
+ + " an exception in resuming it."));
+ }
+ }
+
+ /**
* Loop through for a time-bounded interval waiting to
* read the process id from a file generated by a running process.
* @param pidFilePath File from which to read the process id
http://git-wip-us.apache.org/repos/asf/hadoop/blob/864fbacd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
index 25909b9..ca69712 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.Dispatcher;
@@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import com.google.common.annotations.VisibleForTesting;
@@ -171,6 +173,36 @@ public class ContainersLauncher extends AbstractService
+ " with command " + signalEvent.getCommand());
}
break;
+ case PAUSE_CONTAINER:
+ ContainerLaunch launchedContainer = running.get(containerId);
+ if (launchedContainer == null) {
+ // Container not launched. So nothing needs to be done.
+ return;
+ }
+
+ // Pause the container
+ try {
+ launchedContainer.pauseContainer();
+ } catch (Exception e) {
+ LOG.info("Got exception while pausing container: " +
+ StringUtils.stringifyException(e));
+ }
+ break;
+ case RESUME_CONTAINER:
+ ContainerLaunch launchCont = running.get(containerId);
+ if (launchCont == null) {
+ // Container not launched. So nothing needs to be done.
+ return;
+ }
+
+ // Resume the container.
+ try {
+ launchCont.resumeContainer();
+ } catch (Exception e) {
+ LOG.info("Got exception while resuming container: " +
+ StringUtils.stringifyException(e));
+ }
+ break;
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/864fbacd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
index 380a032..1054e06 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
@@ -25,4 +25,7 @@ public enum ContainersLauncherEventType {
CLEANUP_CONTAINER, // The process(grp) itself.
CLEANUP_CONTAINER_FOR_REINIT, // The process(grp) itself.
SIGNAL_CONTAINER,
+ PAUSE_CONTAINER,
+ RESUME_CONTAINER
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/864fbacd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
index 917eda0..a9cbf74 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
@@ -27,4 +27,5 @@ public enum ContainerSchedulerEventType {
UPDATE_CONTAINER,
// Producer: Node HB response - RM has asked to shed the queue
SHED_QUEUED_CONTAINERS,
+ CONTAINER_PAUSED
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/864fbacd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
index 33f4609..8909088 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
@@ -103,6 +103,7 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
+import org.mockito.Mockito;
public class TestContainer {
@@ -207,6 +208,42 @@ public class TestContainer {
@Test
@SuppressWarnings("unchecked") // mocked generic
+ public void testContainerPauseAndResume() throws Exception {
+ WrappedContainer wc = null;
+ try {
+ wc = new WrappedContainer(13, 314159265358979L, 4344, "yak");
+ wc.initContainer();
+ wc.localizeResources();
+ int running = metrics.getRunningContainers();
+ wc.launchContainer();
+ assertEquals(running + 1, metrics.getRunningContainers());
+ reset(wc.localizerBus);
+ wc.pauseContainer();
+ assertEquals(ContainerState.PAUSED,
+ wc.c.getContainerState());
+ wc.resumeContainer();
+ assertEquals(ContainerState.RUNNING,
+ wc.c.getContainerState());
+ wc.containerKilledOnRequest();
+ assertEquals(ContainerState.EXITED_WITH_FAILURE,
+ wc.c.getContainerState());
+ assertNull(wc.c.getLocalizedResources());
+ verifyCleanupCall(wc);
+ int failed = metrics.getFailedContainers();
+ wc.containerResourcesCleanup();
+ assertEquals(ContainerState.DONE, wc.c.getContainerState());
+ assertEquals(failed + 1, metrics.getFailedContainers());
+ assertEquals(running, metrics.getRunningContainers());
+ }
+ finally {
+ if (wc != null) {
+ wc.finished();
+ }
+ }
+ }
+
+ @Test
+ @SuppressWarnings("unchecked") // mocked generic
public void testCleanupOnFailure() throws Exception {
WrappedContainer wc = null;
try {
@@ -955,6 +992,8 @@ public class TestContainer {
NodeStatusUpdater nodeStatusUpdater = mock(NodeStatusUpdater.class);
when(context.getNodeStatusUpdater()).thenReturn(nodeStatusUpdater);
ContainerExecutor executor = mock(ContainerExecutor.class);
+ Mockito.doNothing().when(executor).pauseContainer(any(Container.class));
+ Mockito.doNothing().when(executor).resumeContainer(any(Container.class));
launcher =
new ContainersLauncher(context, dispatcher, executor, null, null);
// create a mock ExecutorService, which will not really launch
@@ -1143,6 +1182,18 @@ public class TestContainer {
drainDispatcherEvents();
}
+ public void pauseContainer() {
+ c.handle(new ContainerPauseEvent(cId,
+ "PauseRequest"));
+ drainDispatcherEvents();
+ }
+
+ public void resumeContainer() {
+ c.handle(new ContainerResumeEvent(cId,
+ "ResumeRequest"));
+ drainDispatcherEvents();
+ }
+
public void containerKilledOnRequest() {
int exitCode = ContainerExitStatus.KILLED_BY_RESOURCEMANAGER;
String diagnosticMsg = "Container completed with exit code " + exitCode;
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[3/3] hadoop git commit: YARN-6059. Update paused container state in
the NM state store. (Hitesh Sharma via asuresh)
Posted by as...@apache.org.
YARN-6059. Update paused container state in the NM state store. (Hitesh Sharma via asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/66ca0a65
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/66ca0a65
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/66ca0a65
Branch: refs/heads/trunk
Commit: 66ca0a65408521d5f9b080dd16b353b49fb8eaea
Parents: 4f81944
Author: Arun Suresh <as...@apache.org>
Authored: Tue Sep 12 12:22:00 2017 -0700
Committer: Arun Suresh <as...@apache.org>
Committed: Thu Sep 14 08:51:26 2017 -0700
----------------------------------------------------------------------
.../container/ContainerImpl.java | 16 ++-
.../launcher/ContainerLaunch.java | 28 ++++-
.../launcher/ContainersLauncher.java | 10 ++
.../launcher/ContainersLauncherEventType.java | 3 +-
.../launcher/RecoverPausedContainerLaunch.java | 124 +++++++++++++++++++
.../launcher/RecoveredContainerLaunch.java | 3 +-
.../recovery/NMLeveldbStateStoreService.java | 42 ++++++-
.../recovery/NMNullStateStoreService.java | 9 ++
.../recovery/NMStateStoreService.java | 26 +++-
.../recovery/NMMemoryStateStoreService.java | 13 ++
.../TestNMLeveldbStateStoreService.java | 17 +++
11 files changed, 273 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/66ca0a65/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index 95ebfd5..9b9c47f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -836,10 +836,18 @@ public class ContainerImpl implements Container {
@SuppressWarnings("unchecked") // dispatcher not typed
private void sendScheduleEvent() {
- dispatcher.getEventHandler().handle(
- new ContainerSchedulerEvent(this,
- ContainerSchedulerEventType.SCHEDULE_CONTAINER)
- );
+ if (recoveredStatus == RecoveredContainerStatus.PAUSED) {
+ // Recovery is not supported for paused containers, so instead of raising
+ // the schedule event we raise the launcher event, which proceeds to kill
+ // the paused container.
+ ContainersLauncherEventType launcherEvent;
+ launcherEvent = ContainersLauncherEventType.RECOVER_PAUSED_CONTAINER;
+ dispatcher.getEventHandler()
+ .handle(new ContainersLauncherEvent(this, launcherEvent));
+ } else {
+ dispatcher.getEventHandler().handle(new ContainerSchedulerEvent(this,
+ ContainerSchedulerEventType.SCHEDULE_CONTAINER));
+ }
}
@SuppressWarnings("unchecked") // dispatcher not typed
http://git-wip-us.apache.org/repos/asf/hadoop/blob/66ca0a65/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
index 89dfdd1..e254887 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
@@ -837,6 +837,14 @@ public class ContainerLaunch implements Callable<Integer> {
dispatcher.getEventHandler().handle(new ContainerEvent(
containerId,
ContainerEventType.CONTAINER_PAUSED));
+
+ try {
+ this.context.getNMStateStore().storeContainerPaused(
+ container.getContainerId());
+ } catch (IOException e) {
+ LOG.warn("Could not store container [" + container.getContainerId()
+ + "] state. The Container has been paused.", e);
+ }
} catch (Exception e) {
String message =
"Exception when trying to pause container " + containerIdStr
@@ -873,12 +881,20 @@ public class ContainerLaunch implements Callable<Integer> {
// If the container has already started
try {
- exec.resumeContainer(container);
- // ResumeContainer is a blocking call. We are here almost means the
- // container is resumed, so send out the event.
- dispatcher.getEventHandler().handle(new ContainerEvent(
- containerId,
- ContainerEventType.CONTAINER_RESUMED));
+ exec.resumeContainer(container);
+ // ResumeContainer is a blocking call. We are here almost means the
+ // container is resumed, so send out the event.
+ dispatcher.getEventHandler().handle(new ContainerEvent(
+ containerId,
+ ContainerEventType.CONTAINER_RESUMED));
+
+ try {
+ this.context.getNMStateStore().removeContainerPaused(
+ container.getContainerId());
+ } catch (IOException e) {
+ LOG.warn("Could not store container [" + container.getContainerId()
+ + "] state. The Container has been resumed.", e);
+ }
} catch (Exception e) {
String message =
"Exception when trying to resume container " + containerIdStr
http://git-wip-us.apache.org/repos/asf/hadoop/blob/66ca0a65/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
index ca69712..9f6ef74 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
@@ -139,6 +139,16 @@ public class ContainersLauncher extends AbstractService
containerLauncher.submit(launch);
running.put(containerId, launch);
break;
+ case RECOVER_PAUSED_CONTAINER:
+ // Recovery for paused containers is not supported, thus here
+ // we locate any paused containers, and terminate them.
+ app = context.getApplications().get(
+ containerId.getApplicationAttemptId().getApplicationId());
+ launch = new RecoverPausedContainerLaunch(context, getConfig(),
+ dispatcher, exec, app, event.getContainer(), dirsHandler,
+ containerManager);
+ containerLauncher.submit(launch);
+ break;
case CLEANUP_CONTAINER:
case CLEANUP_CONTAINER_FOR_REINIT:
ContainerLaunch launcher = running.remove(containerId);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/66ca0a65/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
index 1054e06..847ee34 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
@@ -26,6 +26,7 @@ public enum ContainersLauncherEventType {
CLEANUP_CONTAINER_FOR_REINIT, // The process(grp) itself.
SIGNAL_CONTAINER,
PAUSE_CONTAINER,
- RESUME_CONTAINER
+ RESUME_CONTAINER,
+ RECOVER_PAUSED_CONTAINER
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/66ca0a65/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoverPausedContainerLaunch.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoverPausedContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoverPausedContainerLaunch.java
new file mode 100644
index 0000000..14cab9a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoverPausedContainerLaunch.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.*;
+import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+
+/**
+ * A ContainerLaunch used after an NM restart for a container that was recorded
+ * as paused. Recovery of paused containers is not supported, so it kills them.
+ */
+public class RecoverPausedContainerLaunch extends ContainerLaunch {
+
+  private static final Log LOG =
+      LogFactory.getLog(RecoverPausedContainerLaunch.class);
+
+  public RecoverPausedContainerLaunch(Context context,
+      Configuration configuration, Dispatcher dispatcher,
+      ContainerExecutor exec, Application app, Container container,
+      LocalDirsHandlerService dirsHandler,
+      ContainerManagerImpl containerManager) {
+    super(context, configuration, dispatcher, exec, app, container, dirsHandler,
+        containerManager);
+  }
+
+  /**
+   * Cleanup the paused container by issuing a kill on it, then report it as
+   * exited with failure.
+   * @return always the LOST exit code; the container's status is never read.
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public Integer call() {
+    int retCode = ContainerExecutor.ExitCode.LOST.getExitCode();
+    ContainerId containerId = container.getContainerId();
+    String appIdStr =
+        containerId.getApplicationAttemptId().getApplicationId().toString();
+    String containerIdStr = containerId.toString();
+
+    boolean notInterrupted = true;
+    try {
+      File pidFile = locatePidFile(appIdStr, containerIdStr);
+      if (pidFile != null) {
+        pidFilePath = new Path(pidFile.getPath());
+        exec.activateContainer(containerId, pidFilePath);
+        exec.signalContainer(new ContainerSignalContext.Builder()
+            .setContainer(container).setUser(container.getUser())
+            .setSignal(ContainerExecutor.Signal.KILL)
+            .build());
+      } else {
+        LOG.warn("Unable to locate pid file for container " + containerIdStr);
+      }
+    } catch (InterruptedIOException e) {
+      LOG.warn("Interrupted while killing the paused container " + containerId);
+      notInterrupted = false;
+    } catch (IOException e) {
+      LOG.error("Unable to kill the paused container " + containerIdStr, e);
+    } finally {
+      // When interrupted, skip marking completion and persisting the exit code.
+      if (notInterrupted) {
+        this.completed.set(true);
+        exec.deactivateContainer(containerId);
+        try {
+          getContext().getNMStateStore()
+              .storeContainerCompleted(containerId, retCode);
+        } catch (IOException e) {
+          LOG.error("Unable to set exit code for container " + containerId, e);
+        }
+      }
+    }
+
+    LOG.warn("Recovered container exited with a non-zero exit code " + retCode);
+    this.dispatcher.getEventHandler().handle(new ContainerExitEvent(containerId,
+        ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, retCode,
+        "Container exited with a non-zero exit code " + retCode));
+
+    return retCode;
+  }
+
+  // Returns the first existing pid file for the container under the local
+  // dirs available for read, or null when none is found.
+  private File locatePidFile(String appIdStr, String containerIdStr) {
+    String pidSubpath = getPidFileSubpath(appIdStr, containerIdStr);
+    for (String dir : getContext().getLocalDirsHandler().
+        getLocalDirsForRead()) {
+      File pidFile = new File(dir, pidSubpath);
+      if (pidFile.exists()) {
+        return pidFile;
+      }
+    }
+    return null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/66ca0a65/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java
index 2eba0df..17ddd77 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java
@@ -40,10 +40,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReacquisitionContext;
-
/**
* This is a ContainerLaunch which has been recovered after an NM restart (for
- * rolling upgrades)
+ * rolling upgrades).
*/
public class RecoveredContainerLaunch extends ContainerLaunch {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/66ca0a65/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
index db931f8..2f9c0a7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
@@ -119,6 +119,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
private static final String CONTAINER_DIAGS_KEY_SUFFIX = "/diagnostics";
private static final String CONTAINER_LAUNCHED_KEY_SUFFIX = "/launched";
private static final String CONTAINER_QUEUED_KEY_SUFFIX = "/queued";
+ private static final String CONTAINER_PAUSED_KEY_SUFFIX = "/paused";
private static final String CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX =
"/resourceChanged";
private static final String CONTAINER_KILLED_KEY_SUFFIX = "/killed";
@@ -272,9 +273,16 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
if (rcs.status == RecoveredContainerStatus.REQUESTED) {
rcs.status = RecoveredContainerStatus.QUEUED;
}
+ } else if (suffix.equals(CONTAINER_PAUSED_KEY_SUFFIX)) {
+ if ((rcs.status == RecoveredContainerStatus.LAUNCHED)
+ ||(rcs.status == RecoveredContainerStatus.QUEUED)
+ ||(rcs.status == RecoveredContainerStatus.REQUESTED)) {
+ rcs.status = RecoveredContainerStatus.PAUSED;
+ }
} else if (suffix.equals(CONTAINER_LAUNCHED_KEY_SUFFIX)) {
if ((rcs.status == RecoveredContainerStatus.REQUESTED)
- || (rcs.status == RecoveredContainerStatus.QUEUED)) {
+ || (rcs.status == RecoveredContainerStatus.QUEUED)
+ ||(rcs.status == RecoveredContainerStatus.PAUSED)) {
rcs.status = RecoveredContainerStatus.LAUNCHED;
}
} else if (suffix.equals(CONTAINER_KILLED_KEY_SUFFIX)) {
@@ -367,6 +375,37 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
}
@Override
+ public void storeContainerPaused(ContainerId containerId) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("storeContainerPaused: containerId=" + containerId);
+ }
+
+ String key = CONTAINERS_KEY_PREFIX + containerId.toString()
+ + CONTAINER_PAUSED_KEY_SUFFIX;
+ try {
+ db.put(bytes(key), EMPTY_VALUE);
+ } catch (DBException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void removeContainerPaused(ContainerId containerId)
+ throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("removeContainerPaused: containerId=" + containerId);
+ }
+
+ String key = CONTAINERS_KEY_PREFIX + containerId.toString()
+ + CONTAINER_PAUSED_KEY_SUFFIX;
+ try {
+ db.delete(bytes(key));
+ } catch (DBException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
public void storeContainerDiagnostics(ContainerId containerId,
StringBuilder diagnostics) throws IOException {
if (LOG.isDebugEnabled()) {
@@ -510,6 +549,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
batch.delete(bytes(keyPrefix + CONTAINER_DIAGS_KEY_SUFFIX));
batch.delete(bytes(keyPrefix + CONTAINER_LAUNCHED_KEY_SUFFIX));
batch.delete(bytes(keyPrefix + CONTAINER_QUEUED_KEY_SUFFIX));
+ batch.delete(bytes(keyPrefix + CONTAINER_PAUSED_KEY_SUFFIX));
batch.delete(bytes(keyPrefix + CONTAINER_KILLED_KEY_SUFFIX));
batch.delete(bytes(keyPrefix + CONTAINER_EXIT_CODE_KEY_SUFFIX));
List<String> unknownKeysForContainer = containerUnknownKeySuffixes
http://git-wip-us.apache.org/repos/asf/hadoop/blob/66ca0a65/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
index dc1cece..d1d0696 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
@@ -80,6 +80,15 @@ public class NMNullStateStoreService extends NMStateStoreService {
}
@Override
+ public void storeContainerPaused(ContainerId containerId) throws IOException {
+ }
+
+ @Override
+ public void removeContainerPaused(ContainerId containerId)
+ throws IOException {
+ }
+
+ @Override
public void storeContainerDiagnostics(ContainerId containerId,
StringBuilder diagnostics) throws IOException {
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/66ca0a65/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
index 62a2b9f..999d2d9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
@@ -73,7 +73,8 @@ public abstract class NMStateStoreService extends AbstractService {
REQUESTED,
QUEUED,
LAUNCHED,
- COMPLETED
+ COMPLETED,
+ PAUSED
}
public static class RecoveredContainerState {
@@ -349,9 +350,9 @@ public abstract class NMStateStoreService extends AbstractService {
}
/**
- * Load the state of applications
- * @return recovered state for applications
- * @throws IOException
+ * Load the state of applications.
+ * @return recovered state for applications.
+ * @throws IOException IO Exception.
*/
public abstract RecoveredApplicationsState loadApplicationsState()
throws IOException;
@@ -403,6 +404,23 @@ public abstract class NMStateStoreService extends AbstractService {
throws IOException;
/**
+ * Record that a container has been paused at the NM.
+ * @param containerId the container ID.
+ * @throws IOException IO Exception.
+ */
+ public abstract void storeContainerPaused(ContainerId containerId)
+ throws IOException;
+
+ /**
+ * Record that a container has been resumed at the NM by removing the
+ * fact that it has been paused.
+ * @param containerId the container ID.
+ * @throws IOException IO Exception.
+ */
+ public abstract void removeContainerPaused(ContainerId containerId)
+ throws IOException;
+
+ /**
* Record that a container has been launched
* @param containerId the container ID
* @throws IOException
http://git-wip-us.apache.org/repos/asf/hadoop/blob/66ca0a65/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
index 6d6875d..59a225a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
@@ -145,6 +145,19 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
}
@Override
+ public void storeContainerPaused(ContainerId containerId) throws IOException {
+ RecoveredContainerState rcs = getRecoveredContainerState(containerId);
+ rcs.status = RecoveredContainerStatus.PAUSED;
+ }
+
+ @Override
+ public void removeContainerPaused(ContainerId containerId)
+ throws IOException {
+ RecoveredContainerState rcs = getRecoveredContainerState(containerId);
+ rcs.status = RecoveredContainerStatus.LAUNCHED;
+ }
+
+ @Override
public synchronized void storeContainerDiagnostics(ContainerId containerId,
StringBuilder diagnostics) throws IOException {
RecoveredContainerState rcs = getRecoveredContainerState(containerId);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/66ca0a65/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
index b76f1ff..8c13356 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
@@ -289,6 +289,23 @@ public class TestNMLeveldbStateStoreService {
assertEquals(containerReq, rcs.getStartRequest());
assertEquals(diags.toString(), rcs.getDiagnostics());
+ // pause the container, and verify recovered
+ stateStore.storeContainerPaused(containerId);
+ restartStateStore();
+ recoveredContainers = stateStore.loadContainersState();
+ assertEquals(1, recoveredContainers.size());
+ rcs = recoveredContainers.get(0);
+ assertEquals(RecoveredContainerStatus.PAUSED, rcs.getStatus());
+ assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
+ assertEquals(false, rcs.getKilled());
+ assertEquals(containerReq, rcs.getStartRequest());
+
+ // Resume the container
+ stateStore.removeContainerPaused(containerId);
+ restartStateStore();
+ recoveredContainers = stateStore.loadContainersState();
+ assertEquals(1, recoveredContainers.size());
+
// increase the container size, and verify recovered
stateStore.storeContainerResourceChanged(containerId, 2,
Resource.newInstance(2468, 4));
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[2/3] hadoop git commit: YARN-5216. Expose configurable preemption
policy for OPPORTUNISTIC containers running on the NM. (Hitesh Sharma via
asuresh)
Posted by as...@apache.org.
YARN-5216. Expose configurable preemption policy for OPPORTUNISTIC containers running on the NM. (Hitesh Sharma via asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4f819443
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4f819443
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4f819443
Branch: refs/heads/trunk
Commit: 4f8194430fc6a69d9cc99b78828fd7045d5683e8
Parents: 864fbac
Author: Arun Suresh <as...@apache.org>
Authored: Sat Dec 24 17:16:52 2016 -0800
Committer: Arun Suresh <as...@apache.org>
Committed: Thu Sep 14 08:51:26 2017 -0700
----------------------------------------------------------------------
.../hadoop/yarn/conf/YarnConfiguration.java | 9 ++
.../src/main/resources/yarn-default.xml | 9 ++
.../containermanager/container/Container.java | 2 +
.../container/ContainerImpl.java | 32 ++++--
.../scheduler/ContainerScheduler.java | 84 ++++++++++++---
.../TestContainerSchedulerQueuing.java | 103 +++++++++++++++++++
.../nodemanager/webapp/MockContainer.java | 5 +
7 files changed, 218 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f819443/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index c6ec6fd..48910b3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1088,6 +1088,15 @@ public class YarnConfiguration extends Configuration {
NM_PREFIX + "container-retry-minimum-interval-ms";
public static final int DEFAULT_NM_CONTAINER_RETRY_MINIMUM_INTERVAL_MS = 1000;
+ /**
+ * Use container pause as the preemption policy over kill in the container
+ * queue at a NodeManager.
+ **/
+ public static final String NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION =
+ NM_PREFIX + "opportunistic-containers-use-pause-for-preemption";
+ public static final boolean
+ DEFAULT_NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION = false;
+
/** Interval at which the delayed token removal thread runs */
public static final String RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS =
RM_PREFIX + "delayed.delegation-token.removal-interval-ms";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f819443/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index d16d956..6444da9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -3019,6 +3019,15 @@
<property>
<description>
+ Use container pause as the preemption policy over kill in the container
+ queue at a NodeManager.
+ </description>
+ <name>yarn.nodemanager.opportunistic-containers-use-pause-for-preemption</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <description>
Error filename pattern, to identify the file in the container's
Log directory which contain the container's error log. As error file
redirection is done by client/AM and yarn will not be aware of the error
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f819443/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
index ef5d72c..86f2554 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
@@ -103,4 +103,6 @@ public interface Container extends EventHandler<ContainerEvent> {
* @return Resource Mappings of the container
*/
ResourceMappings getResourceMappings();
+
+ void sendPauseEvent(String description);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f819443/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index 7a12371..95ebfd5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -816,15 +816,22 @@ public class ContainerImpl implements Container {
@SuppressWarnings("unchecked") // dispatcher not typed
@Override
public void sendLaunchEvent() {
- ContainersLauncherEventType launcherEvent =
- ContainersLauncherEventType.LAUNCH_CONTAINER;
- if (recoveredStatus == RecoveredContainerStatus.LAUNCHED) {
- // try to recover a container that was previously launched
- launcherEvent = ContainersLauncherEventType.RECOVER_CONTAINER;
+ if (ContainerState.PAUSED == getContainerState()) {
+ dispatcher.getEventHandler().handle(
+ new ContainerResumeEvent(containerId,
+ "Container Resumed as some resources freed up"));
+ } else {
+ ContainersLauncherEventType launcherEvent =
+ ContainersLauncherEventType.LAUNCH_CONTAINER;
+ if (recoveredStatus == RecoveredContainerStatus.LAUNCHED) {
+ // try to recover a container that was previously launched
+ launcherEvent = ContainersLauncherEventType.RECOVER_CONTAINER;
+ }
+ containerLaunchStartTime = clock.getTime();
+ dispatcher.getEventHandler().handle(
+ new ContainersLauncherEvent(this, launcherEvent));
}
- containerLaunchStartTime = clock.getTime();
- dispatcher.getEventHandler().handle(
- new ContainersLauncherEvent(this, launcherEvent));
+
}
@SuppressWarnings("unchecked") // dispatcher not typed
@@ -844,6 +851,13 @@ public class ContainerImpl implements Container {
}
@SuppressWarnings("unchecked") // dispatcher not typed
+ @Override
+ public void sendPauseEvent(String description) {
+ dispatcher.getEventHandler().handle(
+ new ContainerPauseEvent(containerId, description));
+ }
+
+ @SuppressWarnings("unchecked") // dispatcher not typed
private void sendRelaunchEvent() {
ContainersLauncherEventType launcherEvent =
ContainersLauncherEventType.RELAUNCH_CONTAINER;
@@ -1799,7 +1813,7 @@ public class ContainerImpl implements Container {
/**
* Transitions upon receiving PAUSE_CONTAINER.
- * - RUNNING -> PAUSED
+ * - RUNNING -> PAUSING
*/
@SuppressWarnings("unchecked") // dispatcher not typed
static class PauseContainerTransition implements
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f819443/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
index 7780f9f..830a06d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -34,6 +35,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor
.ChangeMonitoringContainerResourceEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
@@ -74,7 +76,7 @@ public class ContainerScheduler extends AbstractService implements
queuedOpportunisticContainers = new LinkedHashMap<>();
// Used to keep track of containers that have been marked to be killed
- // to make room for a guaranteed container.
+ // or paused to make room for a guaranteed container.
private final Map<ContainerId, Container> oppContainersToKill =
new HashMap<>();
@@ -98,6 +100,8 @@ public class ContainerScheduler extends AbstractService implements
private final AsyncDispatcher dispatcher;
private final NodeManagerMetrics metrics;
+ private boolean usePauseEventForPreemption = false; // pause instead of kill
+
/**
* Instantiate a Container Scheduler.
* @param context NodeManager Context.
@@ -112,6 +116,17 @@ public class ContainerScheduler extends AbstractService implements
DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH));
}
+
+ @Override
+ public void serviceInit(Configuration conf) throws Exception {
+ super.serviceInit(conf);
+ this.usePauseEventForPreemption =
+ conf.getBoolean(
+ YarnConfiguration.NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION,
+ YarnConfiguration.
+ DEFAULT_NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION);
+ }
+
@VisibleForTesting
public ContainerScheduler(Context context, AsyncDispatcher dispatcher,
NodeManagerMetrics metrics, int qLength) {
@@ -136,8 +151,9 @@ public class ContainerScheduler extends AbstractService implements
case SCHEDULE_CONTAINER:
scheduleContainer(event.getContainer());
break;
+ case CONTAINER_PAUSED:
case CONTAINER_COMPLETED:
- onContainerCompleted(event.getContainer());
+ onResourcesReclaimed(event.getContainer());
break;
case UPDATE_CONTAINER:
if (event instanceof UpdateContainerSchedulerEvent) {
@@ -203,9 +219,9 @@ public class ContainerScheduler extends AbstractService implements
queuedGuaranteedContainers.put(containerId,
updateEvent.getContainer());
}
- //Kill opportunistic containers if any to make room for
+ //Kill/pause opportunistic containers if any to make room for
// promotion request
- killOpportunisticContainers(updateEvent.getContainer());
+ reclaimOpportunisticContainerResources(updateEvent.getContainer());
} else {
// Demotion of queued container.. Should not happen too often
// since you should not find too many queued guaranteed
@@ -243,6 +259,12 @@ public class ContainerScheduler extends AbstractService implements
return this.runningContainers.size();
}
+ @VisibleForTesting
+ public void setUsePauseEventForPreemption(
+ boolean usePauseEventForPreemption) {
+ this.usePauseEventForPreemption = usePauseEventForPreemption;
+ }
+
public OpportunisticContainersStatus getOpportunisticContainersStatus() {
this.opportunisticContainersStatus.setQueuedOpportContainers(
getNumQueuedOpportunisticContainers());
@@ -257,7 +279,7 @@ public class ContainerScheduler extends AbstractService implements
return this.opportunisticContainersStatus;
}
- private void onContainerCompleted(Container container) {
+ private void onResourcesReclaimed(Container container) {
oppContainersToKill.remove(container.getContainerId());
// This could be killed externally for eg. by the ContainerManager,
@@ -292,6 +314,23 @@ public class ContainerScheduler extends AbstractService implements
// Start pending guaranteed containers, if resources available.
boolean resourcesAvailable = startContainers(
queuedGuaranteedContainers.values(), forceStartGuaranteedContaieners);
+ // Resume opportunistic containers, if resource available.
+ if (resourcesAvailable) {
+ List<Container> pausedContainers = new ArrayList<Container>();
+ Map<ContainerId, Container> containers =
+ context.getContainers();
+ for (Map.Entry<ContainerId, Container>entry : containers.entrySet()) {
+ ContainerId contId = entry.getKey();
+ // Find containers that were not already started and are in paused state
+ if(false == runningContainers.containsKey(contId)) {
+ if(containers.get(contId).getContainerState()
+ == ContainerState.PAUSED) {
+ pausedContainers.add(containers.get(contId));
+ }
+ }
+ }
+ resourcesAvailable = startContainers(pausedContainers, false);
+ }
// Start opportunistic containers, if resources available.
if (resourcesAvailable) {
startContainers(queuedOpportunisticContainers.values(), false);
@@ -395,7 +434,7 @@ public class ContainerScheduler extends AbstractService implements
// if the guaranteed container is queued, we need to preempt opportunistic
// containers for make room for it
if (queuedGuaranteedContainers.containsKey(container.getContainerId())) {
- killOpportunisticContainers(container);
+ reclaimOpportunisticContainerResources(container);
}
} else {
// Given an opportunistic container, we first try to start as many queuing
@@ -413,19 +452,30 @@ public class ContainerScheduler extends AbstractService implements
}
}
- private void killOpportunisticContainers(Container container) {
- List<Container> extraOpportContainersToKill =
- pickOpportunisticContainersToKill(container.getContainerId());
+ @SuppressWarnings("unchecked")
+ private void reclaimOpportunisticContainerResources(Container container) {
+ List<Container> extraOppContainersToReclaim =
+ pickOpportunisticContainersToReclaimResources(
+ container.getContainerId());
// Kill the opportunistic containers that were chosen.
- for (Container contToKill : extraOpportContainersToKill) {
- contToKill.sendKillEvent(
- ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
- "Container Killed to make room for Guaranteed Container.");
- oppContainersToKill.put(contToKill.getContainerId(), contToKill);
+ for (Container contToReclaim : extraOppContainersToReclaim) {
+ String preemptionAction = usePauseEventForPreemption ? "paused" :
+ "killed"; // matches the sendKillEvent branch below
LOG.info(
- "Opportunistic container {} will be killed in order to start the "
+ "Container {} will be {} to start the "
+ "execution of guaranteed container {}.",
- contToKill.getContainerId(), container.getContainerId());
+ contToReclaim.getContainerId(), preemptionAction,
+ container.getContainerId());
+
+ if (usePauseEventForPreemption) {
+ contToReclaim.sendPauseEvent(
+ "Container Paused to make room for Guaranteed Container");
+ } else {
+ contToReclaim.sendKillEvent(
+ ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
+ "Container Killed to make room for Guaranteed Container.");
+ }
+ oppContainersToKill.put(contToReclaim.getContainerId(), contToReclaim);
}
}
@@ -440,7 +490,7 @@ public class ContainerScheduler extends AbstractService implements
container.sendLaunchEvent();
}
- private List<Container> pickOpportunisticContainersToKill(
+ private List<Container> pickOpportunisticContainersToReclaimResources(
ContainerId containerToStartId) {
// The opportunistic containers that need to be killed for the
// given container to start.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f819443/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java
index 9676568..f3fc724 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java
@@ -23,6 +23,8 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.security.UserGroupInformation;
@@ -49,6 +51,7 @@ import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
@@ -124,18 +127,38 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
@Override
protected ContainerExecutor createContainerExecutor() {
DefaultContainerExecutor exec = new DefaultContainerExecutor() {
+ ConcurrentMap<String, Boolean> oversleepMap =
+ new ConcurrentHashMap<String, Boolean>();
@Override
public int launchContainer(ContainerStartContext ctx)
throws IOException, ConfigurationException {
+ oversleepMap.put(ctx.getContainer().getContainerId().toString(), false);
if (delayContainers) {
try {
Thread.sleep(10000);
+ if(oversleepMap.get(ctx.getContainer().getContainerId().toString())
+ == true) {
+ Thread.sleep(10000);
+ }
} catch (InterruptedException e) {
// Nothing..
}
}
return super.launchContainer(ctx);
}
+
+ @Override
+ public void pauseContainer(Container container) {
+ // To mimic pausing we force the container to be in the PAUSED state
+ // a little longer by oversleeping.
+ oversleepMap.put(container.getContainerId().toString(), true);
+ LOG.info("Container was paused");
+ }
+
+ @Override
+ public void resumeContainer(Container container) {
+ LOG.info("Container was resumed");
+ }
};
exec.setConf(conf);
return spy(exec);
@@ -506,6 +529,86 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
}
/**
+ * Submit one OPPORTUNISTIC and one GUARANTEED container. The resources
+ * requested by each container are such that only one can run at a time.
+ * Thus, the OPPORTUNISTIC container that started running will be
+ * paused for the GUARANTEED container to start.
+ * Once the GUARANTEED container finishes its execution, the paused
+ * OPPORTUNISTIC container will be resumed and run to completion.
+ * @throws Exception
+ */
+ @Test
+ public void testPauseOpportunisticForGuaranteedContainer() throws Exception {
+ containerManager.start();
+ containerManager.getContainerScheduler().
+ setUsePauseEventForPreemption(true);
+ ContainerLaunchContext containerLaunchContext =
+ recordFactory.newRecordInstance(ContainerLaunchContext.class);
+
+ List<StartContainerRequest> list = new ArrayList<>();
+ list.add(StartContainerRequest.newInstance(
+ containerLaunchContext,
+ createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER,
+ context.getNodeId(),
+ user, BuilderUtils.newResource(2048, 1),
+ context.getContainerTokenSecretManager(), null,
+ ExecutionType.OPPORTUNISTIC)));
+
+ StartContainersRequest allRequests =
+ StartContainersRequest.newInstance(list);
+ containerManager.startContainers(allRequests);
+
+ BaseContainerManagerTest.waitForNMContainerState(containerManager,
+ createContainerId(0), ContainerState.RUNNING, 40);
+
+ list = new ArrayList<>();
+ list.add(StartContainerRequest.newInstance(
+ containerLaunchContext,
+ createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER,
+ context.getNodeId(),
+ user, BuilderUtils.newResource(2048, 1),
+ context.getContainerTokenSecretManager(), null,
+ ExecutionType.GUARANTEED)));
+ allRequests =
+ StartContainersRequest.newInstance(list);
+
+ containerManager.startContainers(allRequests);
+
+ BaseContainerManagerTest.waitForNMContainerState(containerManager,
+ createContainerId(1), ContainerState.RUNNING, 40);
+
+ // Get container statuses. Container 0 should be paused, container 1
+ // should be running.
+ List<ContainerId> statList = new ArrayList<ContainerId>();
+ for (int i = 0; i < 2; i++) {
+ statList.add(createContainerId(i));
+ }
+ GetContainerStatusesRequest statRequest =
+ GetContainerStatusesRequest.newInstance(statList);
+ List<ContainerStatus> containerStatuses = containerManager
+ .getContainerStatuses(statRequest).getContainerStatuses();
+ for (ContainerStatus status : containerStatuses) {
+ if (status.getContainerId().equals(createContainerId(0))) {
+ Assert.assertTrue(status.getDiagnostics().contains(
+ "Container Paused to make room for Guaranteed Container"));
+ } else if (status.getContainerId().equals(createContainerId(1))) {
+ Assert.assertEquals(
+ org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
+ status.getState());
+ }
+ System.out.println("\nStatus : [" + status + "]\n");
+ }
+
+ // Make sure that the GUARANTEED container completes
+ BaseContainerManagerTest.waitForNMContainerState(containerManager,
+ createContainerId(1), ContainerState.DONE, 40);
+ // Make sure that the PAUSED opportunistic container resumes and
+ // starts running
+ BaseContainerManagerTest.waitForNMContainerState(containerManager,
+ createContainerId(0), ContainerState.DONE, 40);
+ }
+
+ /**
* 1. Submit a long running GUARANTEED container to hog all NM resources.
* 2. Submit 6 OPPORTUNISTIC containers, all of which will be queued.
* 3. Update the Queue Limit to 2.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f819443/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
index d435ba0..77ebd34 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
@@ -245,4 +245,9 @@ public class MockContainer implements Container {
public ResourceMappings getResourceMappings() {
return null;
}
+
+ @Override
+ public void sendPauseEvent(String description) {
+
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org