You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2011/05/30 15:31:11 UTC

svn commit: r1129166 - in /hadoop/mapreduce/branches/MR-279: ./ yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/ha...

Author: acmurthy
Date: Mon May 30 13:31:11 2011
New Revision: 1129166

URL: http://svn.apache.org/viewvc?rev=1129166&view=rev
Log:
Fixed an NPE in the CapacityScheduler (CS) by checking Application state before scheduling and by fixing synchronization in the CS.

Modified:
    hadoop/mapreduce/branches/MR-279/CHANGES.txt
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java

Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1129166&r1=1129165&r2=1129166&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Mon May 30 13:31:11 2011
@@ -4,6 +4,9 @@ Trunk (unreleased changes)
 
   MAPREDUCE-279
 
+    Fixed NPE in CS by checking Application state before scheduling and fixing
+    synchronization in CS. (acmurthy)
+
     Fixing NPE on speculator in MRAppMaster and making job-history optional in
     tests to make test goal succeed. (vinodk and sharadag).
 

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java?rev=1129166&r1=1129165&r2=1129166&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java Mon May 30 13:31:11 2011
@@ -55,10 +55,12 @@ import org.apache.hadoop.yarn.server.res
 @LimitedPrivate("yarn")
 @Evolving
 public class Application {
+  
   private static final Log LOG = LogFactory.getLog(Application.class);
   final ApplicationId applicationId;
   final Queue queue;
   final String user;
+  
   private final RecordFactory recordFactory = RecordFactoryProvider
       .getRecordFactory(null);
 
@@ -114,14 +116,22 @@ public class Application {
     return user;
   }
 
-  public ApplicationState getState() {
+  public synchronized ApplicationState getState() {
     return master.getState();
   }
 
-  public boolean isPending() {
+  public synchronized boolean isPending() {
     return pending;
   }
 
+  public synchronized boolean isSchedulable() {
+    ApplicationState state = getState();
+    return 
+      (state == ApplicationState.ALLOCATED || state == ApplicationState.ALLOCATING ||
+       state == ApplicationState.LAUNCHED || state == ApplicationState.LAUNCHING || 
+       state == ApplicationState.PENDING || state == ApplicationState.RUNNING);
+  }
+  
   public synchronized Map<Priority, Map<String, ResourceRequest>> getRequests() {
     return requests;
   }
@@ -131,12 +141,13 @@ public class Application {
     master.setContainerCount(++i);
     return master.getContainerCount();
   }
-
+  
   /**
    * Clear any pending requests from this application.
    */
   public synchronized void clearRequests() {
     requests.clear();
+    LOG.info("Application " + applicationId + " requests cleared");
   }
 
   /**
@@ -436,7 +447,8 @@ public class Application {
       Map<String, ResourceRequest> requests = getResourceRequests(priority);
       if (requests != null) {
         LOG.debug("showRequests:" + " application=" + applicationId + 
-            " available=" + getResourceLimit() + " current=" + currentConsumption);
+            " available=" + getResourceLimit() + 
+            " current=" + currentConsumption + " state=" + getState());
         for (ResourceRequest request : requests.values()) {
           LOG.debug("showRequests:" + " application=" + applicationId
               + " request=" + request);
@@ -456,7 +468,7 @@ public class Application {
     application.setMasterHost("");
     application.setName("");
     application.setQueue(queue.getQueueName());
-    application.setState(ApplicationState.RUNNING);
+    application.setState(org.apache.hadoop.yarn.api.records.ApplicationState.RUNNING);
     application.setUser(user);
 
     ApplicationStatus status = recordFactory
@@ -513,7 +525,7 @@ public class Application {
     return ((float) requiredResources / clusterNodes);
   }
 
-  synchronized public void finish() {
+  synchronized public void stop() {
     // clear pending resources metrics for the application
     QueueMetrics metrics = queue.getMetrics();
     for (Map<String, ResourceRequest> asks : requests.values()) {
@@ -525,6 +537,9 @@ public class Application {
       }
     }
     metrics.finishApp(this);
+    
+    // Clear requests themselves
+    clearRequests();
   }
 
   public Map<Priority, Set<NodeInfo>> getAllReservations() {

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1129166&r1=1129165&r2=1129166&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Mon May 30 13:31:11 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
@@ -107,9 +108,10 @@ implements ResourceScheduler, CapacitySc
 
   private Resource minimumAllocation;
 
-  private Map<ApplicationId, Application> applications = 
+  private Map<ApplicationId, Application> applications =
+    Collections.synchronizedMap(
     new TreeMap<ApplicationId, Application>(
-        new BuilderUtils.ApplicationIdComparator());
+        new BuilderUtils.ApplicationIdComparator()));
 
   private boolean initialized = false;
 
@@ -268,22 +270,26 @@ implements ResourceScheduler, CapacitySc
   }
 
   @Override
-  public void addApplication(ApplicationId applicationId, ApplicationMaster master,
+  public synchronized void addApplication(
+      ApplicationId applicationId, ApplicationMaster master,
       String user, String queueName, Priority priority, ApplicationStore appStore)
   throws IOException {
+    // Sanity checks
     Queue queue = queues.get(queueName);
-
     if (queue == null) {
       throw new IOException("Application " + applicationId + 
           " submitted by user " + user + " to unknown queue: " + queueName);
     }
-
     if (!(queue instanceof LeafQueue)) {
       throw new IOException("Application " + applicationId + 
           " submitted by user " + user + " to non-leaf queue: " + queueName);
     }
 
-    Application application = new Application(applicationId, master, queue, user, appStore); 
+    // Create the application
+    Application application = 
+      new Application(applicationId, master, queue, user, appStore);
+    
+    // Submit to the queue
     try {
       queue.submitApplication(application, user, queueName, priority);
     } catch (AccessControlException ace) {
@@ -299,8 +305,12 @@ implements ResourceScheduler, CapacitySc
   }
 
   @Override
-  public void doneApplication(ApplicationId applicationId, boolean finishApplication)
+  public synchronized void doneApplication(
+      ApplicationId applicationId, boolean finishApplication)
   throws IOException {
+    LOG.info("Application " + applicationId + " is done." +
+    		" finish=" + finishApplication);
+    
     Application application = getApplication(applicationId);
 
     if (application == null) {
@@ -310,20 +320,17 @@ implements ResourceScheduler, CapacitySc
       return;
     }
     
-    /*
-     * release all the containes and make sure we clean up the pending 
-     * requests for this application
-     */
+    // Release all the running containers 
     processReleasedContainers(application, application.getCurrentContainers());
-    application.clearRequests();
     
-    /*
-     * release all reserved containers
-     */
+     // Release all reserved containers
     releaseReservedContainers(application);
     
+    // Clean up pending requests, metrics etc.
+    application.stop();
+    
     /** The application can be retried. So only remove it from scheduler data
-     * structures if the finishapplication flag is set.
+     * structures if the finishApplication flag is set.
      */
     if (finishApplication) {
       Queue queue = queues.get(application.getQueue().getQueueName());
@@ -347,29 +354,42 @@ implements ResourceScheduler, CapacitySc
       return new Allocation(EMPTY_CONTAINER_LIST, Resources.none()); 
     }
     
+    // Sanity check
     normalizeRequests(ask);
 
-    LOG.info("DEBUG --- allocate: pre-update" +
-        " applicationId=" + applicationId + 
-        " application=" + application);
-    application.showRequests();
-
-    // Update application requests
-    application.updateResourceRequests(ask);
-
-    // Release ununsed containers and update queue capacities
-    processReleasedContainers(application, release);
-
-    LOG.info("DEBUG --- allocate: post-update");
-    application.showRequests();
-
-    List<Container> allContainers = application.acquire();
-    LOG.info("DEBUG --- allocate:" +
-        " applicationId=" + applicationId + 
-        " #ask=" + ask.size() + 
-        " #release=" + release.size() +
-        " #allContainers=" + allContainers.size());
-    return new Allocation(allContainers, application.getResourceLimit());
+    List<Container> allocatedContainers = null;
+    Resource limit = null;
+    synchronized (application) {
+
+      LOG.info("DEBUG --- allocate: pre-update" +
+          " applicationId=" + applicationId + 
+          " application=" + application);
+      application.showRequests();
+
+      // Update application requests
+      application.updateResourceRequests(ask);
+
+      // Release ununsed containers and update queue capacities
+      processReleasedContainers(application, release);
+
+      LOG.info("DEBUG --- allocate: post-update");
+      application.showRequests();
+
+      // Acquire containers
+      allocatedContainers = application.acquire();
+      
+      // Resource limit
+      limit = application.getResourceLimit();
+      LOG.info("DEBUG --- allocate:" +
+          " applicationId=" + applicationId + 
+          " #ask=" + ask.size() + 
+          " #release=" + release.size() +
+          " #allocatedContainers=" + allocatedContainers.size() +
+          " limit=" + limit);
+      
+    }
+      
+      return new Allocation(allocatedContainers, limit);
   }
 
   @Override
@@ -408,8 +428,8 @@ implements ResourceScheduler, CapacitySc
   }
 
   private void normalizeRequest(ResourceRequest ask) {
-    int memory = ask.getCapability().getMemory();
     int minMemory = minimumAllocation.getMemory();
+    int memory = Math.max(ask.getCapability().getMemory(), minMemory);
     ask.getCapability().setMemory (
         minMemory * ((memory/minMemory) + (memory%minMemory > 0 ? 1 : 0)));
   }
@@ -526,7 +546,7 @@ implements ResourceScheduler, CapacitySc
     }
   }
   
