You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2011/12/14 22:22:32 UTC

svn commit: r1214476 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn...

Author: vinodkv
Date: Wed Dec 14 21:22:32 2011
New Revision: 1214476

URL: http://svn.apache.org/viewvc?rev=1214476&view=rev
Log:
MAPREDUCE-3530. Fixed an NPE occurring during scheduling in the ResourceManager. Contributed by Arun C Murthy.

Added:
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1214476&r1=1214475&r2=1214476&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Wed Dec 14 21:22:32 2011
@@ -299,6 +299,9 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3398. Fixed log aggregation to work correctly in secure mode.
     (Siddharth Seth via vinodkv)
 
+    MAPREDUCE-3530. Fixed an NPE occurring during scheduling in the
+    ResourceManager. (Arun C Murthy via vinodkv)
+
 Release 0.23.0 - 2011-11-01 
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java?rev=1214476&r1=1214475&r2=1214476&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java Wed Dec 14 21:22:32 2011
@@ -262,6 +262,16 @@ public class RMNodeImpl implements RMNod
 
   }
 
+  @Private
+  public List<ContainerId> getContainersToCleanUp() {
+    this.readLock.lock();
+    try {
+      return new ArrayList<ContainerId>(containersToClean);
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+  
   @Override
   public List<ContainerId> pullContainersToCleanUp() {
 
@@ -342,7 +352,6 @@ public class RMNodeImpl implements RMNod
 
     @Override
     public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
-
       rmNode.containersToClean.add(((
           RMNodeCleanContainerEvent) event).getContainerId());
     }
@@ -396,8 +405,17 @@ public class RMNodeImpl implements RMNod
       List<ContainerStatus> completedContainers = 
           new ArrayList<ContainerStatus>();
       for (ContainerStatus remoteContainer : statusEvent.getContainers()) {
-        // Process running containers
         ContainerId containerId = remoteContainer.getContainerId();
+        
+        // Don't bother with containers already scheduled for cleanup,
+        // the scheduler doesn't need to know any more about this container
+        if (rmNode.containersToClean.contains(containerId)) {
+          LOG.info("Container " + containerId + " already scheduled for " +
+          		"cleanup, no further processing");
+          continue;
+        }
+        
+        // Process running containers
         if (remoteContainer.getState() == ContainerState.RUNNING) {
           if (!rmNode.justLaunchedContainers.containsKey(containerId)) {
             // Just launched container. RM knows about it the first time.

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java?rev=1214476&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java Wed Dec 14 21:22:32 2011
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import java.util.Collections;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestRMNodeTransitions {
+
+  RMNodeImpl node;
+  
+  private RMContext rmContext;
+  private YarnScheduler scheduler;
+
+  private SchedulerEventType eventType;
+  private List<ContainerStatus> completedContainers;
+  
+  private final class TestSchedulerEventDispatcher implements
+  EventHandler<SchedulerEvent> {
+    @Override
+    public void handle(SchedulerEvent event) {
+      scheduler.handle(event);
+    }
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    InlineDispatcher rmDispatcher = new InlineDispatcher();
+    
+    rmContext = 
+        new RMContextImpl(new MemStore(), rmDispatcher, null, null, null);
+    scheduler = mock(YarnScheduler.class);
+    doAnswer(
+        new Answer<Void>() {
+
+          @Override
+          public Void answer(InvocationOnMock invocation) throws Throwable {
+            final SchedulerEvent event = (SchedulerEvent)(invocation.getArguments()[0]);
+            eventType = event.getType();
+            if (eventType == SchedulerEventType.NODE_UPDATE) {
+              completedContainers = 
+                  ((NodeUpdateSchedulerEvent)event).getCompletedContainers();
+            } else {
+              completedContainers = null;
+            }
+            return null;
+          }
+        }
+        ).when(scheduler).handle(any(SchedulerEvent.class));
+    
+    rmDispatcher.register(SchedulerEventType.class, 
+        new TestSchedulerEventDispatcher());
+    
+    
+    node = new RMNodeImpl(null, rmContext, null, 0, 0, null, null);
+
+  }
+  
+  @After
+  public void tearDown() throws Exception {
+  }
+  
+  private RMNodeStatusEvent getMockRMNodeStatusEvent() {
+    HeartbeatResponse response = mock(HeartbeatResponse.class);
+
+    NodeHealthStatus healthStatus = mock(NodeHealthStatus.class);
+    Boolean yes = new Boolean(true);
+    doReturn(yes).when(healthStatus).getIsNodeHealthy();
+    
+    RMNodeStatusEvent event = mock(RMNodeStatusEvent.class);
+    doReturn(healthStatus).when(event).getNodeHealthStatus();
+    doReturn(response).when(event).getLatestResponse();
+    doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType();
+    return event;
+  }
+  
+  @Test
+  public void testExpiredContainer() {
+    // Start the node
+    node.handle(new RMNodeEvent(null, RMNodeEventType.STARTED));
+    verify(scheduler).handle(any(NodeAddedSchedulerEvent.class));
+    
+    // Expire a container
+		ContainerId completedContainerId = BuilderUtils.newContainerId(
+				BuilderUtils.newApplicationAttemptId(
+						BuilderUtils.newApplicationId(0, 0), 0), 0);
+    node.handle(new RMNodeCleanContainerEvent(null, completedContainerId));
+    Assert.assertEquals(1, node.getContainersToCleanUp().size());
+    
+    // Now verify that scheduler isn't notified of an expired container
+    // by checking number of 'completedContainers' it got in the previous event
+    RMNodeStatusEvent statusEvent = getMockRMNodeStatusEvent();
+    ContainerStatus containerStatus = mock(ContainerStatus.class);
+    doReturn(completedContainerId).when(containerStatus).getContainerId();
+    doReturn(Collections.singletonList(containerStatus)).
+        when(statusEvent).getContainers();
+    node.handle(statusEvent);
+    Assert.assertEquals(0, completedContainers.size());
+  }
+
+}