You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by bo...@apache.org on 2012/11/15 00:02:30 UTC

svn commit: r1409525 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/or...

Author: bobby
Date: Wed Nov 14 23:02:29 2012
New Revision: 1409525

URL: http://svn.apache.org/viewvc?rev=1409525&view=rev
Log:
MAPREDUCE-4797. LocalContainerAllocator can loop forever trying to contact the RM (jlowe via bobby)

Added:
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1409525&r1=1409524&r2=1409525&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Wed Nov 14 23:02:29 2012
@@ -663,6 +663,9 @@ Release 0.23.5 - UNRELEASED
 
     MAPREDUCE-4517. Too many INFO messages written out during AM to RM heartbeat
     (Jason Lowe via tgraves)
+
+    MAPREDUCE-4797. LocalContainerAllocator can loop forever trying to contact
+    the RM (jlowe via bobby)
  
 Release 0.23.4 - UNRELEASED
 

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java?rev=1409525&r1=1409524&r2=1409525&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java Wed Nov 14 23:02:29 2012
@@ -19,7 +19,6 @@
 package org.apache.hadoop.mapreduce.v2.app.local;
 
 import java.util.ArrayList;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -62,7 +61,6 @@ public class LocalContainerAllocator ext
 
   @SuppressWarnings("rawtypes")
   private final EventHandler eventHandler;
-  private AtomicInteger containerCount = new AtomicInteger();
   private long retryInterval;
   private long retrystartTime;
   private String nmHost;
@@ -102,9 +100,9 @@ public class LocalContainerAllocator ext
         this.applicationAttemptId, this.lastResponseID, super
             .getApplicationProgress(), new ArrayList<ResourceRequest>(),
         new ArrayList<ContainerId>());
-    AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
     AMResponse response;
     try {
+      AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
       response = allocateResponse.getAMResponse();
       // Reset retry count if no exception occurred.
       retrystartTime = System.currentTimeMillis();

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java?rev=1409525&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java Wed Nov 14 23:02:29 2012
@@ -0,0 +1,108 @@
+package org.apache.hadoop.mapreduce.v2.app.local;
+
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.yarn.ClusterInfo;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestLocalContainerAllocator {
+  // Checks what heartbeat() throws when every allocate() call on the scheduler proxy fails.
+  @Test
+  public void testRMConnectionRetry() throws Exception {
+    // Phase 1: nothing is set on the Configuration, so the retry interval is
+    // not yet exhausted and heartbeat() must surface the connection exception.
+    Configuration conf = new Configuration();
+    LocalContainerAllocator lca = new StubbedLocalContainerAllocator();
+    lca.init(conf);
+    lca.start();
+    try {
+      lca.heartbeat();
+      Assert.fail("heartbeat was supposed to throw");
+    } catch (YarnRemoteException e) {
+      // YarnRemoteException is expected
+    } finally {
+      lca.stop();
+    }
+
+    // Phase 2: a wait interval of 0 means the retry interval has expired, so a YarnException is expected instead.
+    conf.setLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS, 0);
+    lca = new StubbedLocalContainerAllocator();
+    lca.init(conf);
+    lca.start();
+    try {
+      lca.heartbeat();
+      Assert.fail("heartbeat was supposed to throw");
+    } catch (YarnException e) {
+      // YarnException is expected
+    } finally {
+      lca.stop();
+    }
+  }
+  // LocalContainerAllocator variant with registration, the allocator thread and the scheduler proxy stubbed out.
+  private static class StubbedLocalContainerAllocator
+    extends LocalContainerAllocator {
+
+    public StubbedLocalContainerAllocator() {
+      super(mock(ClientService.class), createAppContext(),
+          "nmhost", 1, 2, null);
+    }
+    // Overridden with an empty body, so this stub performs no registration work.
+    @Override
+    protected void register() {
+    }
+    // Installs a plain Thread without starting it; presumably only so stop() has a thread object -- confirm against the superclass.
+    @Override
+    protected void startAllocatorThread() {
+      allocatorThread = new Thread();
+    }
+    // Returns a mock scheduler whose allocate() always throws the exception RPCUtil builds from IOException("forcefail").
+    @Override
+    protected AMRMProtocol createSchedulerProxy() {
+      AMRMProtocol scheduler = mock(AMRMProtocol.class);
+      try {
+        when(scheduler.allocate(isA(AllocateRequest.class)))
+          .thenThrow(RPCUtil.getRemoteException(new IOException("forcefail")));
+      } catch (YarnRemoteException e) { // not expected: the call above only records the stubbing on the mock
+      }
+      return scheduler;
+    }
+    // Builds a mocked AppContext: fixed app/attempt ids, mocked Job and EventHandler, ClusterInfo from resources 1024 and 10240.
+    private static AppContext createAppContext() {
+      ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
+      ApplicationAttemptId attemptId =
+          BuilderUtils.newApplicationAttemptId(appId, 1);
+      Job job = mock(Job.class);
+      @SuppressWarnings("rawtypes")
+      EventHandler eventHandler = mock(EventHandler.class);
+      AppContext ctx = mock(AppContext.class);
+      when(ctx.getApplicationID()).thenReturn(appId);
+      when(ctx.getApplicationAttemptId()).thenReturn(attemptId);
+      when(ctx.getJob(isA(JobId.class))).thenReturn(job);
+      when(ctx.getClusterInfo()).thenReturn(
+          new ClusterInfo(BuilderUtils.newResource(1024), BuilderUtils
+              .newResource(10240)));
+      when(ctx.getEventHandler()).thenReturn(eventHandler);
+      return ctx;
+    }
+  }
+}