-  private synchronized Application getApplication(ApplicationId applicationId) {
+  private Application getApplication(ApplicationId applicationId) {
     return applications.get(applicationId);
   }
 
@@ -542,7 +562,7 @@ implements ResourceScheduler, CapacitySc
       try {
         doneApplication(event.getAppContext().getApplicationID(), true);
       } catch(IOException ie) {
-        LOG.error("Error in removing application", ie);
+        LOG.error("Error in removing 'done' application", ie);
         //TODO have to be shutdown the RM in case of this.
         // do a graceful shutdown.
       }
@@ -554,7 +574,7 @@ implements ResourceScheduler, CapacitySc
          */
         doneApplication(event.getAppContext().getApplicationID(), false);
       } catch(IOException ie) {
-        LOG.error("Error in removing application", ie);
+        LOG.error("Error in removing 'expired' application", ie);
         //TODO have to be shutdown the RM in case of this.
         // do a graceful shutdown.
       }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1129166&r1=1129165&r2=1129166&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Mon May 30 13:31:11 2011
@@ -451,7 +451,6 @@ public class LeafQueue implements Queue 
         " user: " + user + "," + " leaf-queue: " + getQueueName() +
         " #user-applications: " + user.getApplications() + 
         " #queue-applications: " + getNumApplications());
