You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2017/09/22 01:52:41 UTC
[11/50] hadoop git commit: YARN-7192. Add a pluggable StateMachine
Listener that is notified of NM Container State changes. Contributed by Arun
Suresh
YARN-7192. Add a pluggable StateMachine Listener that is notified of NM Container State changes. Contributed by Arun Suresh
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a4f9c7c9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a4f9c7c9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a4f9c7c9
Branch: refs/heads/YARN-6592
Commit: a4f9c7c9247801dd37beec6fc195622af1b884ad
Parents: 0f9af24
Author: Jason Lowe <jl...@apache.org>
Authored: Mon Sep 18 10:16:09 2017 -0500
Committer: Jason Lowe <jl...@apache.org>
Committed: Mon Sep 18 10:16:09 2017 -0500
----------------------------------------------------------------------
.../hadoop/yarn/conf/YarnConfiguration.java | 6 +-
.../state/MultiStateTransitionListener.java | 61 ++++++++++++++++++
.../hadoop/yarn/state/StateMachineFactory.java | 40 ++++++++++++
.../yarn/state/StateTransitionListener.java | 50 ++++++++++++++
.../src/main/resources/yarn-default.xml | 6 ++
.../ContainerStateTransitionListener.java | 48 ++++++++++++++
.../hadoop/yarn/server/nodemanager/Context.java | 2 +
.../yarn/server/nodemanager/NodeManager.java | 48 +++++++++++++-
.../container/ContainerImpl.java | 3 +-
.../server/nodemanager/TestNodeManager.java | 68 ++++++++++++++++++++
.../amrmproxy/BaseAMRMProxyTest.java | 8 +++
.../container/TestContainer.java | 53 +++++++++++++++
12 files changed, 389 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a4f9c7c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 48910b3..114453f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -968,9 +968,13 @@ public class YarnConfiguration extends Configuration {
NM_PREFIX + "bind-host";
/** who will execute(launch) the containers.*/
- public static final String NM_CONTAINER_EXECUTOR =
+ public static final String NM_CONTAINER_EXECUTOR =
NM_PREFIX + "container-executor.class";
+ /** List of container state transition listeners.*/
+ public static final String NM_CONTAINER_STATE_TRANSITION_LISTENERS =
+ NM_PREFIX + "container-state-transition-listener.classes";
+
/**
* Adjustment to make to the container os scheduling priority.
* The valid values for this could vary depending on the platform.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a4f9c7c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/state/MultiStateTransitionListener.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/state/MultiStateTransitionListener.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/state/MultiStateTransitionListener.java
new file mode 100644
index 0000000..1a28fc5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/state/MultiStateTransitionListener.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.state;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A {@link StateTransitionListener} that dispatches the pre and post
+ * state transitions to multiple registered listeners.
+ * NOTE: The registered listeners are called sequentially, in the order
+ * in which they were added. A listener added earlier can prevent later
+ * listeners from being called, for instance by throwing an uncaught exception.
+ */
+public abstract class MultiStateTransitionListener
+ <OPERAND, EVENT, STATE extends Enum<STATE>> implements
+ StateTransitionListener<OPERAND, EVENT, STATE> {
+
+ private final List<StateTransitionListener<OPERAND, EVENT, STATE>> listeners =
+ new ArrayList<>();
+
+ /**
+ * Add a listener to the list of listeners.
+ * @param listener A listener.
+ */
+ public void addListener(StateTransitionListener<OPERAND, EVENT, STATE>
+ listener) {
+ listeners.add(listener);
+ }
+
+ @Override
+ public void preTransition(OPERAND op, STATE beforeState,
+ EVENT eventToBeProcessed) {
+ for (StateTransitionListener<OPERAND, EVENT, STATE> listener : listeners) {
+ listener.preTransition(op, beforeState, eventToBeProcessed);
+ }
+ }
+
+ @Override
+ public void postTransition(OPERAND op, STATE beforeState, STATE afterState,
+ EVENT processedEvent) {
+ for (StateTransitionListener<OPERAND, EVENT, STATE> listener : listeners) {
+ listener.postTransition(op, beforeState, afterState, processedEvent);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a4f9c7c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/state/StateMachineFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/state/StateMachineFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/state/StateMachineFactory.java
index 5b76ce8..4bb005c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/state/StateMachineFactory.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/state/StateMachineFactory.java
@@ -391,6 +391,21 @@ final public class StateMachineFactory
}
}
+ /**
+ * A StateMachine that accepts a transition listener.
+ * @param operand the object upon which the returned
+ * {@link StateMachine} will operate.
+ * @param initialState the state in which the returned
+ * {@link StateMachine} will start.
+ * @param listener An implementation of a {@link StateTransitionListener}.
+ * @return A {@link StateMachine}.
+ */
+ public StateMachine<STATE, EVENTTYPE, EVENT>
+ make(OPERAND operand, STATE initialState,
+ StateTransitionListener<OPERAND, EVENT, STATE> listener) {
+ return new InternalStateMachine(operand, initialState, listener);
+ }
+
/*
* @return a {@link StateMachine} that starts in
* {@code initialState} and whose {@link Transition} s are
@@ -424,14 +439,36 @@ final public class StateMachineFactory
return new InternalStateMachine(operand, defaultInitialState);
}
+ private static class NoopStateTransitionListener
+ implements StateTransitionListener {
+ @Override
+ public void preTransition(Object op, Enum beforeState,
+ Object eventToBeProcessed) { }
+
+ @Override
+ public void postTransition(Object op, Enum beforeState, Enum afterState,
+ Object processedEvent) { }
+ }
+
+ private static final NoopStateTransitionListener NOOP_LISTENER =
+ new NoopStateTransitionListener();
+
private class InternalStateMachine
implements StateMachine<STATE, EVENTTYPE, EVENT> {
private final OPERAND operand;
private STATE currentState;
+ private final StateTransitionListener<OPERAND, EVENT, STATE> listener;
InternalStateMachine(OPERAND operand, STATE initialState) {
+ this(operand, initialState, null);
+ }
+
+ InternalStateMachine(OPERAND operand, STATE initialState,
+ StateTransitionListener<OPERAND, EVENT, STATE> transitionListener) {
this.operand = operand;
this.currentState = initialState;
+ this.listener =
+ (transitionListener == null) ? NOOP_LISTENER : transitionListener;
if (!optimized) {
maybeMakeStateMachineTable();
}
@@ -445,8 +482,11 @@ final public class StateMachineFactory
@Override
public synchronized STATE doTransition(EVENTTYPE eventType, EVENT event)
throws InvalidStateTransitionException {
+ listener.preTransition(operand, currentState, event);
+ STATE oldState = currentState;
currentState = StateMachineFactory.this.doTransition
(operand, currentState, eventType, event);
+ listener.postTransition(operand, oldState, currentState, event);
return currentState;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a4f9c7c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/state/StateTransitionListener.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/state/StateTransitionListener.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/state/StateTransitionListener.java
new file mode 100644
index 0000000..657c193
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/state/StateTransitionListener.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.state;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * A State Transition Listener.
+ * It exposes a pre and post transition hook called before and
+ * after the transition.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface StateTransitionListener
+ <OPERAND, EVENT, STATE extends Enum<STATE>> {
+
+ /**
+ * Pre Transition Hook. This will be called before transition.
+ * @param op Operand.
+ * @param beforeState State before transition.
+ * @param eventToBeProcessed Incoming Event.
+ */
+ void preTransition(OPERAND op, STATE beforeState, EVENT eventToBeProcessed);
+
+ /**
+ * Post Transition Hook. This will be called after the transition.
+ * @param op Operand.
+ * @param beforeState State before transition.
+ * @param afterState State after transition.
+ * @param processedEvent Processed Event.
+ */
+ void postTransition(OPERAND op, STATE beforeState, STATE afterState,
+ EVENT processedEvent);
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a4f9c7c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 6444da9..0440458 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1004,6 +1004,12 @@
</property>
<property>
+ <description>Comma-separated list of container state transition listener classes. Each class must implement ContainerStateTransitionListener.</description>
+ <name>yarn.nodemanager.container-state-transition-listener.classes</name>
+ <value></value>
+ </property>
+
+ <property>
<description>Number of threads container manager uses.</description>
<name>yarn.nodemanager.container-manager.thread-count</name>
<value>20</value>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a4f9c7c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerStateTransitionListener.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerStateTransitionListener.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerStateTransitionListener.java
new file mode 100644
index 0000000..24cdb1f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerStateTransitionListener.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
+import org.apache.hadoop.yarn.state.StateTransitionListener;
+
+/**
+ * Interface to be used by external cluster operators to implement a
+ * State Transition listener that is notified before and after a container
+ * state transition.
+ * NOTE: The pre and post transition callbacks are made in the same
+ * synchronized block as the instrumented transition - serially, in the
+ * order: preTransition, transition and postTransition. The implementor
+ * must ensure that the callbacks return in a timely manner to avoid
+ * blocking the state-machine.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface ContainerStateTransitionListener extends
+ StateTransitionListener<ContainerImpl, ContainerEvent, ContainerState> {
+
+ /**
+ * Init method which will be invoked by the Node Manager to inject the
+ * NM {@link Context}.
+ * @param context NM Context.
+ */
+ void init(Context context);
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a4f9c7c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
index 00bd0ef..a2d00a4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
@@ -120,4 +120,6 @@ public interface Context {
NMTimelinePublisher getNMTimelinePublisher();
ContainerExecutor getContainerExecutor();
+
+ ContainerStateTransitionListener getContainerStateTransitionListener();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a4f9c7c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 3e919c5..a97b3f2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -20,12 +20,18 @@ package org.apache.hadoop.yarn.server.nodemanager;
import java.io.IOException;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
+import org.apache.hadoop.yarn.state.MultiStateTransitionListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -130,6 +136,17 @@ public class NodeManager extends CompositeService
private boolean rmWorkPreservingRestartEnabled;
private boolean shouldExitOnShutdownEvent = false;
+ /**
+ * Default Container State transition listener.
+ */
+ public static class DefaultContainerStateListener extends
+ MultiStateTransitionListener
+ <ContainerImpl, ContainerEvent, ContainerState>
+ implements ContainerStateTransitionListener {
+ @Override
+ public void init(Context context) {}
+ }
+
public NodeManager() {
super(NodeManager.class.getName());
}
@@ -219,8 +236,22 @@ public class NodeManager extends CompositeService
NMTokenSecretManagerInNM nmTokenSecretManager,
NMStateStoreService stateStore, boolean isDistSchedulerEnabled,
Configuration conf) {
- return new NMContext(containerTokenSecretManager, nmTokenSecretManager,
- dirsHandler, aclsManager, stateStore, isDistSchedulerEnabled, conf);
+ List<ContainerStateTransitionListener> listeners =
+ conf.getInstances(
+ YarnConfiguration.NM_CONTAINER_STATE_TRANSITION_LISTENERS,
+ ContainerStateTransitionListener.class);
+ NMContext nmContext = new NMContext(containerTokenSecretManager,
+ nmTokenSecretManager, dirsHandler, aclsManager, stateStore,
+ isDistSchedulerEnabled, conf);
+ DefaultContainerStateListener defaultListener =
+ new DefaultContainerStateListener();
+ nmContext.setContainerStateTransitionListener(defaultListener);
+ defaultListener.init(nmContext);
+ for (ContainerStateTransitionListener listener : listeners) {
+ listener.init(nmContext);
+ defaultListener.addListener(listener);
+ }
+ return nmContext;
}
protected void doSecureLogin() throws IOException {
@@ -563,6 +594,8 @@ public class NodeManager extends CompositeService
private NMTimelinePublisher nmTimelinePublisher;
+ private ContainerStateTransitionListener containerStateTransitionListener;
+
public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInNM nmTokenSecretManager,
LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager,
@@ -752,6 +785,17 @@ public class NodeManager extends CompositeService
public void setContainerExecutor(ContainerExecutor executor) {
this.executor = executor;
}
+
+ @Override
+ public ContainerStateTransitionListener
+ getContainerStateTransitionListener() {
+ return this.containerStateTransitionListener;
+ }
+
+ public void setContainerStateTransitionListener(
+ ContainerStateTransitionListener transitionListener) {
+ this.containerStateTransitionListener = transitionListener;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a4f9c7c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index 9b9c47f..df107a7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -240,7 +240,8 @@ public class ContainerImpl implements Container {
this.containerRetryContext = configureRetryContext(
conf, launchContext, this.containerId);
this.remainingRetryAttempts = this.containerRetryContext.getMaxRetries();
- stateMachine = stateMachineFactory.make(this);
+ stateMachine = stateMachineFactory.make(this, ContainerState.NEW,
+ context.getContainerStateTransitionListener());
this.context = context;
this.resourceSet = new ResourceSet();
this.resourceMappings = new ResourceMappings();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a4f9c7c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java
index 2d390ac..9279711 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java
@@ -25,6 +25,9 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
import org.junit.Assert;
import org.junit.Test;
@@ -57,6 +60,71 @@ public class TestNodeManager {
}
}
+ private static int initCalls = 0;
+ private static int preCalls = 0;
+ private static int postCalls = 0;
+
+ private static class DummyCSTListener1
+ implements ContainerStateTransitionListener {
+ @Override
+ public void init(Context context) {
+ initCalls++;
+ }
+
+ @Override
+ public void preTransition(ContainerImpl op, ContainerState beforeState,
+ ContainerEvent eventToBeProcessed) {
+ preCalls++;
+ }
+
+ @Override
+ public void postTransition(ContainerImpl op, ContainerState beforeState,
+ ContainerState afterState, ContainerEvent processedEvent) {
+ postCalls++;
+ }
+ }
+
+ private static class DummyCSTListener2
+ implements ContainerStateTransitionListener {
+ @Override
+ public void init(Context context) {
+ initCalls++;
+ }
+
+ @Override
+ public void preTransition(ContainerImpl op, ContainerState beforeState,
+ ContainerEvent eventToBeProcessed) {
+ preCalls++;
+ }
+
+ @Override
+ public void postTransition(ContainerImpl op, ContainerState beforeState,
+ ContainerState afterState, ContainerEvent processedEvent) {
+ postCalls++;
+ }
+ }
+
+ @Test
+ public void testListenerInitialization() throws Exception{
+ NodeManager nodeManager = new NodeManager();
+ Configuration conf = new Configuration();
+ conf.set(YarnConfiguration.NM_CONTAINER_STATE_TRANSITION_LISTENERS,
+ DummyCSTListener1.class.getName() + ","
+ + DummyCSTListener2.class.getName());
+ initCalls = 0;
+ preCalls = 0;
+ postCalls = 0;
+ NodeManager.NMContext nmContext =
+ nodeManager.createNMContext(null, null, null, false, conf);
+ Assert.assertEquals(2, initCalls);
+ nmContext.getContainerStateTransitionListener().preTransition(
+ null, null, null);
+ nmContext.getContainerStateTransitionListener().postTransition(
+ null, null, null, null);
+ Assert.assertEquals(2, preCalls);
+ Assert.assertEquals(2, postCalls);
+ }
+
@Test
public void testCreationOfNodeLabelsProviderService()
throws InterruptedException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a4f9c7c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
index 7c8551e..0838f1e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
@@ -33,6 +33,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.yarn.server.nodemanager.ContainerStateTransitionListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -765,5 +767,11 @@ public abstract class BaseAMRMProxyTest {
public ContainerExecutor getContainerExecutor() {
return null;
}
+
+ @Override
+ public ContainerStateTransitionListener
+ getContainerStateTransitionListener() {
+ return null;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a4f9c7c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
index 8909088..64e6cf0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
@@ -71,7 +71,9 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerStateTransitionListener;
import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
@@ -287,6 +289,29 @@ public class TestContainer {
assertEquals(ContainerState.DONE, wc.c.getContainerState());
assertEquals(completed + 1, metrics.getCompletedContainers());
assertEquals(running, metrics.getRunningContainers());
+
+ ContainerEventType e1 = wc.initStateToEvent.get(ContainerState.NEW);
+ ContainerState s2 = wc.eventToFinalState.get(e1);
+ ContainerEventType e2 = wc.initStateToEvent.get(s2);
+ ContainerState s3 = wc.eventToFinalState.get(e2);
+ ContainerEventType e3 = wc.initStateToEvent.get(s3);
+ ContainerState s4 = wc.eventToFinalState.get(e3);
+ ContainerEventType e4 = wc.initStateToEvent.get(s4);
+ ContainerState s5 = wc.eventToFinalState.get(e4);
+ ContainerEventType e5 = wc.initStateToEvent.get(s5);
+ ContainerState s6 = wc.eventToFinalState.get(e5);
+
+ Assert.assertEquals(ContainerState.LOCALIZING, s2);
+ Assert.assertEquals(ContainerState.SCHEDULED, s3);
+ Assert.assertEquals(ContainerState.RUNNING, s4);
+ Assert.assertEquals(ContainerState.EXITED_WITH_SUCCESS, s5);
+ Assert.assertEquals(ContainerState.DONE, s6);
+
+ Assert.assertEquals(ContainerEventType.INIT_CONTAINER, e1);
+ Assert.assertEquals(ContainerEventType.RESOURCE_LOCALIZED, e2);
+ Assert.assertEquals(ContainerEventType.CONTAINER_LAUNCHED, e3);
+ Assert.assertEquals(ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, e4);
+ Assert.assertEquals(ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP, e5);
}
finally {
if (wc != null) {
@@ -401,6 +426,10 @@ public class TestContainer {
Assert.assertTrue(
containerMetrics.finishTime.value() > containerMetrics.startTime
.value());
+ Assert.assertEquals(ContainerEventType.KILL_CONTAINER,
+ wc.initStateToEvent.get(ContainerState.NEW));
+ Assert.assertEquals(ContainerState.DONE,
+ wc.eventToFinalState.get(ContainerEventType.KILL_CONTAINER));
} finally {
if (wc != null) {
wc.finished();
@@ -942,6 +971,10 @@ public class TestContainer {
final Map<String, LocalResource> localResources;
final Map<String, ByteBuffer> serviceData;
final Context context = mock(Context.class);
+ private final Map<ContainerState, ContainerEventType> initStateToEvent =
+ new HashMap<>();
+ private final Map<ContainerEventType, ContainerState> eventToFinalState =
+ new HashMap<>();
WrappedContainer(int appId, long timestamp, int id, String user)
throws IOException {
@@ -1048,7 +1081,27 @@ public class TestContainer {
}
when(ctxt.getServiceData()).thenReturn(serviceData);
when(ctxt.getContainerRetryContext()).thenReturn(containerRetryContext);
+ ContainerStateTransitionListener listener =
+ new ContainerStateTransitionListener() {
+ @Override
+ public void init(Context cntxt) {}
+
+ @Override
+ public void preTransition(ContainerImpl op, ContainerState beforeState,
+ ContainerEvent eventToBeProcessed) {
+ initStateToEvent.put(beforeState, eventToBeProcessed.getType());
+ }
+ @Override
+ public void postTransition(ContainerImpl op, ContainerState beforeState,
+ ContainerState afterState, ContainerEvent processedEvent) {
+ eventToFinalState.put(processedEvent.getType(), afterState);
+ }
+ };
+ NodeManager.DefaultContainerStateListener multi =
+ new NodeManager.DefaultContainerStateListener();
+ multi.addListener(listener);
+ when(context.getContainerStateTransitionListener()).thenReturn(multi);
c = new ContainerImpl(conf, dispatcher, ctxt, null, metrics, identifier,
context);
dispatcher.register(ContainerEventType.class,
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org