You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by bi...@apache.org on 2013/11/20 06:06:16 UTC

svn commit: r1543707 - in /hadoop/common/trunk/hadoop-yarn-project: CHANGES.txt hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java

Author: bikas
Date: Wed Nov 20 05:06:15 2013
New Revision: 1543707

URL: http://svn.apache.org/r1543707
Log:
YARN-744. Race condition in ApplicationMasterService.allocate. It might process the same allocate request twice, resulting in additional containers getting allocated. (Omkar Vinit Joshi via bikas)

Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1543707&r1=1543706&r2=1543707&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Wed Nov 20 05:06:15 2013
@@ -156,6 +156,10 @@ Release 2.3.0 - UNRELEASED
     YARN-1419. TestFifoScheduler.testAppAttemptMetrics fails intermittently
     under jdk7 (Jonathan Eagles via jlowe)
 
+    YARN-744. Race condition in ApplicationMasterService.allocate .. It might
+    process same allocate request twice resulting in additional containers
+    getting allocated. (Omkar Vinit Joshi via bikas)
+
 Release 2.2.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1543707&r1=1543706&r2=1543707&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Wed Nov 20 05:06:15 2013
@@ -72,7 +72,6 @@ import org.apache.hadoop.yarn.ipc.YarnRP
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
@@ -81,7 +80,6 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -97,8 +95,8 @@ public class ApplicationMasterService ex
   private Server server;
   private final RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
-  private final ConcurrentMap<ApplicationAttemptId, AllocateResponse> responseMap =
-      new ConcurrentHashMap<ApplicationAttemptId, AllocateResponse>();
+  private final ConcurrentMap<ApplicationAttemptId, AllocateResponseLock> responseMap =
+      new ConcurrentHashMap<ApplicationAttemptId, AllocateResponseLock>();
   private final AllocateResponse resync =
       recordFactory.newRecordInstance(AllocateResponse.class);
   private final RMContext rmContext;
@@ -217,21 +215,19 @@ public class ApplicationMasterService ex
     ApplicationAttemptId applicationAttemptId = authorizeRequest();
 
     ApplicationId appID = applicationAttemptId.getApplicationId();
