You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by sz...@apache.org on 2013/11/09 02:34:24 UTC
svn commit: r1540239 - in
/hadoop/common/branches/HDFS-2832/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/ha...
Author: szetszwo
Date: Sat Nov 9 01:34:22 2013
New Revision: 1540239
URL: http://svn.apache.org/r1540239
Log:
merge of r1535792 through r1540238 from trunk.
Modified:
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt?rev=1540239&r1=1540238&r2=1540239&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt Sat Nov 9 01:34:22 2013
@@ -88,6 +88,9 @@ Release 2.3.0 - UNRELEASED
YARN-1323. Set HTTPS webapp address along with other RPC addresses in HAUtil
(Karthik Kambatla via Sandy Ryza)
+ YARN-1121. Changed ResourceManager's state-store to drain all events on
+ shut-down. (Jian He via vinodkv)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java?rev=1540239&r1=1540238&r2=1540239&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java Sat Nov 9 01:34:22 2013
@@ -49,6 +49,19 @@ public class AsyncDispatcher extends Abs
private final BlockingQueue<Event> eventQueue;
private volatile boolean stopped = false;
+ // Configuration flag for enabling/disabling draining dispatcher's events on
+ // stop functionality.
+ private volatile boolean drainEventsOnStop = false;
+
+ // Indicates all the remaining dispatcher's events on stop have been drained
+ // and processed.
+ private volatile boolean drained = true;
+
+ // For drainEventsOnStop enabled only, block newly coming events into the
+ // queue while stopping.
+ private volatile boolean blockNewEvents = false;
+ private EventHandler handlerInstance = null;
+
private Thread eventHandlingThread;
protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
private boolean exitOnDispatchException;
@@ -68,6 +81,7 @@ public class AsyncDispatcher extends Abs
@Override
public void run() {
while (!stopped && !Thread.currentThread().isInterrupted()) {
+ drained = eventQueue.isEmpty();
Event event;
try {
event = eventQueue.take();
@@ -102,8 +116,19 @@ public class AsyncDispatcher extends Abs
eventHandlingThread.start();
}
+ public void setDrainEventsOnStop() {
+ drainEventsOnStop = true;
+ }
+
@Override
protected void serviceStop() throws Exception {
+ if (drainEventsOnStop) {
+ blockNewEvents = true;
+ LOG.info("AsyncDispatcher is draining to stop, igonring any new events.");
+ while(!drained) {
+ Thread.yield();
+ }
+ }
stopped = true;
if (eventHandlingThread != null) {
eventHandlingThread.interrupt();
@@ -173,11 +198,19 @@ public class AsyncDispatcher extends Abs
@Override
public EventHandler getEventHandler() {
- return new GenericEventHandler();
+ if (handlerInstance == null) {
+ handlerInstance = new GenericEventHandler();
+ }
+ return handlerInstance;
}
class GenericEventHandler implements EventHandler<Event> {
public void handle(Event event) {
+ if (blockNewEvents) {
+ return;
+ }
+ drained = false;
+
/* all this method does is enqueue all the events onto the queue */
int qSize = eventQueue.size();
if (qSize !=0 && qSize %1000 == 0) {
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java?rev=1540239&r1=1540238&r2=1540239&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java Sat Nov 9 01:34:22 2013
@@ -261,17 +261,20 @@ public abstract class RMStateStore exten
}
AsyncDispatcher dispatcher;
-
- public synchronized void serviceInit(Configuration conf) throws Exception{
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception{
// create async handler
dispatcher = new AsyncDispatcher();
dispatcher.init(conf);
dispatcher.register(RMStateStoreEventType.class,
new ForwardingEventHandler());
+ dispatcher.setDrainEventsOnStop();
initInternal(conf);
}
-
- protected synchronized void serviceStart() throws Exception {
+
+ @Override
+ protected void serviceStart() throws Exception {
dispatcher.start();
startInternal();
}
@@ -288,11 +291,12 @@ public abstract class RMStateStore exten
*/
protected abstract void startInternal() throws Exception;
- public synchronized void serviceStop() throws Exception {
+ @Override
+ protected void serviceStop() throws Exception {
closeInternal();
dispatcher.stop();
}
-
+
/**
* Derived classes close themselves using this method.
* The base class will be closed and the event dispatcher will be shutdown
@@ -509,8 +513,7 @@ public abstract class RMStateStore exten
}
// Dispatcher related code
-
- private synchronized void handleStoreEvent(RMStateStoreEvent event) {
+ protected void handleStoreEvent(RMStateStoreEvent event) {
if (event.getType().equals(RMStateStoreEventType.STORE_APP)
|| event.getType().equals(RMStateStoreEventType.UPDATE_APP)) {
ApplicationState appState = null;
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1540239&r1=1540238&r2=1540239&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java Sat Nov 9 01:34:22 2013
@@ -163,6 +163,14 @@ public class MockRM extends ResourceMana
public RMApp submitApp(int masterMemory, String name, String user,
Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
int maxAppAttempts, Credentials ts, String appType) throws Exception {
+ return submitApp(masterMemory, name, user, acls, unmanaged, queue,
+ maxAppAttempts, ts, appType, true);
+ }
+
+ public RMApp submitApp(int masterMemory, String name, String user,
+ Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
+ int maxAppAttempts, Credentials ts, String appType,
+ boolean waitForAccepted) throws Exception {
ApplicationClientProtocol client = getClientRMService();
GetNewApplicationResponse resp = client.getNewApplication(Records
.newRecord(GetNewApplicationRequest.class));
@@ -222,7 +230,9 @@ public class MockRM extends ResourceMana
}.setClientReq(client, req);
fakeUser.doAs(action);
// make sure app is immediately available after submit
- waitForState(appId, RMAppState.ACCEPTED);
+ if (waitForAccepted) {
+ waitForState(appId, RMAppState.ACCEPTED);
+ }
return getRMContext().getRMApps().get(appId);
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1540239&r1=1540238&r2=1540239&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Sat Nov 9 01:34:22 2013
@@ -70,6 +70,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -1062,6 +1063,65 @@ public class TestRMRestart {
rm2.stop();
}
+ @Test
+ public void testRMStateStoreDispatcherDrainedOnRMStop() throws Exception {
+ MemoryRMStateStore memStore = new MemoryRMStateStore() {
+ volatile boolean wait = true;
+ @Override
+ public void serviceStop() throws Exception {
+ // Unblock app saving request.
+ wait = false;
+ super.serviceStop();
+ }
+
+ @Override
+ protected void handleStoreEvent(RMStateStoreEvent event) {
+ // Block app saving request.
+ while (wait);
+ super.handleStoreEvent(event);
+ }
+ };
+ memStore.init(conf);
+
+ // start RM
+ final MockRM rm1 = new MockRM(conf, memStore);
+ rm1.start();
+
+ // create apps.
+ final ArrayList<RMApp> appList = new ArrayList<RMApp>();
+ final int NUM_APPS = 5;
+
+ for (int i = 0; i < NUM_APPS; i++) {
+ RMApp app = rm1.submitApp(200, "name", "user",
+ new HashMap<ApplicationAccessType, String>(), false,
+ "default", -1, null, "MAPREDUCE", false);
+ appList.add(app);
+ rm1.waitForState(app.getApplicationId(), RMAppState.NEW_SAVING);
+ }
+ // all apps's saving request are now enqueued to RMStateStore's dispatcher
+ // queue, and will be processed once rm.stop() is called.
+
+ // Nothing exist in state store before stop is called.
+ Map<ApplicationId, ApplicationState> rmAppState =
+ memStore.getState().getApplicationState();
+ Assert.assertTrue(rmAppState.size() == 0);
+
+ // stop rm
+ rm1.stop();
+
+ // Assert app info is still saved even if stop is called with pending saving
+ // request on dispatcher.
+ for (RMApp app : appList) {
+ ApplicationState appState = rmAppState.get(app.getApplicationId());
+ Assert.assertNotNull(appState);
+ Assert.assertEquals(0, appState.getAttemptCount());
+ Assert.assertEquals(appState.getApplicationSubmissionContext()
+ .getApplicationId(), app.getApplicationSubmissionContext()
+ .getApplicationId());
+ }
+ Assert.assertTrue(rmAppState.size() == NUM_APPS);
+ }
+
public static class TestSecurityMockRM extends MockRM {
public TestSecurityMockRM(Configuration conf, RMStateStore store) {