You are viewing a plain-text version of this content. The canonical link to the original ("here") is a hyperlink that was not preserved in this plain-text rendering.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2011/05/30 15:31:11 UTC
svn commit: r1129166 - in /hadoop/mapreduce/branches/MR-279: ./
yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/
yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/ha...
Author: acmurthy
Date: Mon May 30 13:31:11 2011
New Revision: 1129166
URL: http://svn.apache.org/viewvc?rev=1129166&view=rev
Log:
Fixed an NPE in the CapacityScheduler (CS) by checking the Application state before scheduling, and fixed synchronization in the CS.
Modified:
hadoop/mapreduce/branches/MR-279/CHANGES.txt
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1129166&r1=1129165&r2=1129166&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Mon May 30 13:31:11 2011
@@ -4,6 +4,9 @@ Trunk (unreleased changes)
MAPREDUCE-279
+ Fixed NPE in CS by checking Application state before scheduling and fixing
+ synchronization in CS. (acmurthy)
+
Fixing NPE on speculator in MRAppMaster and making job-history optional in
tests to make test goal succeed. (vinodk and sharadag).
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java?rev=1129166&r1=1129165&r2=1129166&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java Mon May 30 13:31:11 2011
@@ -55,10 +55,12 @@ import org.apache.hadoop.yarn.server.res
@LimitedPrivate("yarn")
@Evolving
public class Application {
+
private static final Log LOG = LogFactory.getLog(Application.class);
final ApplicationId applicationId;
final Queue queue;
final String user;
+
private final RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
@@ -114,14 +116,22 @@ public class Application {
return user;
}
- public ApplicationState getState() {
+ public synchronized ApplicationState getState() {
return master.getState();
}
- public boolean isPending() {
+ public synchronized boolean isPending() {
return pending;
}
+ public synchronized boolean isSchedulable() {
+ ApplicationState state = getState();
+ return
+ (state == ApplicationState.ALLOCATED || state == ApplicationState.ALLOCATING ||
+ state == ApplicationState.LAUNCHED || state == ApplicationState.LAUNCHING ||
+ state == ApplicationState.PENDING || state == ApplicationState.RUNNING);
+ }
+
public synchronized Map<Priority, Map<String, ResourceRequest>> getRequests() {
return requests;
}
@@ -131,12 +141,13 @@ public class Application {
master.setContainerCount(++i);
return master.getContainerCount();
}
-
+
/**
* Clear any pending requests from this application.
*/
public synchronized void clearRequests() {
requests.clear();
+ LOG.info("Application " + applicationId + " requests cleared");
}
/**
@@ -436,7 +447,8 @@ public class Application {
Map<String, ResourceRequest> requests = getResourceRequests(priority);
if (requests != null) {
LOG.debug("showRequests:" + " application=" + applicationId +
- " available=" + getResourceLimit() + " current=" + currentConsumption);
+ " available=" + getResourceLimit() +
+ " current=" + currentConsumption + " state=" + getState());
for (ResourceRequest request : requests.values()) {
LOG.debug("showRequests:" + " application=" + applicationId
+ " request=" + request);
@@ -456,7 +468,7 @@ public class Application {
application.setMasterHost("");
application.setName("");
application.setQueue(queue.getQueueName());
- application.setState(ApplicationState.RUNNING);
+ application.setState(org.apache.hadoop.yarn.api.records.ApplicationState.RUNNING);
application.setUser(user);
ApplicationStatus status = recordFactory
@@ -513,7 +525,7 @@ public class Application {
return ((float) requiredResources / clusterNodes);
}
- synchronized public void finish() {
+ synchronized public void stop() {
// clear pending resources metrics for the application
QueueMetrics metrics = queue.getMetrics();
for (Map<String, ResourceRequest> asks : requests.values()) {
@@ -525,6 +537,9 @@ public class Application {
}
}
metrics.finishApp(this);
+
+ // Clear requests themselves
+ clearRequests();
}
public Map<Priority, Set<NodeInfo>> getAllReservations() {
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1129166&r1=1129165&r2=1129166&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Mon May 30 13:31:11 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@@ -107,9 +108,10 @@ implements ResourceScheduler, CapacitySc
private Resource minimumAllocation;
- private Map<ApplicationId, Application> applications =
+ private Map<ApplicationId, Application> applications =
+ Collections.synchronizedMap(
new TreeMap<ApplicationId, Application>(
- new BuilderUtils.ApplicationIdComparator());
+ new BuilderUtils.ApplicationIdComparator()));
private boolean initialized = false;
@@ -268,22 +270,26 @@ implements ResourceScheduler, CapacitySc
}
@Override
- public void addApplication(ApplicationId applicationId, ApplicationMaster master,
+ public synchronized void addApplication(
+ ApplicationId applicationId, ApplicationMaster master,
String user, String queueName, Priority priority, ApplicationStore appStore)
throws IOException {
+ // Sanity checks
Queue queue = queues.get(queueName);
-
if (queue == null) {
throw new IOException("Application " + applicationId +
" submitted by user " + user + " to unknown queue: " + queueName);
}
-
if (!(queue instanceof LeafQueue)) {
throw new IOException("Application " + applicationId +
" submitted by user " + user + " to non-leaf queue: " + queueName);
}
- Application application = new Application(applicationId, master, queue, user, appStore);
+ // Create the application
+ Application application =
+ new Application(applicationId, master, queue, user, appStore);
+
+ // Submit to the queue
try {
queue.submitApplication(application, user, queueName, priority);
} catch (AccessControlException ace) {
@@ -299,8 +305,12 @@ implements ResourceScheduler, CapacitySc
}
@Override
- public void doneApplication(ApplicationId applicationId, boolean finishApplication)
+ public synchronized void doneApplication(
+ ApplicationId applicationId, boolean finishApplication)
throws IOException {
+ LOG.info("Application " + applicationId + " is done." +
+ " finish=" + finishApplication);
+
Application application = getApplication(applicationId);
if (application == null) {
@@ -310,20 +320,17 @@ implements ResourceScheduler, CapacitySc
return;
}
- /*
- * release all the containes and make sure we clean up the pending
- * requests for this application
- */
+ // Release all the running containers
processReleasedContainers(application, application.getCurrentContainers());
- application.clearRequests();
- /*
- * release all reserved containers
- */
+ // Release all reserved containers
releaseReservedContainers(application);
+ // Clean up pending requests, metrics etc.
+ application.stop();
+
/** The application can be retried. So only remove it from scheduler data
- * structures if the finishapplication flag is set.
+ * structures if the finishApplication flag is set.
*/
if (finishApplication) {
Queue queue = queues.get(application.getQueue().getQueueName());
@@ -347,29 +354,42 @@ implements ResourceScheduler, CapacitySc
return new Allocation(EMPTY_CONTAINER_LIST, Resources.none());
}
+ // Sanity check
normalizeRequests(ask);
- LOG.info("DEBUG --- allocate: pre-update" +
- " applicationId=" + applicationId +
- " application=" + application);
- application.showRequests();
-
- // Update application requests
- application.updateResourceRequests(ask);
-
- // Release ununsed containers and update queue capacities
- processReleasedContainers(application, release);
-
- LOG.info("DEBUG --- allocate: post-update");
- application.showRequests();
-
- List<Container> allContainers = application.acquire();
- LOG.info("DEBUG --- allocate:" +
- " applicationId=" + applicationId +
- " #ask=" + ask.size() +
- " #release=" + release.size() +
- " #allContainers=" + allContainers.size());
- return new Allocation(allContainers, application.getResourceLimit());
+ List<Container> allocatedContainers = null;
+ Resource limit = null;
+ synchronized (application) {
+
+ LOG.info("DEBUG --- allocate: pre-update" +
+ " applicationId=" + applicationId +
+ " application=" + application);
+ application.showRequests();
+
+ // Update application requests
+ application.updateResourceRequests(ask);
+
+ // Release ununsed containers and update queue capacities
+ processReleasedContainers(application, release);
+
+ LOG.info("DEBUG --- allocate: post-update");
+ application.showRequests();
+
+ // Acquire containers
+ allocatedContainers = application.acquire();
+
+ // Resource limit
+ limit = application.getResourceLimit();
+ LOG.info("DEBUG --- allocate:" +
+ " applicationId=" + applicationId +
+ " #ask=" + ask.size() +
+ " #release=" + release.size() +
+ " #allocatedContainers=" + allocatedContainers.size() +
+ " limit=" + limit);
+
+ }
+
+ return new Allocation(allocatedContainers, limit);
}
@Override
@@ -408,8 +428,8 @@ implements ResourceScheduler, CapacitySc
}
private void normalizeRequest(ResourceRequest ask) {
- int memory = ask.getCapability().getMemory();
int minMemory = minimumAllocation.getMemory();
+ int memory = Math.max(ask.getCapability().getMemory(), minMemory);
ask.getCapability().setMemory (
minMemory * ((memory/minMemory) + (memory%minMemory > 0 ? 1 : 0)));
}
@@ -526,7 +546,7 @@ implements ResourceScheduler, CapacitySc
}
}
- private synchronized Application getApplication(ApplicationId applicationId) {
+ private Application getApplication(ApplicationId applicationId) {
return applications.get(applicationId);
}
@@ -542,7 +562,7 @@ implements ResourceScheduler, CapacitySc
try {
doneApplication(event.getAppContext().getApplicationID(), true);
} catch(IOException ie) {
- LOG.error("Error in removing application", ie);
+ LOG.error("Error in removing 'done' application", ie);
//TODO have to be shutdown the RM in case of this.
// do a graceful shutdown.
}
@@ -554,7 +574,7 @@ implements ResourceScheduler, CapacitySc
*/
doneApplication(event.getAppContext().getApplicationID(), false);
} catch(IOException ie) {
- LOG.error("Error in removing application", ie);
+ LOG.error("Error in removing 'expired' application", ie);
//TODO have to be shutdown the RM in case of this.
// do a graceful shutdown.
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1129166&r1=1129165&r2=1129166&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Mon May 30 13:31:11 2011
@@ -451,7 +451,6 @@ public class LeafQueue implements Queue
" user: " + user + "," + " leaf-queue: " + getQueueName() +
" #user-applications: " + user.getApplications() +
" #queue-applications: " + getNumApplications());
-
}
@Override
@@ -462,9 +461,6 @@ public class LeafQueue implements Queue
removeApplication(application, getUser(application.getUser()));
}
- // Clean up metrics etc.
- application.finish();
-
// Inform the parent queue
parent.finishApplication(application, queue);
}
@@ -505,10 +501,17 @@ public class LeafQueue implements Queue
// Try to assign containers to applications in fifo order
for (Application application : applications) {
+ if (!application.isSchedulable()) {
+ LOG.info("Application " + application.getApplicationId() +
+ " not schedulable. State = " + application.getState());
+ continue;
+ }
+
LOG.info("DEBUG --- pre-assignContainers");
application.showRequests();
synchronized (application) {
+ //if (application.)
for (Priority priority : application.getPriorities()) {
// Do we need containers at this 'priority'?
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1129166&r1=1129165&r2=1129166&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Mon May 30 13:31:11 2011
@@ -177,31 +177,41 @@ public class FifoScheduler implements Re
"or non existant application " + applicationId);
return new Allocation(EMPTY_CONTAINER_LIST, Resources.none());
}
+
+ // Sanity check
normalizeRequests(ask);
+
+ List<Container> allocatedContainers = null;
+ Resource limit = null;
+ synchronized (application) {
+
+ LOG.debug("allocate: pre-update" +
+ " applicationId=" + applicationId +
+ " application=" + application);
+ application.showRequests();
+
+ // Update application requests
+ application.updateResourceRequests(ask);
+
+ // Release containers
+ releaseContainers(application, release);
- LOG.debug("allocate: pre-update" +
- " applicationId=" + applicationId +
- " application=" + application);
- application.showRequests();
-
- // Update application requests
- application.updateResourceRequests(ask);
-
- // Release containers
- releaseContainers(application, release);
-
- LOG.debug("allocate: post-update" +
- " applicationId=" + applicationId +
- " application=" + application);
- application.showRequests();
-
- List<Container> allContainers = application.acquire();
- LOG.debug("allocate:" +
- " applicationId=" + applicationId +
- " #ask=" + ask.size() +
- " #release=" + release.size() +
- " #allContainers=" + allContainers.size());
- return new Allocation(allContainers, application.getResourceLimit());
+ LOG.debug("allocate: post-update" +
+ " applicationId=" + applicationId +
+ " application=" + application);
+ application.showRequests();
+
+ allocatedContainers = application.acquire();
+ limit = application.getResourceLimit();
+ LOG.debug("allocate:" +
+ " applicationId=" + applicationId +
+ " #ask=" + ask.size() +
+ " #release=" + release.size() +
+ " #allocatedContainers=" + allocatedContainers.size() +
+ " limit=" + limit);
+ }
+
+ return new Allocation(allocatedContainers, limit);
}
private void releaseContainers(Application application, List<Container> release) {
@@ -251,11 +261,11 @@ public class FifoScheduler implements Re
// Release current containers
releaseContainers(application, application.getCurrentContainers());
- application.clearRequests();
- // Update metrics
+
+ // Clean up pending requests, metrics etc.
+ application.stop();
+
if (finishApplication) {
- metrics.finishApp(application);
- application.finish();
// Let the cluster know that the applications are done
finishedApplication(applicationId,
application.getAllNodesForApplication());