-    AllocateResponse lastResponse = responseMap.get(applicationAttemptId);
-    if (lastResponse == null) {
-      String message = "Application doesn't exist in cache "
-          + applicationAttemptId;
-      LOG.error(message);
+    AllocateResponseLock lock = responseMap.get(applicationAttemptId);
+    if (lock == null) {
       RMAuditLogger.logFailure(this.rmContext.getRMApps().get(appID).getUser(),
-          AuditConstants.REGISTER_AM, message, "ApplicationMasterService",
+          AuditConstants.REGISTER_AM, "Application doesn't exist in cache "
+              + applicationAttemptId, "ApplicationMasterService",
           "Error in registering application master", appID,
           applicationAttemptId);
-      throw RPCUtil.getRemoteException(message);
+      throwApplicationDoesNotExistInCacheException(applicationAttemptId);
     }
 
     // Allow only one thread in AM to do registerApp at a time.
-    synchronized (lastResponse) {
-
+    synchronized (lock) {
+      AllocateResponse lastResponse = lock.getAllocateResponse();
       if (hasApplicationMasterRegistered(applicationAttemptId)) {
         String message =
             "Application Master is already registered : "
@@ -251,7 +247,7 @@ public class ApplicationMasterService ex
       // Setting the response id to 0 to identify if the
       // application master is register for the respective attemptid
       lastResponse.setResponseId(0);
-      responseMap.put(applicationAttemptId, lastResponse);
+      lock.setAllocateResponse(lastResponse);
       LOG.info("AM registration " + applicationAttemptId);
       this.rmContext
         .getDispatcher()
@@ -286,17 +282,14 @@ public class ApplicationMasterService ex
 
     ApplicationAttemptId applicationAttemptId = authorizeRequest();
 
-    AllocateResponse lastResponse = responseMap.get(applicationAttemptId);
-    if (lastResponse == null) {
-      String message = "Application doesn't exist in cache "
-          + applicationAttemptId;
-      LOG.error(message);
-      throw RPCUtil.getRemoteException(message);
+    AllocateResponseLock lock = responseMap.get(applicationAttemptId);
+    if (lock == null) {
+      throwApplicationDoesNotExistInCacheException(applicationAttemptId);
     }
 
     // Allow only one thread in AM to do finishApp at a time.
-    synchronized (lastResponse) {
-
+    synchronized (lock) {
+      
       this.amLivelinessMonitor.receivedPing(applicationAttemptId);
 
       rmContext.getDispatcher().getEventHandler().handle(
@@ -313,6 +306,15 @@ public class ApplicationMasterService ex
     }
   }
 
+  private void throwApplicationDoesNotExistInCacheException(
+      ApplicationAttemptId appAttemptId)
+      throws InvalidApplicationMasterRequestException {
+    String message = "Application doesn't exist in cache "
+        + appAttemptId;
+    LOG.error(message);
+    throw new InvalidApplicationMasterRequestException(message);
+  }
+  
   /**
    * @param appAttemptId
    * @return true if application is registered for the respective attemptid
@@ -320,10 +322,11 @@ public class ApplicationMasterService ex
   public boolean hasApplicationMasterRegistered(
       ApplicationAttemptId appAttemptId) {
     boolean hasApplicationMasterRegistered = false;
-    AllocateResponse lastResponse = responseMap.get(appAttemptId);
+    AllocateResponseLock lastResponse = responseMap.get(appAttemptId);
     if (lastResponse != null) {
       synchronized (lastResponse) {
-        if (lastResponse.getResponseId() >= 0) {
+        if (lastResponse.getAllocateResponse() != null
+            && lastResponse.getAllocateResponse().getResponseId() >= 0) {
           hasApplicationMasterRegistered = true;
         }
       }
@@ -340,38 +343,38 @@ public class ApplicationMasterService ex
     this.amLivelinessMonitor.receivedPing(appAttemptId);
 
     /* check if its in cache */
-    AllocateResponse lastResponse = responseMap.get(appAttemptId);
-    if (lastResponse == null) {
+    AllocateResponseLock lock = responseMap.get(appAttemptId);
+    if (lock == null) {
       LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId);
       return resync;
     }
-    
-    if (!hasApplicationMasterRegistered(appAttemptId)) {
-      String message =
-          "Application Master is trying to allocate before registering for: "
-              + appAttemptId.getApplicationId();
-      LOG.error(message);
-      RMAuditLogger.logFailure(
-        this.rmContext.getRMApps().get(appAttemptId.getApplicationId())
-          .getUser(), AuditConstants.REGISTER_AM, "",
-        "ApplicationMasterService", message, appAttemptId.getApplicationId(),
-        appAttemptId);
-      throw new InvalidApplicationMasterRequestException(message);
-    }
-
-    if ((request.getResponseId() + 1) == lastResponse.getResponseId()) {
-      /* old heartbeat */
-      return lastResponse;
-    } else if (request.getResponseId() + 1 < lastResponse.getResponseId()) {
-      LOG.error("Invalid responseid from appAttemptId " + appAttemptId);
-      // Oh damn! Sending reboot isn't enough. RM state is corrupted. TODO:
-      // Reboot is not useful since after AM reboots, it will send register and 
-      // get an exception. Might as well throw an exception here.
-      return resync;
-    } 
-    
-    // Allow only one thread in AM to do heartbeat at a time.
-    synchronized (lastResponse) {
+    synchronized (lock) {
+      AllocateResponse lastResponse = lock.getAllocateResponse();
+      if (!hasApplicationMasterRegistered(appAttemptId)) {
+        String message =
+            "Application Master is trying to allocate before registering for: "
+                + appAttemptId.getApplicationId();
+        LOG.error(message);
+        RMAuditLogger.logFailure(
+            this.rmContext.getRMApps().get(appAttemptId.getApplicationId())
+                .getUser(), AuditConstants.REGISTER_AM, "",
+            "ApplicationMasterService", message,
+            appAttemptId.getApplicationId(),
+            appAttemptId);
+        throw new InvalidApplicationMasterRequestException(message);
+      }
+
+      if ((request.getResponseId() + 1) == lastResponse.getResponseId()) {
+        /* old heartbeat */
+        return lastResponse;
+      } else if (request.getResponseId() + 1 < lastResponse.getResponseId()) {
+        LOG.error("Invalid responseid from appAttemptId " + appAttemptId);
+        // Oh damn! Sending reboot isn't enough. RM state is corrupted. TODO:
+        // Reboot is not useful since after AM reboots, it will send register
+        // and
+        // get an exception. Might as well throw an exception here.
+        return resync;
+      }
 
       // Send the status update to the appAttempt.
       this.rmContext.getDispatcher().getEventHandler().handle(
@@ -380,15 +383,16 @@ public class ApplicationMasterService ex
 
       List<ResourceRequest> ask = request.getAskList();
       List<ContainerId> release = request.getReleaseList();
-      
-      ResourceBlacklistRequest blacklistRequest = request.getResourceBlacklistRequest();
-      List<String> blacklistAdditions = 
-          (blacklistRequest != null) ? 
+
+      ResourceBlacklistRequest blacklistRequest =
+          request.getResourceBlacklistRequest();
+      List<String> blacklistAdditions =
+          (blacklistRequest != null) ?
               blacklistRequest.getBlacklistAdditions() : null;
-      List<String> blacklistRemovals = 
-          (blacklistRequest != null) ? 
+      List<String> blacklistRemovals =
+          (blacklistRequest != null) ?
               blacklistRequest.getBlacklistRemovals() : null;
-      
+
       // sanity check
       try {
         RMServerUtils.validateResourceRequests(ask,
@@ -443,7 +447,7 @@ public class ApplicationMasterService ex
               rmNode.getTotalCapability(), numContainers,
               rmNode.getHealthReport(),
               rmNode.getLastHealthReportTime());
-          
+
           updatedNodeReports.add(report);
         }
         allocateResponse.setUpdatedNodes(updatedNodeReports);
@@ -454,11 +458,12 @@ public class ApplicationMasterService ex
           .pullJustFinishedContainers());
       allocateResponse.setResponseId(lastResponse.getResponseId() + 1);
       allocateResponse.setAvailableResources(allocation.getResourceLimit());
-      
+
       allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
-   
+
       // add preemption to the allocateResponse message (if any)
-      allocateResponse.setPreemptionMessage(generatePreemptionMessage(allocation));
+      allocateResponse
+          .setPreemptionMessage(generatePreemptionMessage(allocation));
 
       // Adding NMTokens for allocated containers.
       if (!allocation.getContainers().isEmpty()) {
@@ -466,21 +471,14 @@ public class ApplicationMasterService ex
             .createAndGetNMTokens(app.getUser(), appAttemptId,
                 allocation.getContainers()));
       }
-
-      // before returning response, verify in sync
-      AllocateResponse oldResponse =
-          responseMap.put(appAttemptId, allocateResponse);
-      if (oldResponse == null) {
-        // appAttempt got unregistered, remove it back out
-        responseMap.remove(appAttemptId);
-        String message = "App Attempt removed from the cache during allocate"
-            + appAttemptId;
-        LOG.error(message);
-        return resync;
-      }
-
+      /*
+       * As we are updating the response inside the lock object so we don't
+       * need to worry about unregister call occurring in between (which
+       * removes the lock object).
+       */
+      lock.setAllocateResponse(allocateResponse);
       return allocateResponse;
-    }
+    }    
   }
   
   private PreemptionMessage generatePreemptionMessage(Allocation allocation){
@@ -542,7 +540,7 @@ public class ApplicationMasterService ex
     // attemptID get registered
     response.setResponseId(-1);
     LOG.info("Registering app attempt : " + attemptId);
-    responseMap.put(attemptId, response);
+    responseMap.put(attemptId, new AllocateResponseLock(response));
     rmContext.getNMTokenSecretManager().registerApplicationAttempt(attemptId);
   }
 
@@ -564,4 +562,20 @@ public class ApplicationMasterService ex
     }
     super.serviceStop();
   }
-}
+  
+  public static class AllocateResponseLock {
+    private AllocateResponse response;
+    
+    public AllocateResponseLock(AllocateResponse response) {
+      this.response = response;
+    }
+    
+    public synchronized AllocateResponse getAllocateResponse() {
+      return response;
+    }
+    
+    public synchronized void setAllocateResponse(AllocateResponse response) {
+      this.response = response;
+    }
+  }
+}
\ No newline at end of file