You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by jl...@apache.org on 2014/03/14 19:20:21 UTC

svn commit: r1577649 - in /hadoop/common/branches/branch-2.4/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/

Author: jlowe
Date: Fri Mar 14 18:20:21 2014
New Revision: 1577649

URL: http://svn.apache.org/r1577649
Log:
svn merge -c 1577647 FIXES: MAPREDUCE-5769. Unregistration to RM should not be called if AM is crashed before registering with RM. Contributed by Rohith

Modified:
    hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
    hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java

Modified: hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/CHANGES.txt?rev=1577649&r1=1577648&r2=1577649&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/CHANGES.txt Fri Mar 14 18:20:21 2014
@@ -70,6 +70,9 @@ Release 2.4.0 - UNRELEASED
     MAPREDUCE-5688. TestStagingCleanup fails intermittently with JDK7 (Mit
     Desai via jeagles)
 
+    MAPREDUCE-5769. Unregistration to RM should not be called if AM is crashed
+    before registering with RM (Rohith via jlowe)
+
 Release 2.3.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1577649&r1=1577648&r2=1577649&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original)
+++ hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Fri Mar 14 18:20:21 2014
@@ -87,6 +87,7 @@ public abstract class RMCommunicator ext
   // Has a signal (SIGTERM etc) been issued?
   protected volatile boolean isSignalled = false;
   private volatile boolean shouldUnregister = true;
+  private boolean isApplicationMasterRegistered = false;
 
   public RMCommunicator(ClientService clientService, AppContext context) {
     super("RMCommunicator");
@@ -153,6 +154,7 @@ public abstract class RMCommunicator ext
       }
       RegisterApplicationMasterResponse response =
         scheduler.registerApplicationMaster(request);
+      isApplicationMasterRegistered = true;
       maxContainerCapability = response.getMaximumResourceCapability();
       this.context.getClusterInfo().setMaxContainerCapability(
           maxContainerCapability);
@@ -249,7 +251,7 @@ public abstract class RMCommunicator ext
         LOG.warn("InterruptedException while stopping", ie);
       }
     }
-    if(shouldUnregister) {
+    if (isApplicationMasterRegistered && shouldUnregister) {
       unregister();
     }
     super.serviceStop();
@@ -328,4 +330,9 @@ public abstract class RMCommunicator ext
     LOG.info("RMCommunicator notified that iSignalled is: " 
         + isSignalled);
   }
+
+  @VisibleForTesting
+  protected boolean isApplicationMasterRegistered() {
+    return isApplicationMasterRegistered;
+  }
 }

Modified: hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1577649&r1=1577648&r2=1577649&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Fri Mar 14 18:20:21 2014
@@ -1384,7 +1384,7 @@ public class TestRMContainerAllocator {
     static final List<JobUpdatedNodesEvent> jobUpdatedNodeEvents 
     = new ArrayList<JobUpdatedNodesEvent>();
     private MyResourceManager rm;
-
+    private boolean isUnregistered = false;
     private static AppContext createAppContext(
         ApplicationAttemptId appAttemptId, Job job) {
       AppContext context = mock(AppContext.class);
@@ -1474,6 +1474,7 @@ public class TestRMContainerAllocator {
 
     @Override
     protected void unregister() {
+      isUnregistered=true;
     }
 
     @Override
@@ -1523,6 +1524,15 @@ public class TestRMContainerAllocator {
     protected void startAllocatorThread() {
       // override to NOT start thread
     }
+    
+    @Override
+    protected boolean isApplicationMasterRegistered() {
+      return super.isApplicationMasterRegistered();
+    }
+    
+    public boolean isUnregistered() {
+      return isUnregistered;
+    }
         
   }
 
@@ -1770,6 +1780,51 @@ public class TestRMContainerAllocator {
     Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent.getType());
   }
   
+  @Test
+  public void testUnregistrationOnlyIfRegistered() throws Exception {
+    Configuration conf = new Configuration();
+    final MyResourceManager rm = new MyResourceManager(conf);
+    rm.start();
+    DrainDispatcher rmDispatcher =
+        (DrainDispatcher) rm.getRMContext().getDispatcher();
+
+    // Submit the application
+    RMApp rmApp = rm.submitApp(1024);
+    rmDispatcher.await();
+
+    MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 11264);
+    amNodeManager.nodeHeartbeat(true);
+    rmDispatcher.await();
+
+    final ApplicationAttemptId appAttemptId =
+        rmApp.getCurrentAppAttempt().getAppAttemptId();
+    rm.sendAMLaunched(appAttemptId);
+    rmDispatcher.await();
+
+    MRApp mrApp =
+        new MRApp(appAttemptId, ContainerId.newInstance(appAttemptId, 0), 10,
+            0, false, this.getClass().getName(), true, 1) {
+          @Override
+          protected Dispatcher createDispatcher() {
+            return new DrainDispatcher();
+          }
+
+          protected ContainerAllocator createContainerAllocator(
+              ClientService clientService, AppContext context) {
+            return new MyContainerAllocator(rm, appAttemptId, context);
+          };
+        };
+
+    mrApp.submit(conf);
+    DrainDispatcher amDispatcher = (DrainDispatcher) mrApp.getDispatcher();
+    MyContainerAllocator allocator =
+        (MyContainerAllocator) mrApp.getContainerAllocator();
+    amDispatcher.await();
+    Assert.assertTrue(allocator.isApplicationMasterRegistered());
+    mrApp.stop();
+    Assert.assertTrue(allocator.isUnregistered());
+  }
+  
   public static void main(String[] args) throws Exception {
     TestRMContainerAllocator t = new TestRMContainerAllocator();
     t.testSimple();