You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2011/12/14 22:22:32 UTC
svn commit: r1214476 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/
hadoop-yarn/hadoop-yarn-server/hadoop-yarn...
Author: vinodkv
Date: Wed Dec 14 21:22:32 2011
New Revision: 1214476
URL: http://svn.apache.org/viewvc?rev=1214476&view=rev
Log:
MAPREDUCE-3530. Fixed an NPE occurring during scheduling in the ResourceManager. Contributed by Arun C Murthy.
Added:
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1214476&r1=1214475&r2=1214476&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Wed Dec 14 21:22:32 2011
@@ -299,6 +299,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3398. Fixed log aggregation to work correctly in secure mode.
(Siddharth Seth via vinodkv)
+ MAPREDUCE-3530. Fixed an NPE occurring during scheduling in the
+ ResourceManager. (Arun C Murthy via vinodkv)
+
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java?rev=1214476&r1=1214475&r2=1214476&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java Wed Dec 14 21:22:32 2011
@@ -262,6 +262,16 @@ public class RMNodeImpl implements RMNod
}
+ /**
+  * Returns a snapshot of the container IDs currently queued for cleanup.
+  *
+  * The copy is made while holding the read lock, and the returned list is
+  * a new {@link ArrayList}, so modifying it does not modify
+  * {@code containersToClean}.
+  *
+  * @return a new list holding the current contents of
+  *         {@code containersToClean}
+  */
+ @Private
+ public List<ContainerId> getContainersToCleanUp() {
+   final List<ContainerId> snapshot;
+   this.readLock.lock();
+   try {
+     snapshot = new ArrayList<ContainerId>(containersToClean);
+   } finally {
+     // Always release the lock, even if the copy throws.
+     this.readLock.unlock();
+   }
+   return snapshot;
+ }
+
@Override
public List<ContainerId> pullContainersToCleanUp() {
@@ -342,7 +352,6 @@ public class RMNodeImpl implements RMNod
@Override
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
-
rmNode.containersToClean.add(((
RMNodeCleanContainerEvent) event).getContainerId());
}
@@ -396,8 +405,17 @@ public class RMNodeImpl implements RMNod
List<ContainerStatus> completedContainers =
new ArrayList<ContainerStatus>();
for (ContainerStatus remoteContainer : statusEvent.getContainers()) {
- // Process running containers
ContainerId containerId = remoteContainer.getContainerId();
+
+ // Don't bother with containers already scheduled for cleanup;
+ // the scheduler doesn't need to know any more about this container
+ if (rmNode.containersToClean.contains(containerId)) {
+ LOG.info("Container " + containerId + " already scheduled for " +
+ "cleanup, no further processing");
+ continue;
+ }
+
+ // Process running containers
if (remoteContainer.getState() == ContainerState.RUNNING) {
if (!rmNode.justLaunchedContainers.containsKey(containerId)) {
// Just launched container. RM knows about it the first time.
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java?rev=1214476&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java Wed Dec 14 21:22:32 2011
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import java.util.Collections;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Tests for {@link RMNodeImpl} event handling.
+ *
+ * <p>Scheduler events are registered on an {@link InlineDispatcher} and
+ * forwarded to a mocked {@link YarnScheduler}. The mock's answer records the
+ * type of the most recent scheduler event and, for {@code NODE_UPDATE}
+ * events, the completed containers it carried, so tests can inspect what the
+ * scheduler was told.
+ */
+public class TestRMNodeTransitions {
+
+  RMNodeImpl node;
+
+  private RMContext rmContext;
+  private YarnScheduler scheduler;
+
+  // Type of the last scheduler event seen by the mocked scheduler.
+  private SchedulerEventType eventType;
+  // Completed containers of the last NODE_UPDATE event; null if the last
+  // scheduler event was of any other type.
+  private List<ContainerStatus> completedContainers;
+
+  /** Forwards every scheduler event to the mocked scheduler. */
+  private final class TestSchedulerEventDispatcher implements
+      EventHandler<SchedulerEvent> {
+    @Override
+    public void handle(SchedulerEvent event) {
+      scheduler.handle(event);
+    }
+  }
+
+  /**
+   * Builds the RM context, the recording scheduler mock and the node under
+   * test.
+   */
+  @Before
+  public void setUp() throws Exception {
+    InlineDispatcher rmDispatcher = new InlineDispatcher();
+
+    rmContext =
+        new RMContextImpl(new MemStore(), rmDispatcher, null, null, null);
+    scheduler = mock(YarnScheduler.class);
+    doAnswer(
+        new Answer<Void>() {
+
+          @Override
+          public Void answer(InvocationOnMock invocation) throws Throwable {
+            final SchedulerEvent event =
+                (SchedulerEvent) (invocation.getArguments()[0]);
+            eventType = event.getType();
+            if (eventType == SchedulerEventType.NODE_UPDATE) {
+              completedContainers =
+                  ((NodeUpdateSchedulerEvent) event).getCompletedContainers();
+            } else {
+              // Reset so a stale list from an earlier update is never read.
+              completedContainers = null;
+            }
+            return null;
+          }
+        }
+    ).when(scheduler).handle(any(SchedulerEvent.class));
+
+    rmDispatcher.register(SchedulerEventType.class,
+        new TestSchedulerEventDispatcher());
+
+
+    node = new RMNodeImpl(null, rmContext, null, 0, 0, null, null);
+
+  }
+
+  /** Nothing to release; kept as a placeholder for future cleanup. */
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  /**
+   * Creates a mocked STATUS_UPDATE event that reports a healthy node and
+   * carries a mocked heartbeat response.
+   *
+   * @return the mocked status event; callers stub {@code getContainers()}
+   *         themselves
+   */
+  private RMNodeStatusEvent getMockRMNodeStatusEvent() {
+    HeartbeatResponse response = mock(HeartbeatResponse.class);
+
+    NodeHealthStatus healthStatus = mock(NodeHealthStatus.class);
+    // Boolean.TRUE avoids allocating a new box via the deprecated
+    // Boolean(boolean) constructor.
+    doReturn(Boolean.TRUE).when(healthStatus).getIsNodeHealthy();
+
+    RMNodeStatusEvent event = mock(RMNodeStatusEvent.class);
+    doReturn(healthStatus).when(event).getNodeHealthStatus();
+    doReturn(response).when(event).getLatestResponse();
+    doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType();
+    return event;
+  }
+
+  /**
+   * Verifies that a container already queued for cleanup is not reported to
+   * the scheduler as completed when a later status update mentions it.
+   */
+  @Test
+  public void testExpiredContainer() {
+    // Start the node
+    node.handle(new RMNodeEvent(null, RMNodeEventType.STARTED));
+    verify(scheduler).handle(any(NodeAddedSchedulerEvent.class));
+
+    // Expire a container
+    ContainerId completedContainerId = BuilderUtils.newContainerId(
+        BuilderUtils.newApplicationAttemptId(
+            BuilderUtils.newApplicationId(0, 0), 0), 0);
+    node.handle(new RMNodeCleanContainerEvent(null, completedContainerId));
+    Assert.assertEquals(1, node.getContainersToCleanUp().size());
+
+    // Now verify that scheduler isn't notified of an expired container
+    // by checking number of 'completedContainers' it got in the previous event
+    RMNodeStatusEvent statusEvent = getMockRMNodeStatusEvent();
+    ContainerStatus containerStatus = mock(ContainerStatus.class);
+    doReturn(completedContainerId).when(containerStatus).getContainerId();
+    doReturn(Collections.singletonList(containerStatus)).
+        when(statusEvent).getContainers();
+    node.handle(statusEvent);
+    // Fail with a clear message rather than an NPE if no NODE_UPDATE
+    // event was captured by the scheduler mock.
+    Assert.assertNotNull(
+        "No completed-containers list was captured from a NODE_UPDATE event",
+        completedContainers);
+    Assert.assertEquals(0, completedContainers.size());
+  }
+
+}