You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/08/19 03:53:35 UTC

svn commit: r1515256 - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/ hadoop-yarn/hadoop-yarn-server/hadoop-ya...

Author: vinodkv
Date: Mon Aug 19 01:53:34 2013
New Revision: 1515256

URL: http://svn.apache.org/r1515256
Log:
YARN-643. Fixed ResourceManager to remove all tokens consistently on app finish. Contributed by Xuan Gong.

Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1515256&r1=1515255&r2=1515256&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Mon Aug 19 01:53:34 2013
@@ -74,6 +74,9 @@ Release 2.1.1-beta - UNRELEASED
     forceKillApplication on non-running and finished applications. (Xuan Gong
     via vinodkv)
 
+    YARN-643. Fixed ResourceManager to remove all tokens consistently on app
+    finish. (Xuan Gong via vinodkv)
+
 Release 2.1.0-beta - 2013-08-22
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1515256&r1=1515255&r2=1515256&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Mon Aug 19 01:53:34 2013
@@ -761,6 +761,8 @@ public class RMAppAttemptImpl implements
               rejectedEvent.getApplicationAttemptId().getApplicationId(),
               message)
           );
+
+      appAttempt.removeTokens(appAttempt);
     }
   }
 
@@ -847,7 +849,6 @@ public class RMAppAttemptImpl implements
     @Override
     public void transition(RMAppAttemptImpl appAttempt,
         RMAppAttemptEvent event) {
-
       ApplicationAttemptId appAttemptId = appAttempt.getAppAttemptId();
 
       // Tell the AMS. Unregister from the ApplicationMasterService
@@ -894,9 +895,7 @@ public class RMAppAttemptImpl implements
       appAttempt.eventHandler.handle(new AppRemovedSchedulerEvent(appAttemptId,
         finalAttemptState));
 
-      // Remove the AppAttempt from the AMRMTokenSecretManager
-      appAttempt.rmContext.getAMRMTokenSecretManager()
-        .applicationMasterFinished(appAttemptId);
+      appAttempt.removeTokens(appAttempt);
     }
   }
 
@@ -1015,7 +1014,6 @@ public class RMAppAttemptImpl implements
           " exitCode: " + status.getExitStatus() +
           " due to: " +  status.getDiagnostics() + "." +
           "Failing this attempt.");
-
       // Tell the app, scheduler
       super.transition(appAttempt, finishEvent);
     }
@@ -1042,12 +1040,6 @@ public class RMAppAttemptImpl implements
       appAttempt.rmContext.getAMFinishingMonitor().unregister(
           appAttempt.getAppAttemptId());
 
