You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by ar...@apache.org on 2013/12/09 18:38:25 UTC
svn commit: r1549626 [2/2] - in
/hadoop/common/branches/HDFS-2832/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/
hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/
hadoop...
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java?rev=1549626&r1=1549625&r2=1549626&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java Mon Dec 9 17:38:20 2013
@@ -19,8 +19,12 @@
package org.apache.hadoop.yarn.server.resourcemanager;
+import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.times;
import java.util.HashMap;
import java.util.List;
@@ -43,6 +47,7 @@ import org.apache.hadoop.yarn.event.Even
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@@ -99,7 +104,7 @@ public class TestAppManager{
rmDispatcher);
AMLivelinessMonitor amFinishingMonitor = new AMLivelinessMonitor(
rmDispatcher);
- return new RMContextImpl(rmDispatcher,
+ RMContext context = new RMContextImpl(rmDispatcher,
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
null, null, null, null, null) {
@Override
@@ -107,6 +112,8 @@ public class TestAppManager{
return map;
}
};
+ ((RMContextImpl)context).setStateStore(mock(RMStateStore.class));
+ return context;
}
public class TestAppManagerDispatcher implements
@@ -142,7 +149,6 @@ public class TestAppManager{
public TestRMAppManager(RMContext context, Configuration conf) {
super(context, null, null, new ApplicationACLsManager(conf), conf);
- setCompletedAppsMax(YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS);
}
public TestRMAppManager(RMContext context,
@@ -150,7 +156,6 @@ public class TestAppManager{
YarnScheduler scheduler, ApplicationMasterService masterService,
ApplicationACLsManager applicationACLsManager, Configuration conf) {
super(context, scheduler, masterService, applicationACLsManager, conf);
- setCompletedAppsMax(YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS);
}
public void checkAppNumCompletedLimit() {
@@ -164,9 +169,8 @@ public class TestAppManager{
public int getCompletedAppsListSize() {
return super.getCompletedAppsListSize();
}
-
- public void setCompletedAppsMax(int max) {
- super.setCompletedAppsMax(max);
+ public int getCompletedAppsInStateStore() {
+ return this.completedAppsInStateStore;
}
public void submitApplication(
ApplicationSubmissionContext submissionContext, String user)
@@ -227,9 +231,9 @@ public class TestAppManager{
// Create such that none of the applications will retire since
// haven't hit max #
RMContext rmContext = mockRMContext(10, now - 10);
- TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration());
-
- appMonitor.setCompletedAppsMax(10);
+ Configuration conf = new YarnConfiguration();
+ conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 10);
+ TestRMAppManager appMonitor = new TestRMAppManager(rmContext,conf);
Assert.assertEquals("Number of apps incorrect before checkAppTimeLimit",
10, rmContext.getRMApps().size());
@@ -243,6 +247,8 @@ public class TestAppManager{
rmContext.getRMApps().size());
Assert.assertEquals("Number of completed apps incorrect after check", 10,
appMonitor.getCompletedAppsListSize());
+ verify(rmContext.getStateStore(), never()).removeApplication(
+ isA(RMApp.class));
}
@Test
@@ -250,9 +256,10 @@ public class TestAppManager{
long now = System.currentTimeMillis();
RMContext rmContext = mockRMContext(10, now - 20000);
- TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration());
-
- appMonitor.setCompletedAppsMax(3);
+ Configuration conf = new YarnConfiguration();
+ conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS, 3);
+ conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 3);
+ TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf);
Assert.assertEquals("Number of apps incorrect before", 10, rmContext
.getRMApps().size());
@@ -266,6 +273,8 @@ public class TestAppManager{
rmContext.getRMApps().size());
Assert.assertEquals("Number of completed apps incorrect after check", 3,
appMonitor.getCompletedAppsListSize());
+ verify(rmContext.getStateStore(), times(7)).removeApplication(
+ isA(RMApp.class));
}
@Test
@@ -274,14 +283,17 @@ public class TestAppManager{
// these parameters don't matter, override applications below
RMContext rmContext = mockRMContext(10, now - 20000);
- TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration());
+ Configuration conf = new YarnConfiguration();
+ conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS, 2);
+ conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 2);
- appMonitor.setCompletedAppsMax(2);
+ TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf);
// clear out applications map
rmContext.getRMApps().clear();
Assert.assertEquals("map isn't empty", 0, rmContext.getRMApps().size());
+ // 6 applications are in final state, 4 are not in final state.
// / set with various finished states
RMApp app = new MockRMApp(0, now - 20000, RMAppState.KILLED);
rmContext.getRMApps().put(app.getApplicationId(), app);
@@ -318,7 +330,9 @@ public class TestAppManager{
rmContext.getRMApps().size());
Assert.assertEquals("Number of completed apps incorrect after check", 2,
appMonitor.getCompletedAppsListSize());
-
+ // 6 applications in final state, 4 of them are removed
+ verify(rmContext.getStateStore(), times(4)).removeApplication(
+ isA(RMApp.class));
}
@Test
@@ -342,14 +356,13 @@ public class TestAppManager{
long now = System.currentTimeMillis();
RMContext rmContext = mockRMContext(10, now - 20000);
- TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration());
-
+ Configuration conf = new YarnConfiguration();
+ conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS, 0);
+ conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 0);
+ TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf);
Assert.assertEquals("Number of apps incorrect before", 10, rmContext
.getRMApps().size());
- // test with 0
- appMonitor.setCompletedAppsMax(0);
-
addToCompletedApps(appMonitor, rmContext);
Assert.assertEquals("Number of completed apps incorrect", 10,
appMonitor.getCompletedAppsListSize());
@@ -360,6 +373,64 @@ public class TestAppManager{
rmContext.getRMApps().size());
Assert.assertEquals("Number of completed apps incorrect after check", 0,
appMonitor.getCompletedAppsListSize());
+ verify(rmContext.getStateStore(), times(10)).removeApplication(
+ isA(RMApp.class));
+ }
+
+ @Test
+ public void testStateStoreAppLimitLessThanMemoryAppLimit() {
+ long now = System.currentTimeMillis();
+ RMContext rmContext = mockRMContext(10, now - 20000);
+ Configuration conf = new YarnConfiguration();
+ int maxAppsInMemory = 8;
+ int maxAppsInStateStore = 4;
+ conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, maxAppsInMemory);
+ conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS,
+ maxAppsInStateStore);
+ TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf);
+
+ addToCompletedApps(appMonitor, rmContext);
+ Assert.assertEquals("Number of completed apps incorrect", 10,
+ appMonitor.getCompletedAppsListSize());
+ appMonitor.checkAppNumCompletedLimit();
+
+ Assert.assertEquals("Number of apps incorrect after # completed check",
+ maxAppsInMemory, rmContext.getRMApps().size());
+ Assert.assertEquals("Number of completed apps incorrect after check",
+ maxAppsInMemory, appMonitor.getCompletedAppsListSize());
+
+ int numRemoveAppsFromStateStore = 10 - maxAppsInStateStore;
+ verify(rmContext.getStateStore(), times(numRemoveAppsFromStateStore))
+ .removeApplication(isA(RMApp.class));
+ Assert.assertEquals(maxAppsInStateStore,
+ appMonitor.getCompletedAppsInStateStore());
+ }
+
+ @Test
+ public void testStateStoreAppLimitLargerThanMemoryAppLimit() {
+ long now = System.currentTimeMillis();
+ RMContext rmContext = mockRMContext(10, now - 20000);
+ Configuration conf = new YarnConfiguration();
+ int maxAppsInMemory = 8;
+ conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, maxAppsInMemory);
+ // larger than maxCompletedAppsInMemory, reset to RM_MAX_COMPLETED_APPLICATIONS.
+ conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS, 1000);
+ TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf);
+
+ addToCompletedApps(appMonitor, rmContext);
+ Assert.assertEquals("Number of completed apps incorrect", 10,
+ appMonitor.getCompletedAppsListSize());
+ appMonitor.checkAppNumCompletedLimit();
+
+ int numRemoveApps = 10 - maxAppsInMemory;
+ Assert.assertEquals("Number of apps incorrect after # completed check",
+ maxAppsInMemory, rmContext.getRMApps().size());
+ Assert.assertEquals("Number of completed apps incorrect after check",
+ maxAppsInMemory, appMonitor.getCompletedAppsListSize());
+ verify(rmContext.getStateStore(), times(numRemoveApps)).removeApplication(
+ isA(RMApp.class));
+ Assert.assertEquals(maxAppsInMemory,
+ appMonitor.getCompletedAppsInStateStore());
}
protected void setupDispatcher(RMContext rmContext, Configuration conf) {
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java?rev=1549626&r1=1549625&r2=1549626&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java Mon Dec 9 17:38:20 2013
@@ -30,9 +30,12 @@ import static org.mockito.Mockito.spy;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashSet;
import java.util.EnumSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
@@ -106,6 +109,9 @@ public class TestClientRMService {
private static RMDelegationTokenSecretManager dtsm;
+ private final static String QUEUE_1 = "Q-1";
+ private final static String QUEUE_2 = "Q-2";
+
@BeforeClass
public static void setupSecretManager() throws IOException {
RMContext rmContext = mock(RMContext.class);
@@ -438,7 +444,7 @@ public class TestClientRMService {
mockAclsManager, mockQueueACLsManager, null);
// Initialize appnames and queues
- String[] queues = {"Q-1", "Q-2"};
+ String[] queues = {QUEUE_1, QUEUE_2};
String[] appNames =
{MockApps.newAppName(), MockApps.newAppName(), MockApps.newAppName()};
ApplicationId[] appIds =
@@ -596,6 +602,8 @@ public class TestClientRMService {
ConcurrentHashMap<ApplicationId, RMApp> apps = getRMApps(rmContext,
yarnScheduler);
when(rmContext.getRMApps()).thenReturn(apps);
+ when(yarnScheduler.getAppsInQueue(eq("testqueue"))).thenReturn(
+ getSchedulerApps(apps));
}
private ConcurrentHashMap<ApplicationId, RMApp> getRMApps(
@@ -614,10 +622,23 @@ public class TestClientRMService {
config, "testqueue"));
return apps;
}
+
+ private List<ApplicationAttemptId> getSchedulerApps(
+ Map<ApplicationId, RMApp> apps) {
+ List<ApplicationAttemptId> schedApps = new ArrayList<ApplicationAttemptId>();
+ // Return app IDs for the apps in testqueue (as defined in getRMApps)
+ schedApps.add(ApplicationAttemptId.newInstance(getApplicationId(1), 0));
+ schedApps.add(ApplicationAttemptId.newInstance(getApplicationId(3), 0));
+ return schedApps;
+ }
- private ApplicationId getApplicationId(int id) {
+ private static ApplicationId getApplicationId(int id) {
return ApplicationId.newInstance(123456, id);
}
+
+ private static ApplicationAttemptId getApplicationAttemptId(int id) {
+ return ApplicationAttemptId.newInstance(getApplicationId(id), 1);
+ }
private RMAppImpl getRMApp(RMContext rmContext, YarnScheduler yarnScheduler,
ApplicationId applicationId3, YarnConfiguration config, String queueName) {
@@ -641,6 +662,10 @@ public class TestClientRMService {
when(yarnScheduler.getMaximumResourceCapability()).thenReturn(
Resources.createResource(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
+ when(yarnScheduler.getAppsInQueue(QUEUE_1)).thenReturn(
+ Arrays.asList(getApplicationAttemptId(101), getApplicationAttemptId(102)));
+ when(yarnScheduler.getAppsInQueue(QUEUE_2)).thenReturn(
+ Arrays.asList(getApplicationAttemptId(103)));
return yarnScheduler;
}
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1549626&r1=1549625&r2=1549626&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Mon Dec 9 17:38:20 2013
@@ -34,7 +34,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -46,7 +45,7 @@ import org.apache.hadoop.security.Securi
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.DelegationKey;
-import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
@@ -68,9 +67,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@@ -80,6 +76,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@@ -93,7 +90,6 @@ import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import org.mortbay.log.Log;
public class TestRMRestart {
@@ -106,7 +102,6 @@ public class TestRMRestart {
public void setup() throws UnknownHostException {
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
- ExitUtil.disableSystemExit();
conf = new YarnConfiguration();
UserGroupInformation.setConfiguration(conf);
conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
@@ -423,6 +418,8 @@ public class TestRMRestart {
rm2.getRMContext().getRMApps().get(app0.getApplicationId());
Assert.assertEquals(RMAppAttemptState.FAILED, recoveredApp
.getAppAttempts().get(am0.getApplicationAttemptId()).getAppAttemptState());
+ rm1.stop();
+ rm2.stop();
}
@Test
@@ -629,6 +626,8 @@ public class TestRMRestart {
.contains("Failing the application."));
// failed diagnostics from attempt is lost because the diagnostics from
// attempt is not yet available by the time app is saving the app state.
+ rm1.stop();
+ rm2.stop();
}
@Test
@@ -675,6 +674,48 @@ public class TestRMRestart {
ApplicationReport appReport = verifyAppReportAfterRMRestart(app0, rm2);
Assert.assertEquals(app0.getDiagnostics().toString(),
appReport.getDiagnostics());
+ rm1.stop();
+ rm2.stop();
+ }
+
+ @Test
+ public void testRMRestartKilledAppWithNoAttempts() throws Exception {
+ MemoryRMStateStore memStore = new MemoryRMStateStore() {
+ @Override
+ public synchronized void storeApplicationAttemptStateInternal(
+ String attemptIdStr,
+ ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
+ // ignore attempt saving request.
+ }
+
+ @Override
+ public synchronized void updateApplicationAttemptStateInternal(
+ String attemptIdStr,
+ ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
+ // ignore attempt saving request.
+ }
+ };
+ memStore.init(conf);
+
+ // start RM
+ MockRM rm1 = new MockRM(conf, memStore);
+ rm1.start();
+ // create app
+ RMApp app0 =
+ rm1.submitApp(200, "name", "user",
+ new HashMap<ApplicationAccessType, String>(), false, "default", -1,
+ null, "MAPREDUCE", false);
+ // kill the app.
+ rm1.killApp(app0.getApplicationId());
+ rm1.waitForState(app0.getApplicationId(), RMAppState.KILLED);
+
+ // restart rm
+ MockRM rm2 = new MockRM(conf, memStore);
+ rm2.start();
+ RMApp loadedApp0 =
+ rm2.getRMContext().getRMApps().get(app0.getApplicationId());
+ rm2.waitForState(loadedApp0.getApplicationId(), RMAppState.KILLED);
+ Assert.assertTrue(loadedApp0.getAppAttempts().size() == 0);
}
@Test
@@ -724,6 +765,9 @@ public class TestRMRestart {
Assert.assertEquals(FinalApplicationStatus.SUCCEEDED,
appReport.getFinalApplicationStatus());
Assert.assertEquals("trackingUrl", appReport.getOriginalTrackingUrl());
+
+ rm1.stop();
+ rm2.stop();
}
@Test
@@ -817,6 +861,9 @@ public class TestRMRestart {
// check application summary is logged for the completed apps after RM restart.
verify(rm2.getRMAppManager(), times(3)).logApplicationSummary(
isA(ApplicationId.class));
+
+ rm1.stop();
+ rm2.stop();
}
private MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
@@ -1378,6 +1425,75 @@ public class TestRMRestart {
Assert.assertTrue(rmAppState.size() == NUM_APPS);
}
+ @Test
+ public void testFinishedAppRemovalAfterRMRestart() throws Exception {
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 1);
+ memStore.init(conf);
+ RMState rmState = memStore.getState();
+
+ // start RM
+ MockRM rm1 = new MockRM(conf, memStore);
+ rm1.start();
+ MockNM nm1 =
+ new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+ nm1.registerNode();
+
+ // create an app and finish the app.
+ RMApp app0 = rm1.submitApp(200);
+ MockAM am0 = launchAM(app0, rm1, nm1);
+ finishApplicationMaster(app0, rm1, nm1, am0);
+
+ MockRM rm2 = new MockRM(conf, memStore);
+ rm2.start();
+ nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+ nm1 = rm2.registerNode("127.0.0.1:1234", 15120);
+
+ Map<ApplicationId, ApplicationState> rmAppState =
+ rmState.getApplicationState();
+
+ // app0 exits in both state store and rmContext
+ Assert.assertEquals(RMAppState.FINISHED,
+ rmAppState.get(app0.getApplicationId()).getState());
+ rm2.waitForState(app0.getApplicationId(), RMAppState.FINISHED);
+
+ // create one more app and finish the app.
+ RMApp app1 = rm2.submitApp(200);
+ MockAM am1 = launchAM(app1, rm2, nm1);
+ finishApplicationMaster(app1, rm2, nm1, am1);
+
+ // the first app0 get kicked out from both rmContext and state store
+ Assert.assertNull(rm2.getRMContext().getRMApps()
+ .get(app0.getApplicationId()));
+ Assert.assertNull(rmAppState.get(app0.getApplicationId()));
+
+ rm1.stop();
+ rm2.stop();
+ }
+
+ // This is to test RM does not get hang on shutdown.
+ @Test (timeout = 10000)
+ public void testRMShutdown() throws Exception {
+ MemoryRMStateStore memStore = new MemoryRMStateStore() {
+ @Override
+ public synchronized void checkVersion()
+ throws Exception {
+ throw new Exception("Invalid version.");
+ }
+ };
+ // start RM
+ memStore.init(conf);
+ MockRM rm1 = null;
+ try {
+ rm1 = new MockRM(conf, memStore);
+ rm1.start();
+ Assert.fail();
+ } catch (Exception e) {
+ Assert.assertTrue(e.getMessage().contains("Invalid version."));
+ }
+ Assert.assertTrue(rm1.getServiceState() == STATE.STOPPED);
+ }
+
public static class TestSecurityMockRM extends MockRM {
public TestSecurityMockRM(Configuration conf, RMStateStore store) {
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java?rev=1549626&r1=1549625&r2=1549626&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java Mon Dec 9 17:38:20 2013
@@ -26,6 +26,7 @@ import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -109,6 +110,7 @@ public class RMStateStoreTestBase extend
boolean isFinalStateValid() throws Exception;
void writeVersion(RMStateVersion version) throws Exception;
RMStateVersion getCurrentVersion() throws Exception;
+ boolean appExists(RMApp app) throws Exception;
}
void waitNotify(TestDispatcher dispatcher) {
@@ -128,7 +130,7 @@ public class RMStateStoreTestBase extend
dispatcher.notified = false;
}
- void storeApp(RMStateStore store, ApplicationId appId, long submitTime,
+ RMApp storeApp(RMStateStore store, ApplicationId appId, long submitTime,
long startTime) throws Exception {
ApplicationSubmissionContext context =
new ApplicationSubmissionContextPBImpl();
@@ -141,6 +143,7 @@ public class RMStateStoreTestBase extend
when(mockApp.getApplicationSubmissionContext()).thenReturn(context);
when(mockApp.getUser()).thenReturn("test");
store.storeNewApplication(mockApp);
+ return mockApp;
}
ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId,
@@ -370,6 +373,7 @@ public class RMStateStoreTestBase extend
Assert.assertEquals(keySet, secretManagerState.getMasterKeyState());
Assert.assertEquals(sequenceNumber,
secretManagerState.getDTSequenceNumber());
+ store.close();
}
private Token<AMRMTokenIdentifier> generateAMRMToken(
@@ -415,4 +419,43 @@ public class RMStateStoreTestBase extend
Assert.assertTrue(t instanceof RMStateVersionIncompatibleException);
}
}
+
+ public void testAppDeletion(RMStateStoreHelper stateStoreHelper)
+ throws Exception {
+ RMStateStore store = stateStoreHelper.getRMStateStore();
+ store.setRMDispatcher(new TestDispatcher());
+ // create and store apps
+ ArrayList<RMApp> appList = new ArrayList<RMApp>();
+ int NUM_APPS = 5;
+ for (int i = 0; i < NUM_APPS; i++) {
+ ApplicationId appId = ApplicationId.newInstance(1383183338, i);
+ RMApp app = storeApp(store, appId, 123456789, 987654321);
+ appList.add(app);
+ }
+
+ Assert.assertEquals(NUM_APPS, appList.size());
+ for (RMApp app : appList) {
+ // wait for app to be stored.
+ while (true) {
+ if (stateStoreHelper.appExists(app)) {
+ break;
+ } else {
+ Thread.sleep(100);
+ }
+ }
+ }
+
+ for (RMApp app : appList) {
+ // remove the app
+ store.removeApplication(app);
+ // wait for app to be removed.
+ while (true) {
+ if (!stateStoreHelper.appExists(app)) {
+ break;
+ } else {
+ Thread.sleep(100);
+ }
+ }
+ }
+ }
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java?rev=1549626&r1=1549625&r2=1549626&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java Mon Dec 9 17:38:20 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re
import static org.junit.Assert.assertTrue;
+import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import junit.framework.Assert;
@@ -38,6 +39,7 @@ import org.apache.hadoop.yarn.conf.YarnC
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.Test;
@@ -69,6 +71,13 @@ public class TestFSRMStateStore extends
public RMStateVersion getCurrentVersion() {
return CURRENT_VERSION_INFO;
}
+
+ public Path getAppDir(String appId) {
+ Path rootDir = new Path(workingDirPathURI, ROOT_DIR_NAME);
+ Path appRootDir = new Path(rootDir, RM_APP_ROOT);
+ Path appDir = new Path(appRootDir, appId);
+ return appDir;
+ }
}
public TestFSRMStateStoreTester(MiniDFSCluster cluster) throws Exception {
@@ -109,9 +118,16 @@ public class TestFSRMStateStore extends
public RMStateVersion getCurrentVersion() throws Exception {
return store.getCurrentVersion();
}
+
+ public boolean appExists(RMApp app) throws IOException {
+ FileSystem fs = cluster.getFileSystem();
+ Path nodePath =
+ store.getAppDir(app.getApplicationId().toString());
+ return fs.exists(nodePath);
+ }
}
- @Test
+ @Test(timeout = 60000)
public void testFSRMStateStore() throws Exception {
HdfsConfiguration conf = new HdfsConfiguration();
MiniDFSCluster cluster =
@@ -126,11 +142,8 @@ public class TestFSRMStateStore extends
String appAttemptIdStr3 = "appattempt_1352994193343_0001_000003";
ApplicationAttemptId attemptId3 =
ConverterUtils.toApplicationAttemptId(appAttemptIdStr3);
- Path rootDir =
- new Path(fileSystemRMStateStore.fsWorkingPath, "FSRMStateRoot");
- Path appRootDir = new Path(rootDir, "RMAppRoot");
Path appDir =
- new Path(appRootDir, attemptId3.getApplicationId().toString());
+ fsTester.store.getAppDir(attemptId3.getApplicationId().toString());
Path tempAppAttemptFile =
new Path(appDir, attemptId3.toString() + ".tmp");
fsOut = fileSystemRMStateStore.fs.create(tempAppAttemptFile, false);
@@ -138,10 +151,11 @@ public class TestFSRMStateStore extends
fsOut.close();
testRMAppStateStore(fsTester);
- Assert.assertFalse(fileSystemRMStateStore.fsWorkingPath
+ Assert.assertFalse(fsTester.workingDirPathURI
.getFileSystem(conf).exists(tempAppAttemptFile));
testRMDTSecretManagerStateStore(fsTester);
testCheckVersion(fsTester);
+ testAppDeletion(fsTester);
} finally {
cluster.shutdown();
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java?rev=1549626&r1=1549625&r2=1549626&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java Mon Dec 9 17:38:20 2013
@@ -46,7 +46,9 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
import org.junit.Test;
public class TestZKRMStateStore extends RMStateStoreTestBase {
@@ -57,6 +59,7 @@ public class TestZKRMStateStore extends
ZooKeeper client;
TestZKRMStateStoreInternal store;
+ String workingZnode;
class TestZKRMStateStoreInternal extends ZKRMStateStore {
@@ -79,11 +82,16 @@ public class TestZKRMStateStore extends
public RMStateVersion getCurrentVersion() {
return CURRENT_VERSION_INFO;
}
+
+ public String getAppNode(String appId) {
+ return workingZnode + "/" + ROOT_ZNODE_NAME + "/" + RM_APP_ROOT + "/"
+ + appId;
+ }
}
public RMStateStore getRMStateStore() throws Exception {
- String workingZnode = "/Test";
- Configuration conf = new YarnConfiguration();
+ YarnConfiguration conf = new YarnConfiguration();
+ workingZnode = "/Test";
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort);
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
this.client = createClient();
@@ -107,14 +115,22 @@ public class TestZKRMStateStore extends
public RMStateVersion getCurrentVersion() throws Exception {
return store.getCurrentVersion();
}
+
+ public boolean appExists(RMApp app) throws Exception {
+ Stat node =
+ client.exists(store.getAppNode(app.getApplicationId().toString()),
+ false);
+ return node !=null;
+ }
}
- @Test
+ @Test (timeout = 60000)
public void testZKRMStateStoreRealZK() throws Exception {
TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
testRMAppStateStore(zkTester);
testRMDTSecretManagerStateStore(zkTester);
testCheckVersion(zkTester);
+ testAppDeletion(zkTester);
}
private Configuration createHARMConf(
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java?rev=1549626&r1=1549625&r2=1549626&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java Mon Dec 9 17:38:20 2013
@@ -120,7 +120,7 @@ public class TestZKRMStateStoreZKClientC
TestZKClient zkClientTester = new TestZKClient();
final String path = "/test";
YarnConfiguration conf = new YarnConfiguration();
- conf.setInt(YarnConfiguration.ZK_RM_STATE_STORE_TIMEOUT_MS, 100);
+ conf.setInt(YarnConfiguration.ZK_RM_STATE_STORE_TIMEOUT_MS, 1000);
conf.setLong(YarnConfiguration.ZK_RM_STATE_STORE_RETRY_INTERVAL_MS, 100);
final ZKRMStateStore store =
(ZKRMStateStore) zkClientTester.getRMStateStore(conf);
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java?rev=1549626&r1=1549625&r2=1549626&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java Mon Dec 9 17:38:20 2013
@@ -651,5 +651,35 @@ public class TestCapacityScheduler {
}
assertFalse(failed.get());
}
+
+ @Test
+ public void testGetAppsInQueue() throws Exception {
+ Application application_0 = new Application("user_0", "a1", resourceManager);
+ application_0.submit();
+
+ Application application_1 = new Application("user_0", "a2", resourceManager);
+ application_1.submit();
+
+ Application application_2 = new Application("user_0", "b2", resourceManager);
+ application_2.submit();
+
+ ResourceScheduler scheduler = resourceManager.getResourceScheduler();
+
+ List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
+ assertEquals(1, appsInA1.size());
+
+ List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
+ assertTrue(appsInA.contains(application_0.getApplicationAttemptId()));
+ assertTrue(appsInA.contains(application_1.getApplicationAttemptId()));
+ assertEquals(2, appsInA.size());
+
+ List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
+ assertTrue(appsInRoot.contains(application_0.getApplicationAttemptId()));
+ assertTrue(appsInRoot.contains(application_1.getApplicationAttemptId()));
+ assertTrue(appsInRoot.contains(application_2.getApplicationAttemptId()));
+ assertEquals(3, appsInRoot.size());
+
+ Assert.assertNull(scheduler.getAppsInQueue("nonexistentqueue"));
+ }
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1549626&r1=1549625&r2=1549626&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Mon Dec 9 17:38:20 2013
@@ -79,6 +79,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
@@ -2490,4 +2491,40 @@ public class TestFairScheduler {
assertEquals("Incorrect number of containers allocated", 1, app
.getLiveContainers().size());
}
+
+ @Test
+ public void testGetAppsInQueue() throws Exception {
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ ApplicationAttemptId appAttId1 =
+ createSchedulingRequest(1024, 1, "queue1.subqueue1", "user1");
+ ApplicationAttemptId appAttId2 =
+ createSchedulingRequest(1024, 1, "queue1.subqueue2", "user1");
+ ApplicationAttemptId appAttId3 =
+ createSchedulingRequest(1024, 1, "default", "user1");
+
+ List<ApplicationAttemptId> apps =
+ scheduler.getAppsInQueue("queue1.subqueue1");
+ assertEquals(1, apps.size());
+ assertEquals(appAttId1, apps.get(0));
+ // with and without root prefix should work
+ apps = scheduler.getAppsInQueue("root.queue1.subqueue1");
+ assertEquals(1, apps.size());
+ assertEquals(appAttId1, apps.get(0));
+
+ apps = scheduler.getAppsInQueue("user1");
+ assertEquals(1, apps.size());
+ assertEquals(appAttId3, apps.get(0));
+ // with and without root prefix should work
+ apps = scheduler.getAppsInQueue("root.user1");
+ assertEquals(1, apps.size());
+ assertEquals(appAttId3, apps.get(0));
+
+ // apps in subqueues should be included
+ apps = scheduler.getAppsInQueue("queue1");
+ Assert.assertEquals(2, apps.size());
+ Set<ApplicationAttemptId> appAttIds = Sets.newHashSet(apps.get(0), apps.get(1));
+ assertTrue(appAttIds.contains(appAttId1));
+ assertTrue(appAttIds.contains(appAttId2));
+ }
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerEventLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerEventLog.java?rev=1549626&r1=1549625&r2=1549626&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerEventLog.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerEventLog.java Mon Dec 9 17:38:20 2013
@@ -44,7 +44,7 @@ public class TestFairSchedulerEventLog {
Configuration conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
ResourceScheduler.class);
- conf.set("mapred.fairscheduler.eventlog.enabled", "true");
+ conf.set("yarn.scheduler.fair.event-log-enabled", "true");
// All tests assume only one assignment per node update
conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java?rev=1549626&r1=1549625&r2=1549626&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java Mon Dec 9 17:38:20 2013
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.lang.reflect.Method;
@@ -555,6 +556,24 @@ public class TestFifoScheduler {
Assert.assertFalse(fs.getApplication(appAttemptId).isBlacklisted(host));
rm.stop();
}
+
+ @Test
+ public void testGetAppsInQueue() throws Exception {
+ Application application_0 = new Application("user_0", resourceManager);
+ application_0.submit();
+
+ Application application_1 = new Application("user_0", resourceManager);
+ application_1.submit();
+
+ ResourceScheduler scheduler = resourceManager.getResourceScheduler();
+
+ List<ApplicationAttemptId> appsInDefault = scheduler.getAppsInQueue("default");
+ assertTrue(appsInDefault.contains(application_0.getApplicationAttemptId()));
+ assertTrue(appsInDefault.contains(application_1.getApplicationAttemptId()));
+ assertEquals(2, appsInDefault.size());
+
+ Assert.assertNull(scheduler.getAppsInQueue("someotherqueue"));
+ }
private void checkApplicationResourceUsage(int expected,
Application application) {
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java?rev=1549626&r1=1549625&r2=1549626&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java Mon Dec 9 17:38:20 2013
@@ -25,18 +25,21 @@ import java.net.UnknownHostException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -87,7 +90,7 @@ public class MiniYARNCluster extends Com
}
private NodeManager[] nodeManagers;
- private ResourceManager resourceManager;
+ private ResourceManager[] resourceManagers;
private ResourceManagerWrapper resourceManagerWrapper;
@@ -103,12 +106,14 @@ public class MiniYARNCluster extends Com
/**
* @param testName name of the test
- * @param noOfNodeManagers the number of node managers in the cluster
+ * @param numResourceManagers the number of resource managers in the cluster
+ * @param numNodeManagers the number of node managers in the cluster
* @param numLocalDirs the number of nm-local-dirs per nodemanager
* @param numLogDirs the number of nm-log-dirs per nodemanager
*/
- public MiniYARNCluster(String testName, int noOfNodeManagers,
- int numLocalDirs, int numLogDirs) {
+ public MiniYARNCluster(
+ String testName, int numResourceManagers, int numNodeManagers,
+ int numLocalDirs, int numLogDirs) {
super(testName.replace("$", ""));
this.numLocalDirs = numLocalDirs;
this.numLogDirs = numLogDirs;
@@ -157,28 +162,103 @@ public class MiniYARNCluster extends Com
this.testWorkDir = targetWorkDir;
}
- resourceManagerWrapper = new ResourceManagerWrapper();
- addService(resourceManagerWrapper);
- nodeManagers = new CustomNodeManager[noOfNodeManagers];
- for(int index = 0; index < noOfNodeManagers; index++) {
+ resourceManagers = new ResourceManager[numResourceManagers];
+ for (int i = 0; i < numResourceManagers; i++) {
+ resourceManagers[i] = new ResourceManager();
+ addService(new ResourceManagerWrapper(i));
+ }
+ nodeManagers = new CustomNodeManager[numNodeManagers];
+ for(int index = 0; index < numNodeManagers; index++) {
addService(new NodeManagerWrapper(index));
nodeManagers[index] = new CustomNodeManager();
}
}
-
- @Override
+
+ /**
+ * @param testName name of the test
+ * @param numNodeManagers the number of node managers in the cluster
+ * @param numLocalDirs the number of nm-local-dirs per nodemanager
+ * @param numLogDirs the number of nm-log-dirs per nodemanager
+ */
+ public MiniYARNCluster(String testName, int numNodeManagers,
+ int numLocalDirs, int numLogDirs) {
+ this(testName, 1, numNodeManagers, numLocalDirs, numLogDirs);
+ }
+
+ @Override
public void serviceInit(Configuration conf) throws Exception {
- super.serviceInit(conf instanceof YarnConfiguration ? conf
- : new YarnConfiguration(
- conf));
+ if (resourceManagers.length > 1) {
+ conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
+
+ StringBuilder rmIds = new StringBuilder();
+ for (int i = 0; i < resourceManagers.length; i++) {
+ if (i != 0) {
+ rmIds.append(",");
+ }
+ rmIds.append("rm" + i);
+ }
+ conf.set(YarnConfiguration.RM_HA_IDS, rmIds.toString());
+ }
+ super.serviceInit(
+ conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf));
}
public File getTestWorkDir() {
return testWorkDir;
}
+ /**
+ * In a HA cluster, go through all the RMs and find the Active RM. If none
+ * of them are active, wait upto 5 seconds for them to transition to Active.
+ *
+ * In an non-HA cluster, return the index of the only RM.
+ *
+ * @return index of the active RM
+ */
+ @InterfaceAudience.Private
+ @VisibleForTesting
+ int getActiveRMIndex() {
+ if (resourceManagers.length == 1) {
+ return 0;
+ }
+
+ int numRetriesForRMBecomingActive = 5;
+ while (numRetriesForRMBecomingActive-- > 0) {
+ for (int i = 0; i < resourceManagers.length; i++) {
+ try {
+ if (HAServiceProtocol.HAServiceState.ACTIVE ==
+ resourceManagers[i].getRMContext().getRMAdminService()
+ .getServiceStatus().getState()) {
+ return i;
+ }
+ } catch (IOException e) {
+ throw new YarnRuntimeException("Couldn't read the status of " +
+ "a ResourceManger in the HA ensemble.", e);
+ }
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ throw new YarnRuntimeException("Interrupted while waiting for one " +
+ "of the ResourceManagers to become active");
+ }
+ }
+ return -1;
+ }
+
+ /**
+ * @return the active {@link ResourceManager} of the cluster,
+ * null if none of them are active.
+ */
public ResourceManager getResourceManager() {
- return this.resourceManager;
+ int activeRMIndex = getActiveRMIndex();
+ return activeRMIndex == -1
+ ? null
+ : this.resourceManagers[getActiveRMIndex()];
+ }
+
+ public ResourceManager getResourceManager(int i) {
+ return this.resourceManagers[i];
}
public NodeManager getNodeManager(int i) {
@@ -195,8 +275,29 @@ public class MiniYARNCluster extends Com
}
private class ResourceManagerWrapper extends AbstractService {
- public ResourceManagerWrapper() {
- super(ResourceManagerWrapper.class.getName());
+ private int index;
+
+ public ResourceManagerWrapper(int i) {
+ super(ResourceManagerWrapper.class.getName() + "_" + i);
+ index = i;
+ }
+
+ private void setNonHARMConfiguration(Configuration conf) {
+ String hostname = MiniYARNCluster.getHostname();
+ conf.set(YarnConfiguration.RM_ADDRESS, hostname + ":0");
+ conf.set(YarnConfiguration.RM_ADMIN_ADDRESS, hostname + ":0");
+ conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, hostname + ":0");
+ conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, hostname + ":0");
+ WebAppUtils.setRMWebAppHostnameAndPort(conf, hostname, 0);
+ }
+
+ private void setHARMConfiguration(Configuration conf) {
+ String rmId = "rm" + index;
+ String hostname = MiniYARNCluster.getHostname();
+ conf.set(YarnConfiguration.RM_HA_ID, rmId);
+ for (String confKey : YarnConfiguration.RM_RPC_ADDRESS_CONF_KEYS) {
+ conf.set(HAUtil.addSuffix(confKey, rmId), hostname + ":0");
+ }
}
@Override
@@ -206,22 +307,15 @@ public class MiniYARNCluster extends Com
if (!conf.getBoolean(
YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS,
YarnConfiguration.DEFAULT_YARN_MINICLUSTER_FIXED_PORTS)) {
- // pick free random ports.
- String hostname = MiniYARNCluster.getHostname();
- conf.set(YarnConfiguration.RM_ADDRESS, hostname + ":0");
- conf.set(YarnConfiguration.RM_ADMIN_ADDRESS, hostname + ":0");
- conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, hostname + ":0");
- conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, hostname + ":0");
- WebAppUtils.setRMWebAppHostnameAndPort(conf, hostname, 0);
+ if (HAUtil.isHAEnabled(conf)) {
+ setHARMConfiguration(conf);
+ } else {
+ setNonHARMConfiguration(conf);
+ }
}
- resourceManager = new ResourceManager() {
- @Override
- protected void doSecureLogin() throws IOException {
- // Don't try to login using keytab in the testcase.
- };
- };
- resourceManager.init(conf);
- resourceManager.getRMContext().getDispatcher().register(RMAppAttemptEventType.class,
+ resourceManagers[index].init(conf);
+ resourceManagers[index].getRMContext().getDispatcher().register
+ (RMAppAttemptEventType.class,
new EventHandler<RMAppAttemptEvent>() {
public void handle(RMAppAttemptEvent event) {
if (event instanceof RMAppAttemptRegistrationEvent) {
@@ -239,20 +333,20 @@ public class MiniYARNCluster extends Com
try {
new Thread() {
public void run() {
- resourceManager.start();
- };
+ resourceManagers[index].start();
+ }
}.start();
int waitCount = 0;
- while (resourceManager.getServiceState() == STATE.INITED
+ while (resourceManagers[index].getServiceState() == STATE.INITED
&& waitCount++ < 60) {
LOG.info("Waiting for RM to start...");
Thread.sleep(1500);
}
- if (resourceManager.getServiceState() != STATE.STARTED) {
+ if (resourceManagers[index].getServiceState() != STATE.STARTED) {
// RM could have failed.
throw new IOException(
"ResourceManager failed to start. Final state is "
- + resourceManager.getServiceState());
+ + resourceManagers[index].getServiceState());
}
super.serviceStart();
} catch (Throwable t) {
@@ -278,9 +372,9 @@ public class MiniYARNCluster extends Com
@Override
protected synchronized void serviceStop() throws Exception {
- if (resourceManager != null) {
+ if (resourceManagers[index] != null) {
waitForAppMastersToFinish(5000);
- resourceManager.stop();
+ resourceManagers[index].stop();
}
super.serviceStop();
@@ -372,7 +466,7 @@ public class MiniYARNCluster extends Com
new Thread() {
public void run() {
nodeManagers[index].start();
- };
+ }
}.start();
int waitCount = 0;
while (nodeManagers[index].getServiceState() == STATE.INITED
@@ -398,12 +492,12 @@ public class MiniYARNCluster extends Com
super.serviceStop();
}
}
-
+
private class CustomNodeManager extends NodeManager {
@Override
protected void doSecureLogin() throws IOException {
// Don't try to login using keytab in the testcase.
- };
+ }
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
@@ -412,8 +506,8 @@ public class MiniYARNCluster extends Com
healthChecker, metrics) {
@Override
protected ResourceTracker getRMClient() {
- final ResourceTrackerService rt = resourceManager
- .getResourceTrackerService();
+ final ResourceTrackerService rt =
+ getResourceManager().getResourceTrackerService();
final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
@@ -424,8 +518,7 @@ public class MiniYARNCluster extends Com
public NodeHeartbeatResponse nodeHeartbeat(
NodeHeartbeatRequest request) throws YarnException,
IOException {
- NodeHeartbeatResponse response = recordFactory.newRecordInstance(
- NodeHeartbeatResponse.class);
+ NodeHeartbeatResponse response;
try {
response = rt.nodeHeartbeat(request);
} catch (YarnException e) {
@@ -440,8 +533,7 @@ public class MiniYARNCluster extends Com
public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request)
throws YarnException, IOException {
- RegisterNodeManagerResponse response = recordFactory.
- newRecordInstance(RegisterNodeManagerResponse.class);
+ RegisterNodeManagerResponse response;
try {
response = rt.registerNodeManager(request);
} catch (YarnException e) {
@@ -452,13 +544,11 @@ public class MiniYARNCluster extends Com
return response;
}
};
- };
+ }
@Override
- protected void stopRMProxy() {
- return;
- }
+ protected void stopRMProxy() { }
};
- };
+ }
}
}