You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to yarn-commits@hadoop.apache.org by bi...@apache.org on 2013/11/20 06:22:40 UTC
svn commit: r1543709 - in
/hadoop/common/branches/branch-2/hadoop-yarn-project: CHANGES.txt
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
Author: bikas
Date: Wed Nov 20 05:22:40 2013
New Revision: 1543709
URL: http://svn.apache.org/r1543709
Log:
Merge r1543707 from trunk to branch-2 for YARN-744. Race condition in ApplicationMasterService.allocate: it might process the same allocate request twice, resulting in additional containers getting allocated. (Omkar Vinit Joshi via bikas)
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1543709&r1=1543708&r2=1543709&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Wed Nov 20 05:22:40 2013
@@ -138,6 +138,10 @@ Release 2.3.0 - UNRELEASED
YARN-1419. TestFifoScheduler.testAppAttemptMetrics fails intermittently
under jdk7 (Jonathan Eagles via jlowe)
+ YARN-744. Race condition in ApplicationMasterService.allocate .. It might
+ process same allocate request twice resulting in additional containers
+ getting allocated. (Omkar Vinit Joshi via bikas)
+
Release 2.2.1 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1543709&r1=1543708&r2=1543709&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Wed Nov 20 05:22:40 2013
@@ -72,7 +72,6 @@ import org.apache.hadoop.yarn.ipc.YarnRP
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
@@ -81,7 +80,6 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -97,8 +95,8 @@ public class ApplicationMasterService ex
private Server server;
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
- private final ConcurrentMap<ApplicationAttemptId, AllocateResponse> responseMap =
- new ConcurrentHashMap<ApplicationAttemptId, AllocateResponse>();
+ private final ConcurrentMap<ApplicationAttemptId, AllocateResponseLock> responseMap =
+ new ConcurrentHashMap<ApplicationAttemptId, AllocateResponseLock>();
private final AllocateResponse resync =
recordFactory.newRecordInstance(AllocateResponse.class);
private final RMContext rmContext;
@@ -217,21 +215,19 @@ public class ApplicationMasterService ex
ApplicationAttemptId applicationAttemptId = authorizeRequest();
ApplicationId appID = applicationAttemptId.getApplicationId();
- AllocateResponse lastResponse = responseMap.get(applicationAttemptId);
- if (lastResponse == null) {
- String message = "Application doesn't exist in cache "
- + applicationAttemptId;
- LOG.error(message);
+ AllocateResponseLock lock = responseMap.get(applicationAttemptId);
+ if (lock == null) {
RMAuditLogger.logFailure(this.rmContext.getRMApps().get(appID).getUser(),
- AuditConstants.REGISTER_AM, message, "ApplicationMasterService",
+ AuditConstants.REGISTER_AM, "Application doesn't exist in cache "
+ + applicationAttemptId, "ApplicationMasterService",
"Error in registering application master", appID,
applicationAttemptId);
- throw RPCUtil.getRemoteException(message);
+ throwApplicationDoesNotExistInCacheException(applicationAttemptId);
}
// Allow only one thread in AM to do registerApp at a time.
- synchronized (lastResponse) {
-
+ synchronized (lock) {
+ AllocateResponse lastResponse = lock.getAllocateResponse();
if (hasApplicationMasterRegistered(applicationAttemptId)) {
String message =
"Application Master is already registered : "
@@ -251,7 +247,7 @@ public class ApplicationMasterService ex
// Setting the response id to 0 to identify if the
// application master is register for the respective attemptid
lastResponse.setResponseId(0);
- responseMap.put(applicationAttemptId, lastResponse);
+ lock.setAllocateResponse(lastResponse);
LOG.info("AM registration " + applicationAttemptId);
this.rmContext
.getDispatcher()
@@ -286,17 +282,14 @@ public class ApplicationMasterService ex
ApplicationAttemptId applicationAttemptId = authorizeRequest();
- AllocateResponse lastResponse = responseMap.get(applicationAttemptId);
- if (lastResponse == null) {
- String message = "Application doesn't exist in cache "
- + applicationAttemptId;
- LOG.error(message);
- throw RPCUtil.getRemoteException(message);
+ AllocateResponseLock lock = responseMap.get(applicationAttemptId);
+ if (lock == null) {
+ throwApplicationDoesNotExistInCacheException(applicationAttemptId);
}
// Allow only one thread in AM to do finishApp at a time.
- synchronized (lastResponse) {
-
+ synchronized (lock) {
+
this.amLivelinessMonitor.receivedPing(applicationAttemptId);
rmContext.getDispatcher().getEventHandler().handle(
@@ -313,6 +306,15 @@ public class ApplicationMasterService ex
}
}
+ private void throwApplicationDoesNotExistInCacheException(
+ ApplicationAttemptId appAttemptId)
+ throws InvalidApplicationMasterRequestException {
+ String message = "Application doesn't exist in cache "
+ + appAttemptId;
+ LOG.error(message);
+ throw new InvalidApplicationMasterRequestException(message);
+ }
+
/**
* @param appAttemptId
* @return true if application is registered for the respective attemptid
@@ -320,10 +322,11 @@ public class ApplicationMasterService ex
public boolean hasApplicationMasterRegistered(
ApplicationAttemptId appAttemptId) {
boolean hasApplicationMasterRegistered = false;
- AllocateResponse lastResponse = responseMap.get(appAttemptId);
+ AllocateResponseLock lastResponse = responseMap.get(appAttemptId);
if (lastResponse != null) {
synchronized (lastResponse) {
- if (lastResponse.getResponseId() >= 0) {
+ if (lastResponse.getAllocateResponse() != null
+ && lastResponse.getAllocateResponse().getResponseId() >= 0) {
hasApplicationMasterRegistered = true;
}
}
@@ -340,38 +343,38 @@ public class ApplicationMasterService ex
this.amLivelinessMonitor.receivedPing(appAttemptId);
/* check if its in cache */
- AllocateResponse lastResponse = responseMap.get(appAttemptId);
- if (lastResponse == null) {
+ AllocateResponseLock lock = responseMap.get(appAttemptId);
+ if (lock == null) {
LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId);
return resync;
}
-
- if (!hasApplicationMasterRegistered(appAttemptId)) {
- String message =
- "Application Master is trying to allocate before registering for: "
- + appAttemptId.getApplicationId();
- LOG.error(message);
- RMAuditLogger.logFailure(
- this.rmContext.getRMApps().get(appAttemptId.getApplicationId())
- .getUser(), AuditConstants.REGISTER_AM, "",
- "ApplicationMasterService", message, appAttemptId.getApplicationId(),
- appAttemptId);
- throw new InvalidApplicationMasterRequestException(message);
- }
-
- if ((request.getResponseId() + 1) == lastResponse.getResponseId()) {
- /* old heartbeat */
- return lastResponse;
- } else if (request.getResponseId() + 1 < lastResponse.getResponseId()) {
- LOG.error("Invalid responseid from appAttemptId " + appAttemptId);
- // Oh damn! Sending reboot isn't enough. RM state is corrupted. TODO:
- // Reboot is not useful since after AM reboots, it will send register and
- // get an exception. Might as well throw an exception here.
- return resync;
- }
-
- // Allow only one thread in AM to do heartbeat at a time.
- synchronized (lastResponse) {
+ synchronized (lock) {
+ AllocateResponse lastResponse = lock.getAllocateResponse();
+ if (!hasApplicationMasterRegistered(appAttemptId)) {
+ String message =
+ "Application Master is trying to allocate before registering for: "
+ + appAttemptId.getApplicationId();
+ LOG.error(message);
+ RMAuditLogger.logFailure(
+ this.rmContext.getRMApps().get(appAttemptId.getApplicationId())
+ .getUser(), AuditConstants.REGISTER_AM, "",
+ "ApplicationMasterService", message,
+ appAttemptId.getApplicationId(),
+ appAttemptId);
+ throw new InvalidApplicationMasterRequestException(message);
+ }
+
+ if ((request.getResponseId() + 1) == lastResponse.getResponseId()) {
+ /* old heartbeat */
+ return lastResponse;
+ } else if (request.getResponseId() + 1 < lastResponse.getResponseId()) {
+ LOG.error("Invalid responseid from appAttemptId " + appAttemptId);
+ // Oh damn! Sending reboot isn't enough. RM state is corrupted. TODO:
+ // Reboot is not useful since after AM reboots, it will send register
+ // and
+ // get an exception. Might as well throw an exception here.
+ return resync;
+ }
// Send the status update to the appAttempt.
this.rmContext.getDispatcher().getEventHandler().handle(
@@ -380,15 +383,16 @@ public class ApplicationMasterService ex
List<ResourceRequest> ask = request.getAskList();
List<ContainerId> release = request.getReleaseList();
-
- ResourceBlacklistRequest blacklistRequest = request.getResourceBlacklistRequest();
- List<String> blacklistAdditions =
- (blacklistRequest != null) ?
+
+ ResourceBlacklistRequest blacklistRequest =
+ request.getResourceBlacklistRequest();
+ List<String> blacklistAdditions =
+ (blacklistRequest != null) ?
blacklistRequest.getBlacklistAdditions() : null;
- List<String> blacklistRemovals =
- (blacklistRequest != null) ?
+ List<String> blacklistRemovals =
+ (blacklistRequest != null) ?
blacklistRequest.getBlacklistRemovals() : null;
-
+
// sanity check
try {
RMServerUtils.validateResourceRequests(ask,
@@ -443,7 +447,7 @@ public class ApplicationMasterService ex
rmNode.getTotalCapability(), numContainers,
rmNode.getHealthReport(),
rmNode.getLastHealthReportTime());
-
+
updatedNodeReports.add(report);
}
allocateResponse.setUpdatedNodes(updatedNodeReports);
@@ -454,11 +458,12 @@ public class ApplicationMasterService ex
.pullJustFinishedContainers());
allocateResponse.setResponseId(lastResponse.getResponseId() + 1);
allocateResponse.setAvailableResources(allocation.getResourceLimit());
-
+
allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
-
+
// add preemption to the allocateResponse message (if any)
- allocateResponse.setPreemptionMessage(generatePreemptionMessage(allocation));
+ allocateResponse
+ .setPreemptionMessage(generatePreemptionMessage(allocation));
// Adding NMTokens for allocated containers.
if (!allocation.getContainers().isEmpty()) {
@@ -466,21 +471,14 @@ public class ApplicationMasterService ex
.createAndGetNMTokens(app.getUser(), appAttemptId,
allocation.getContainers()));
}
-
- // before returning response, verify in sync
- AllocateResponse oldResponse =
- responseMap.put(appAttemptId, allocateResponse);
- if (oldResponse == null) {
- // appAttempt got unregistered, remove it back out
- responseMap.remove(appAttemptId);
- String message = "App Attempt removed from the cache during allocate"
- + appAttemptId;
- LOG.error(message);
- return resync;
- }
-
+ /*
+ * As we are updating the response inside the lock object so we don't
+ * need to worry about unregister call occurring in between (which
+ * removes the lock object).
+ */
+ lock.setAllocateResponse(allocateResponse);
return allocateResponse;
- }
+ }
}
private PreemptionMessage generatePreemptionMessage(Allocation allocation){
@@ -542,7 +540,7 @@ public class ApplicationMasterService ex
// attemptID get registered
response.setResponseId(-1);
LOG.info("Registering app attempt : " + attemptId);
- responseMap.put(attemptId, response);
+ responseMap.put(attemptId, new AllocateResponseLock(response));
rmContext.getNMTokenSecretManager().registerApplicationAttempt(attemptId);
}
@@ -564,4 +562,20 @@ public class ApplicationMasterService ex
}
super.serviceStop();
}
-}
+
+ public static class AllocateResponseLock {
+ private AllocateResponse response;
+
+ public AllocateResponseLock(AllocateResponse response) {
+ this.response = response;
+ }
+
+ public synchronized AllocateResponse getAllocateResponse() {
+ return response;
+ }
+
+ public synchronized void setAllocateResponse(AllocateResponse response) {
+ this.response = response;
+ }
+ }
+}
\ No newline at end of file