-
   }
 
   @Override
@@ -462,9 +461,6 @@ public class LeafQueue implements Queue 
       removeApplication(application, getUser(application.getUser()));
     }
 
-    // Clean up metrics etc.
-    application.finish();
-    
     // Inform the parent queue
     parent.finishApplication(application, queue);
   }
@@ -505,10 +501,17 @@ public class LeafQueue implements Queue 
     // Try to assign containers to applications in fifo order
     for (Application application : applications) {
 
+      if (!application.isSchedulable()) {
+        LOG.info("Application " + application.getApplicationId() + 
+            " not schedulable. State = " + application.getState());
+        continue;
+      }
+      
       LOG.info("DEBUG --- pre-assignContainers");
       application.showRequests();
 
       synchronized (application) {
+        //if (application.)
         for (Priority priority : application.getPriorities()) {
 
           // Do we need containers at this 'priority'?

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1129166&r1=1129165&r2=1129166&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Mon May 30 13:31:11 2011
@@ -177,31 +177,41 @@ public class FifoScheduler implements Re
           "or non existant application " + applicationId);
       return new Allocation(EMPTY_CONTAINER_LIST, Resources.none()); 
     }
+
+    // Sanity check
     normalizeRequests(ask);
+    
+    List<Container> allocatedContainers = null;
+    Resource limit = null;
+    synchronized (application) {
+
+      LOG.debug("allocate: pre-update" +
+          " applicationId=" + applicationId + 
+          " application=" + application);
+      application.showRequests();
+
+      // Update application requests
+      application.updateResourceRequests(ask);
+
+      // Release containers
+      releaseContainers(application, release);
 
-    LOG.debug("allocate: pre-update" +
-        " applicationId=" + applicationId + 
-        " application=" + application);
-    application.showRequests();
-
-    // Update application requests
-    application.updateResourceRequests(ask);
-
-    // Release containers
-    releaseContainers(application, release);
-
-    LOG.debug("allocate: post-update" +
-        " applicationId=" + applicationId + 
-        " application=" + application);
-    application.showRequests();
-
-    List<Container> allContainers = application.acquire();
-    LOG.debug("allocate:" +
-        " applicationId=" + applicationId + 
-        " #ask=" + ask.size() + 
-        " #release=" + release.size() +
-        " #allContainers=" + allContainers.size());
-    return new Allocation(allContainers, application.getResourceLimit());
+      LOG.debug("allocate: post-update" +
+          " applicationId=" + applicationId + 
+          " application=" + application);
+      application.showRequests();
+
+      allocatedContainers = application.acquire();
+      limit = application.getResourceLimit();
+      LOG.debug("allocate:" +
+          " applicationId=" + applicationId + 
+          " #ask=" + ask.size() + 
+          " #release=" + release.size() +
+          " #allocatedContainers=" + allocatedContainers.size() + 
+          " limit=" + limit);
+    }
+    
+    return new Allocation(allocatedContainers, limit);
   }
 
   private void releaseContainers(Application application, List<Container> release) {
@@ -251,11 +261,11 @@ public class FifoScheduler implements Re
 
     // Release current containers
     releaseContainers(application, application.getCurrentContainers());
-    application.clearRequests();
-    // Update metrics
+    
+    // Clean up pending requests, metrics etc.
+    application.stop();
+    
     if (finishApplication) {
-      metrics.finishApp(application);
-      application.finish();
       // Let the cluster know that the applications are done
       finishedApplication(applicationId, 
           application.getAllNodesForApplication());