-      // Unregister from the ClientToAMTokenSecretManager
-      if (UserGroupInformation.isSecurityEnabled()) {
-        appAttempt.rmContext.getClientToAMTokenSecretManager()
-          .unRegisterApplication(appAttempt.getAppAttemptId());
-      }
-
       if(!appAttempt.submissionContext.getUnmanagedAM()) {
         // Tell the launcher to cleanup.
         appAttempt.eventHandler.handle(new AMLauncherEvent(
@@ -1116,10 +1108,6 @@ public class RMAppAttemptImpl implements
 
       appAttempt.rmContext.getAMLivelinessMonitor().unregister(appAttemptId);
 
-      // Remove the AppAttempt from the AMRMTokenSecretManager
-      appAttempt.rmContext.getAMRMTokenSecretManager()
-        .applicationMasterFinished(appAttemptId);
-
       appAttempt.progress = 1.0f;
 
       RMAppAttemptUnregistrationEvent unregisterEvent
@@ -1267,4 +1255,16 @@ public class RMAppAttemptImpl implements
               + " MasterContainer: " + masterContainer);
     store.storeApplicationAttempt(this);
   }
+
+  private void removeTokens(RMAppAttemptImpl appAttempt) {
+    // Unregister from the ClientToAMTokenSecretManager
+    if (UserGroupInformation.isSecurityEnabled()) {
+      appAttempt.rmContext.getClientToAMTokenSecretManager()
+        .unRegisterApplication(appAttempt.getAppAttemptId());
+    }
+
+    // Remove the AppAttempt from the AMRMTokenSecretManager
+    appAttempt.rmContext.getAMRMTokenSecretManager()
+      .applicationMasterFinished(appAttempt.getAppAttemptId());
+  }
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java?rev=1515256&r1=1515255&r2=1515256&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java Mon Aug 19 01:53:34 2013
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.spy;
 
 import java.util.Collections;
 import java.util.List;
@@ -35,6 +36,7 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.MockApps;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -102,6 +104,11 @@ public class TestRMAppAttemptTransitions
   
   private RMApp application;
   private RMAppAttempt applicationAttempt;
+
+  private Configuration conf = new Configuration();
+  private AMRMTokenSecretManager amRMTokenManager = spy(new AMRMTokenSecretManager(conf));
+  private ClientToAMTokenSecretManagerInRM clientToAMTokenManager =
+      spy(new ClientToAMTokenSecretManagerInRM());
   
   private final class TestApplicationAttemptEventDispatcher implements
       EventHandler<RMAppAttemptEvent> {
@@ -163,14 +170,13 @@ public class TestRMAppAttemptTransitions
         mock(ContainerAllocationExpirer.class);
     amLivelinessMonitor = mock(AMLivelinessMonitor.class);
     amFinishingMonitor = mock(AMLivelinessMonitor.class);
-    Configuration conf = new Configuration();
     rmContext =
         new RMContextImpl(rmDispatcher,
           containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
-          null, new AMRMTokenSecretManager(conf),
+          null, amRMTokenManager,
           new RMContainerTokenSecretManager(conf),
           new NMTokenSecretManagerInRM(conf),
-          new ClientToAMTokenSecretManagerInRM());
+          clientToAMTokenManager);
     
     RMStateStore store = mock(RMStateStore.class);
     ((RMContextImpl) rmContext).setStateStore(store);
@@ -261,7 +267,11 @@ public class TestRMAppAttemptTransitions
     assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
     assertEquals(0, applicationAttempt.getRanNodes().size());
     assertNull(applicationAttempt.getFinalApplicationStatus());
-    
+    if (UserGroupInformation.isSecurityEnabled()) {
+      verify(clientToAMTokenManager).registerApplication(
+          applicationAttempt.getAppAttemptId());
+    }
+    assertNotNull(applicationAttempt.getAMRMToken());
     // Check events
     verify(masterService).
         registerAppAttempt(applicationAttempt.getAppAttemptId());
@@ -288,6 +298,7 @@ public class TestRMAppAttemptTransitions
     // this works for unmanaged and managed AM's because this is actually doing
     // verify(application).handle(anyObject());
     verify(application).handle(any(RMAppRejectedEvent.class));
+    verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
   }
 
   /**
@@ -303,6 +314,7 @@ public class TestRMAppAttemptTransitions
     assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
     assertEquals(0, applicationAttempt.getRanNodes().size());
     assertNull(applicationAttempt.getFinalApplicationStatus());
+    verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
   }
   
   /**
@@ -377,6 +389,8 @@ public class TestRMAppAttemptTransitions
     
     // Check events
     verify(application, times(2)).handle(any(RMAppFailedAttemptEvent.class));
+
+    verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
   }
 
   /**
@@ -422,6 +436,7 @@ public class TestRMAppAttemptTransitions
         applicationAttempt.getTrackingUrl());
     assertEquals(container, applicationAttempt.getMasterContainer());
     assertEquals(finalStatus, applicationAttempt.getFinalApplicationStatus());
+    verifyTokenCount(applicationAttempt.getAppAttemptId(), 0);
   }
 
   /**
@@ -442,6 +457,7 @@ public class TestRMAppAttemptTransitions
         .getJustFinishedContainers().size());
     assertEquals(container, applicationAttempt.getMasterContainer());
     assertEquals(finalStatus, applicationAttempt.getFinalApplicationStatus());
+    verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
   }
   
   
@@ -592,6 +608,7 @@ public class TestRMAppAttemptTransitions
             applicationAttempt.getAppAttemptId(), 
             RMAppAttemptEventType.KILL));
     testAppAttemptKilledState(null, EMPTY_DIAGNOSTICS);
+    verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
   } 
   
   @Test
@@ -666,6 +683,7 @@ public class TestRMAppAttemptTransitions
       applicationAttempt.getAppAttemptId(), cs));
     assertEquals(RMAppAttemptState.FAILED,
       applicationAttempt.getAppAttemptState());
+    verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
   }
   
   @Test
@@ -709,6 +727,7 @@ public class TestRMAppAttemptTransitions
         applicationAttempt.getAppAttemptId().getApplicationId());
     assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl());
     assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl());
+    verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
   }
 
   @Test(timeout=10000)
@@ -725,6 +744,7 @@ public class TestRMAppAttemptTransitions
         applicationAttempt.getAppAttemptId().getApplicationId());
     assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl());
     assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl());
+    verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
   }
 
   @Test(timeout=20000)
@@ -742,6 +762,7 @@ public class TestRMAppAttemptTransitions
         applicationAttempt.getAppAttemptId().getApplicationId());
     assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl());
     assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl());
+    verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
   }
 
   @Test 
@@ -848,4 +869,10 @@ public class TestRMAppAttemptTransitions
         diagnostics, 0);
   }
   
+  private void verifyTokenCount(ApplicationAttemptId appAttemptId, int count) {
+    verify(amRMTokenManager, times(count)).applicationMasterFinished(appAttemptId);
+    if (UserGroupInformation.isSecurityEnabled()) {
+      verify(clientToAMTokenManager, times(count)).unRegisterApplication(appAttemptId);
+    }
+  }
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java?rev=1515256&r1=1515255&r2=1515256&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java Mon Aug 19 01:53:34 2013
@@ -38,6 +38,8 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
@@ -46,6 +48,8 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
 import org.junit.Test;
@@ -80,6 +84,7 @@ public class TestAMRMTokens {
    * 
    * @throws Exception
    */
+  @SuppressWarnings("unchecked")
   @Test
   public void testTokenExpiry() throws Exception {
 
@@ -134,6 +139,20 @@ public class TestAMRMTokens {
       finishAMRequest.setTrackingUrl("url");
       rmClient.finishApplicationMaster(finishAMRequest);
 
+      // Send RMAppAttemptEventType.CONTAINER_FINISHED to transition the
+      // RMAppAttempt from the Finishing state to the Finished state. Both the
+      // AMRMToken and the ClientToAMToken will be removed.
+      ContainerStatus containerStatus =
+          BuilderUtils.newContainerStatus(attempt.getMasterContainer().getId(),
+              ContainerState.COMPLETE,
+              "AM Container Finished", 0);
+      rm.getRMContext()
+          .getDispatcher()
+          .getEventHandler()
+          .handle(
+              new RMAppAttemptContainerFinishedEvent(applicationAttemptId,
+                  containerStatus));
+
       // Now simulate trying to allocate. RPC call itself should throw auth
       // exception.
       rpc.stopProxy(rmClient, conf); // To avoid using cached client