You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:51:01 UTC
svn commit: r1619012 [19/26] - in
/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project: ./
hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/dev-support/
hadoop-yarn/hadoop-yarn-api/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api...
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java Tue Aug 19 23:49:39 2014
@@ -18,26 +18,34 @@
package org.apache.hadoop.yarn.server.resourcemanager;
+import java.net.UnknownHostException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.junit.Assert;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
@@ -45,13 +53,29 @@ import org.apache.hadoop.yarn.server.uti
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
public class TestApplicationCleanup {
private static final Log LOG = LogFactory
.getLog(TestApplicationCleanup.class);
+
+ private YarnConfiguration conf;
+
+  /**
+   * Per-test fixture: switches the root logger to DEBUG and builds a fresh
+   * configuration with RM recovery enabled and MemoryRMStateStore selected
+   * as the state store.
+   */
+  @Before
+  public void setup() throws UnknownHostException {
+    LogManager.getRootLogger().setLevel(Level.DEBUG);
+
+    conf = new YarnConfiguration();
+    UserGroupInformation.setConfiguration(conf);
+    conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
+    conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+
+    // Guard: the default AM attempt limit must be greater than one.
+    Assert.assertTrue(1 < YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
+  }
+ @SuppressWarnings("resource")
@Test
public void testAppCleanup() throws Exception {
Logger rootLogger = LogManager.getRootLogger();
@@ -130,6 +154,7 @@ public class TestApplicationCleanup {
rm.stop();
}
+ @SuppressWarnings("resource")
@Test
public void testContainerCleanup() throws Exception {
@@ -207,20 +232,7 @@ public class TestApplicationCleanup {
containerStatuses.put(app.getApplicationId(), containerStatusList);
NodeHeartbeatResponse resp = nm1.nodeHeartbeat(containerStatuses, true);
- dispatcher.await();
- List<ContainerId> contsToClean = resp.getContainersToCleanup();
- int cleanedConts = contsToClean.size();
- waitCount = 0;
- while (cleanedConts < 1 && waitCount++ < 200) {
- LOG.info("Waiting to get cleanup events.. cleanedConts: " + cleanedConts);
- Thread.sleep(100);
- resp = nm1.nodeHeartbeat(true);
- dispatcher.await();
- contsToClean = resp.getContainersToCleanup();
- cleanedConts += contsToClean.size();
- }
- LOG.info("Got cleanup for " + contsToClean.get(0));
- Assert.assertEquals(1, cleanedConts);
+ waitForContainerCleanup(dispatcher, nm1, resp);
// Now to test the case when RM already gave cleanup, and NM suddenly
// realizes that the container is running.
@@ -233,24 +245,237 @@ public class TestApplicationCleanup {
containerStatuses.put(app.getApplicationId(), containerStatusList);
resp = nm1.nodeHeartbeat(containerStatuses, true);
- dispatcher.await();
- contsToClean = resp.getContainersToCleanup();
- cleanedConts = contsToClean.size();
// The cleanup list won't be instantaneous as it is given out by scheduler
// and not RMNodeImpl.
- waitCount = 0;
- while (cleanedConts < 1 && waitCount++ < 200) {
- LOG.info("Waiting to get cleanup events.. cleanedConts: " + cleanedConts);
- Thread.sleep(100);
- resp = nm1.nodeHeartbeat(true);
+ waitForContainerCleanup(dispatcher, nm1, resp);
+
+ rm.stop();
+ }
+
+ /**
+ * Polls until at least one container shows up in a heartbeat response's
+ * containers-to-cleanup list, then asserts that exactly one container was
+ * reported in total.
+ *
+ * The first check uses the response passed in; every later check uses a
+ * fresh heartbeat sent through nm after a 100 ms sleep. Polling stops once
+ * the retry counter reaches 200.
+ *
+ * @param dispatcher dispatcher that is drained (await()) before each check
+ * @param nm node used to send the follow-up heartbeats
+ * @param resp heartbeat response inspected on the first iteration
+ * @throws Exception whatever await(), sleep() or the heartbeat throws
+ */
+ protected void waitForContainerCleanup(DrainDispatcher dispatcher, MockNM nm,
+ NodeHeartbeatResponse resp) throws Exception {
+ int waitCount = 0, cleanedConts = 0;
+ List<ContainerId> contsToClean;
+ do {
dispatcher.await();
contsToClean = resp.getContainersToCleanup();
cleanedConts += contsToClean.size();
+ // Stop polling as soon as any cleanup request has been seen.
+ if (cleanedConts >= 1) {
+ break;
+ }
+ Thread.sleep(100);
+ resp = nm.nodeHeartbeat(true);
+ } while(waitCount++ < 200);
+
+ // Only index into the list when it is non-empty (the loop may have timed out).
+ if (contsToClean.isEmpty()) {
+ LOG.error("Failed to get any containers to cleanup");
+ } else {
+ LOG.info("Got cleanup for " + contsToClean.get(0));
}
- LOG.info("Got cleanup for " + contsToClean.get(0));
Assert.assertEquals(1, cleanedConts);
+ }
- rm.stop();
+  /**
+   * Heartbeats {@code nm} once a second until a response lists exactly one
+   * application to clean up and that application equals {@code appId}.
+   * There is no retry limit in this method; it only returns when such a
+   * response arrives.
+   */
+  private void waitForAppCleanupMessageRecved(MockNM nm, ApplicationId appId)
+      throws Exception {
+    for (;;) {
+      List<ApplicationId> toCleanup =
+          nm.nodeHeartbeat(true).getApplicationsToCleanup();
+      boolean onlyThisApp = toCleanup != null && toCleanup.size() == 1
+          && appId.equals(toCleanup.get(0));
+      if (onlyThisApp) {
+        return;
+      }
+
+      LOG.info("Haven't got application=" + appId.toString()
+          + " in cleanup list from node heartbeat response, "
+          + "sleep for a while before next heartbeat");
+      Thread.sleep(1000);
+    }
+  }
+
+  /**
+   * Brings the current attempt of {@code app} up as a registered AM: sends
+   * one node heartbeat, reports the AM as launched, registers it, and waits
+   * for the application to reach RUNNING.
+   *
+   * @return the registered mock AM
+   */
+  private MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
+      throws Exception {
+    final RMAppAttempt currentAttempt = app.getCurrentAppAttempt();
+    nm.nodeHeartbeat(true);
+
+    final MockAM launched =
+        rm.sendAMLaunched(currentAttempt.getAppAttemptId());
+    launched.registerAppAttempt();
+
+    rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
+    return launched;
+  }
+
+  /**
+   * An application that already reached FAILED on the first RM must still be
+   * sent to the re-registering NM for cleanup by the restarted RM.
+   */
+  @SuppressWarnings("resource")
+  @Test (timeout = 60000)
+  public void testAppCleanupWhenRMRestartedAfterAppFinished() throws Exception {
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    // start RM
+    MockRM rm1 = new MockRM(conf, memStore);
+    MockRM rm2 = null;
+    rm1.start();
+    try {
+      MockNM nm1 =
+          new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+      nm1.registerNode();
+
+      // create app and launch the AM
+      RMApp app0 = rm1.submitApp(200);
+      MockAM am0 = launchAM(app0, rm1, nm1);
+      nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1,
+          ContainerState.COMPLETE);
+      rm1.waitForState(app0.getApplicationId(), RMAppState.FAILED);
+
+      // start new RM
+      rm2 = new MockRM(conf, memStore);
+      rm2.start();
+
+      // nm1 register to rm2, and do a heartbeat
+      nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+      nm1.registerNode(Arrays.asList(app0.getApplicationId()));
+      rm2.waitForState(app0.getApplicationId(), RMAppState.FAILED);
+
+      // wait for application cleanup message received
+      waitForAppCleanupMessageRecved(nm1, app0.getApplicationId());
+    } finally {
+      // Stop both RMs even when a wait or assertion above throws, so a
+      // failing run does not leave them running.
+      rm1.stop();
+      if (rm2 != null) {
+        rm2.stop();
+      }
+    }
+  }
+
+  /**
+   * The RM is restarted while the application is still running with
+   * containers on two nodes. After both nodes re-register with the new RM
+   * and the app is seen as FAILED there, each node must receive the
+   * application cleanup message.
+   */
+  @SuppressWarnings("resource")
+  @Test(timeout = 60000)
+  public void testAppCleanupWhenRMRestartedBeforeAppFinished() throws Exception {
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    // start RM
+    MockRM rm1 = new MockRM(conf, memStore);
+    MockRM rm2 = null;
+    rm1.start();
+    try {
+      MockNM nm1 =
+          new MockNM("127.0.0.1:1234", 1024, rm1.getResourceTrackerService());
+      nm1.registerNode();
+      MockNM nm2 =
+          new MockNM("127.0.0.1:5678", 1024, rm1.getResourceTrackerService());
+      nm2.registerNode();
+
+      // create app and launch the AM
+      RMApp app0 = rm1.submitApp(200);
+      MockAM am0 = launchAM(app0, rm1, nm1);
+
+      // alloc another container on nm2
+      AllocateResponse allocResponse =
+          am0.allocate(Arrays.asList(ResourceRequest.newInstance(
+              Priority.newInstance(1), "*", Resource.newInstance(1024, 0), 1)),
+              null);
+      while (null == allocResponse.getAllocatedContainers()
+          || allocResponse.getAllocatedContainers().isEmpty()) {
+        nm2.nodeHeartbeat(true);
+        allocResponse = am0.allocate(null, null);
+        Thread.sleep(1000);
+      }
+
+      // start new RM
+      rm2 = new MockRM(conf, memStore);
+      rm2.start();
+
+      // nm1/nm2 register to rm2, and do a heartbeat
+      nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+      nm1.registerNode(Arrays.asList(NMContainerStatus.newInstance(
+          ContainerId.newInstance(am0.getApplicationAttemptId(), 1),
+          ContainerState.COMPLETE, Resource.newInstance(1024, 1), "", 0,
+          Priority.newInstance(0), 1234)),
+          Arrays.asList(app0.getApplicationId()));
+      nm2.setResourceTrackerService(rm2.getResourceTrackerService());
+      nm2.registerNode(Arrays.asList(app0.getApplicationId()));
+
+      // assert app state has been saved.
+      rm2.waitForState(app0.getApplicationId(), RMAppState.FAILED);
+
+      // wait for application cleanup message received on NM1
+      waitForAppCleanupMessageRecved(nm1, app0.getApplicationId());
+
+      // wait for application cleanup message received on NM2
+      waitForAppCleanupMessageRecved(nm2, app0.getApplicationId());
+    } finally {
+      // Stop both RMs even when a wait or assertion above throws, so a
+      // failing run does not leave them running.
+      rm1.stop();
+      if (rm2 != null) {
+        rm2.stop();
+      }
+    }
+  }
+
+  /**
+   * After an RM restart, while the recovered application is still ACCEPTED
+   * on the new RM, a node heartbeat reporting a running container of that
+   * application's attempt must result in a container cleanup request.
+   */
+  @SuppressWarnings("resource")
+  @Test (timeout = 60000)
+  public void testContainerCleanupWhenRMRestartedAppNotRegistered() throws
+      Exception {
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    // start RM
+    final DrainDispatcher dispatcher = new DrainDispatcher();
+    MockRM rm1 = new MockRM(conf, memStore) {
+      @Override
+      protected Dispatcher createDispatcher() {
+        return dispatcher;
+      }
+    };
+    MockRM rm2 = null;
+    rm1.start();
+    try {
+      MockNM nm1 =
+          new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+      nm1.registerNode();
+
+      // create app and launch the AM
+      RMApp app0 = rm1.submitApp(200);
+      MockAM am0 = launchAM(app0, rm1, nm1);
+      nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1,
+          ContainerState.RUNNING);
+      rm1.waitForState(app0.getApplicationId(), RMAppState.RUNNING);
+
+      // start new RM
+      final DrainDispatcher dispatcher2 = new DrainDispatcher();
+      rm2 = new MockRM(conf, memStore) {
+        @Override
+        protected Dispatcher createDispatcher() {
+          return dispatcher2;
+        }
+      };
+      rm2.start();
+
+      // nm1 register to rm2, and do a heartbeat
+      nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+      nm1.registerNode(Arrays.asList(app0.getApplicationId()));
+      rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED);
+
+      // Add unknown container for application unknown to scheduler
+      NodeHeartbeatResponse response = nm1.nodeHeartbeat(am0
+          .getApplicationAttemptId(), 2, ContainerState.RUNNING);
+
+      waitForContainerCleanup(dispatcher2, nm1, response);
+    } finally {
+      // Stop both RMs even when a wait or assertion above throws, so a
+      // failing run does not leave them running.
+      rm1.stop();
+      if (rm2 != null) {
+        rm2.stop();
+      }
+    }
+  }
+
+  /**
+   * A node that re-registers while still listing an application that has
+   * already FAILED must be sent the application cleanup message again.
+   */
+  @SuppressWarnings("resource")
+  @Test (timeout = 60000)
+  public void testAppCleanupWhenNMReconnects() throws Exception {
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    // start RM
+    MockRM rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    try {
+      MockNM nm1 =
+          new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+      nm1.registerNode();
+
+      // create app and launch the AM
+      RMApp app0 = rm1.submitApp(200);
+      MockAM am0 = launchAM(app0, rm1, nm1);
+      nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1,
+          ContainerState.COMPLETE);
+      rm1.waitForState(app0.getApplicationId(), RMAppState.FAILED);
+
+      // wait for application cleanup message received
+      waitForAppCleanupMessageRecved(nm1, app0.getApplicationId());
+
+      // reconnect NM with application still active
+      nm1.registerNode(Arrays.asList(app0.getApplicationId()));
+      waitForAppCleanupMessageRecved(nm1, app0.getApplicationId());
+    } finally {
+      // Stop the RM even when a wait or assertion above throws, so a
+      // failing run does not leave it running.
+      rm1.stop();
+    }
}
public static void main(String[] args) throws Exception {
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java Tue Aug 19 23:49:39 2014
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.protoc
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
+import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -194,28 +195,17 @@ public class TestApplicationMasterLaunch
// request for containers
int request = 2;
- try {
- AllocateResponse ar =
- am.allocate("h1", 1000, request, new ArrayList<ContainerId>());
- } catch (Exception e) {
- Assert.assertEquals("Application Master is trying to allocate before "
- + "registering for: " + attempt.getAppAttemptId().getApplicationId(),
- e.getMessage());
- thrown = true;
- }
+ AllocateResponse ar =
+ am.allocate("h1", 1000, request, new ArrayList<ContainerId>());
+ Assert.assertTrue(ar.getAMCommand() == AMCommand.AM_RESYNC);
+
// kick the scheduler
nm1.nodeHeartbeat(true);
- try {
- AllocateResponse amrs =
- am.allocate(new ArrayList<ResourceRequest>(),
- new ArrayList<ContainerId>());
- } catch (Exception e) {
- Assert.assertEquals("Application Master is trying to allocate before "
- + "registering for: " + attempt.getAppAttemptId().getApplicationId(),
- e.getMessage());
- thrown = true;
- }
- Assert.assertTrue(thrown);
+    AllocateResponse amrs =
+        am.allocate(new ArrayList<ResourceRequest>(),
+          new ArrayList<ContainerId>());
+    // Verify the response of THIS allocate call; re-checking the earlier
+    // 'ar' would leave the second unregistered allocate untested.
+    Assert.assertTrue(amrs.getAMCommand() == AMCommand.AM_RESYNC);
+
am.registerAppAttempt();
thrown = false;
try {
@@ -228,5 +218,17 @@ public class TestApplicationMasterLaunch
thrown = true;
}
Assert.assertTrue(thrown);
+
+ // Simulate an AM that was disconnected and app attempt was removed
+ // (responseMap does not contain attemptid)
+ am.unregisterAppAttempt();
+ nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1,
+ ContainerState.COMPLETE);
+ am.waitForState(RMAppAttemptState.FINISHED);
+
+ AllocateResponse amrs2 =
+ am.allocate(new ArrayList<ResourceRequest>(),
+ new ArrayList<ContainerId>());
+ Assert.assertTrue(amrs2.getAMCommand() == AMCommand.AM_SHUTDOWN);
}
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java Tue Aug 19 23:49:39 2014
@@ -18,60 +18,33 @@
package org.apache.hadoop.yarn.server.resourcemanager;
-import com.google.common.collect.Maps;
import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.event.InlineDispatcher;
-import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
+import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.*;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
-import java.util.concurrent.ConcurrentMap;
import static java.lang.Thread.sleep;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyList;
-import static org.mockito.Mockito.*;
public class TestApplicationMasterService {
private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class);
@@ -198,7 +171,6 @@ public class TestApplicationMasterServic
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
am1.registerAppAttempt();
- am1.setAMRMProtocol(rm.getApplicationMasterService());
AllocateRequestPBImpl allocateRequest = new AllocateRequestPBImpl();
List<ContainerId> release = new ArrayList<ContainerId>();
@@ -270,13 +242,17 @@ public class TestApplicationMasterServic
}
Assert.assertNotNull(cause);
Assert
- .assertTrue(cause instanceof InvalidApplicationMasterRequestException);
+ .assertTrue(cause instanceof ApplicationMasterNotRegisteredException);
Assert.assertNotNull(cause.getMessage());
Assert
.assertTrue(cause
.getMessage()
.contains(
"Application Master is trying to unregister before registering for:"));
+
+ am1.registerAppAttempt();
+
+ am1.unregisterAppAttempt(req, false);
} finally {
if (rm != null) {
rm.stop();
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java Tue Aug 19 23:49:39 2014
@@ -44,16 +44,17 @@ import java.util.concurrent.ConcurrentHa
import java.util.concurrent.CyclicBarrier;
import org.junit.Assert;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.authentication.util.KerberosName;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
@@ -77,6 +78,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -137,6 +139,10 @@ public class TestClientRMService {
private final static String QUEUE_1 = "Q-1";
private final static String QUEUE_2 = "Q-2";
+ private final static String kerberosRule = "RULE:[1:$1@$0](.*@EXAMPLE.COM)s/@.*//\nDEFAULT";
+ static {
+ KerberosName.setRules(kerberosRule);
+ }
@BeforeClass
public static void setupSecretManager() throws IOException {
@@ -259,6 +265,28 @@ public class TestClientRMService {
}
@Test
+  public void testGetApplicationResourceUsageReportDummy() throws YarnException,
+      IOException {
+    // A freshly constructed RMAppAttemptImpl is expected to hand back the
+    // shared DUMMY resource usage report.
+    ApplicationAttemptId attemptId = getApplicationAttemptId(1);
+    YarnScheduler yarnScheduler = mockYarnScheduler();
+    RMContext rmContext = mock(RMContext.class);
+    mockRMContext(yarnScheduler, rmContext);
+    // Install a no-op event handler so dispatched events are ignored.
+    when(rmContext.getDispatcher().getEventHandler()).thenReturn(
+        new EventHandler<Event>() {
+          @Override
+          public void handle(Event event) {
+          }
+        });
+    ApplicationSubmissionContext asContext =
+        mock(ApplicationSubmissionContext.class);
+    YarnConfiguration config = new YarnConfiguration();
+    RMAppAttemptImpl rmAppAttemptImpl = new RMAppAttemptImpl(attemptId,
+        rmContext, yarnScheduler, null, asContext, config, false);
+    ApplicationResourceUsageReport report = rmAppAttemptImpl
+        .getApplicationResourceUsageReport();
+    // JUnit's assertEquals takes (expected, actual); the swapped order would
+    // produce a misleading message on failure.
+    assertEquals(RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT, report);
+  }
+
+ @Test
public void testGetApplicationAttempts() throws YarnException, IOException {
ClientRMService rmService = createRMService();
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
@@ -456,6 +484,17 @@ public class TestClientRMService {
UserGroupInformation.createRemoteUser("owner");
private static final UserGroupInformation other =
UserGroupInformation.createRemoteUser("other");
+ private static final UserGroupInformation tester =
+ UserGroupInformation.createRemoteUser("tester");
+ private static final String testerPrincipal = "tester@EXAMPLE.COM";
+ private static final String ownerPrincipal = "owner@EXAMPLE.COM";
+ private static final String otherPrincipal = "other@EXAMPLE.COM";
+ private static final UserGroupInformation testerKerb =
+ UserGroupInformation.createRemoteUser(testerPrincipal);
+ private static final UserGroupInformation ownerKerb =
+ UserGroupInformation.createRemoteUser(ownerPrincipal);
+ private static final UserGroupInformation otherKerb =
+ UserGroupInformation.createRemoteUser(otherPrincipal);
@Test
public void testTokenRenewalByOwner() throws Exception {
@@ -478,9 +517,8 @@ public class TestClientRMService {
checkTokenRenewal(owner, other);
return null;
} catch (YarnException ex) {
- Assert.assertTrue(ex.getMessage().contains(
- "Client " + owner.getUserName() +
- " tries to renew a token with renewer specified as " +
+ Assert.assertTrue(ex.getMessage().contains(owner.getUserName() +
+ " tries to renew a token with renewer " +
other.getUserName()));
throw ex;
}
@@ -524,6 +562,147 @@ public class TestClientRMService {
rmService.renewDelegationToken(request);
}
+  /**
+   * Cancelling a token as its owner must go through without an exception.
+   * Run twice: once with a Kerberos-style owner name and once with a short
+   * owner name.
+   */
+  @Test
+  public void testTokenCancellationByOwner() throws Exception {
+    final ClientRMService sharedService = new ClientRMService(
+        mock(RMContext.class), null, null, null, null, dtsm);
+
+    // Kerberos principal acting as caller and as token owner.
+    PrivilegedExceptionAction<Void> cancelAsKerberosOwner =
+        new PrivilegedExceptionAction<Void>() {
+          @Override
+          public Void run() throws Exception {
+            checkTokenCancellation(sharedService, testerKerb, other);
+            return null;
+          }
+        };
+    testerKerb.doAs(cancelAsKerberosOwner);
+
+    // Short user name acting as caller and as token owner.
+    PrivilegedExceptionAction<Void> cancelAsShortNameOwner =
+        new PrivilegedExceptionAction<Void>() {
+          @Override
+          public Void run() throws Exception {
+            checkTokenCancellation(owner, other);
+            return null;
+          }
+        };
+    owner.doAs(cancelAsShortNameOwner);
+  }
+
+  /**
+   * Cancelling a token as its renewer must go through without an exception.
+   * Run twice: once with a Kerberos-style renewer name and once with a short
+   * renewer name.
+   */
+  @Test
+  public void testTokenCancellationByRenewer() throws Exception {
+    final ClientRMService sharedService = new ClientRMService(
+        mock(RMContext.class), null, null, null, null, dtsm);
+
+    // Kerberos principal acting as caller and as token renewer.
+    PrivilegedExceptionAction<Void> cancelAsKerberosRenewer =
+        new PrivilegedExceptionAction<Void>() {
+          @Override
+          public Void run() throws Exception {
+            checkTokenCancellation(sharedService, owner, testerKerb);
+            return null;
+          }
+        };
+    testerKerb.doAs(cancelAsKerberosRenewer);
+
+    // Short user name acting as caller and as token renewer.
+    PrivilegedExceptionAction<Void> cancelAsShortNameRenewer =
+        new PrivilegedExceptionAction<Void>() {
+          @Override
+          public Void run() throws Exception {
+            checkTokenCancellation(owner, other);
+            return null;
+          }
+        };
+    other.doAs(cancelAsShortNameRenewer);
+  }
+
+  /**
+   * A caller that is neither the owner nor the renewer of a token must be
+   * refused with a YarnException saying it is not authorized to cancel the
+   * token. Any other exception now propagates out of the test with its
+   * original type and stack trace instead of being flattened into a message.
+   */
+  @Test
+  public void testTokenCancellationByWrongUser() throws Exception {
+    // two sets to test -
+    // 1. try to cancel tokens of short and kerberos users as a kerberos UGI
+    // 2. try to cancel tokens of short and kerberos users as a simple auth UGI
+
+    RMContext rmContext = mock(RMContext.class);
+    final ClientRMService rmService =
+        new ClientRMService(rmContext, null, null, null, null, dtsm);
+    UserGroupInformation[] kerbTestOwners =
+        { owner, other, tester, ownerKerb, otherKerb };
+    UserGroupInformation[] kerbTestRenewers =
+        { owner, other, ownerKerb, otherKerb };
+    for (final UserGroupInformation tokOwner : kerbTestOwners) {
+      for (final UserGroupInformation tokRenewer : kerbTestRenewers) {
+        testerKerb.doAs(new PrivilegedExceptionAction<Void>() {
+          @Override
+          public Void run() throws Exception {
+            try {
+              checkTokenCancellation(rmService, tokOwner, tokRenewer);
+              Assert.fail("We should not reach here; token owner = "
+                  + tokOwner.getUserName() + ", renewer = "
+                  + tokRenewer.getUserName());
+              return null;
+            } catch (YarnException e) {
+              Assert.assertTrue(e.getMessage().contains(
+                  testerKerb.getUserName()
+                      + " is not authorized to cancel the token"));
+              return null;
+            }
+          }
+        });
+      }
+    }
+
+    UserGroupInformation[] simpleTestOwners =
+        { owner, other, ownerKerb, otherKerb, testerKerb };
+    UserGroupInformation[] simpleTestRenewers =
+        { owner, other, ownerKerb, otherKerb };
+    for (final UserGroupInformation tokOwner : simpleTestOwners) {
+      for (final UserGroupInformation tokRenewer : simpleTestRenewers) {
+        tester.doAs(new PrivilegedExceptionAction<Void>() {
+          @Override
+          public Void run() throws Exception {
+            try {
+              checkTokenCancellation(tokOwner, tokRenewer);
+              Assert.fail("We should not reach here; token owner = "
+                  + tokOwner.getUserName() + ", renewer = "
+                  + tokRenewer.getUserName());
+              return null;
+            } catch (YarnException ex) {
+              Assert.assertTrue(ex.getMessage().contains(
+                  tester.getUserName()
+                      + " is not authorized to cancel the token"));
+              return null;
+            }
+          }
+        });
+      }
+    }
+  }
+
+  /**
+   * Convenience overload: runs the cancellation check against a newly built
+   * ClientRMService that wraps a mocked RMContext and the shared dtsm.
+   */
+  private void checkTokenCancellation(UserGroupInformation owner,
+      UserGroupInformation renewer) throws IOException, YarnException {
+    final ClientRMService freshService = new ClientRMService(
+        mock(RMContext.class), null, null, null, null, dtsm);
+    checkTokenCancellation(freshService, owner, renewer);
+  }
+
+  /**
+   * Builds a delegation token whose identifier names {@code owner} and
+   * {@code renewer} (third identifier argument left null), wraps it in a
+   * CancelDelegationTokenRequest and submits that request to
+   * {@code rmService}.
+   */
+  private void checkTokenCancellation(ClientRMService rmService,
+      UserGroupInformation owner, UserGroupInformation renewer)
+      throws IOException, YarnException {
+    Text ownerName = new Text(owner.getUserName());
+    Text renewerName = new Text(renewer.getUserName());
+    RMDelegationTokenIdentifier ident =
+        new RMDelegationTokenIdentifier(ownerName, renewerName, null);
+    Token<?> rawToken = new Token<RMDelegationTokenIdentifier>(ident, dtsm);
+
+    // Convert the security-layer token into the YARN record form.
+    org.apache.hadoop.yarn.api.records.Token yarnToken =
+        BuilderUtils.newDelegationToken(rawToken.getIdentifier(),
+            rawToken.getKind().toString(), rawToken.getPassword(),
+            rawToken.getService().toString());
+
+    CancelDelegationTokenRequest cancelRequest =
+        Records.newRecord(CancelDelegationTokenRequest.class);
+    cancelRequest.setDelegationToken(yarnToken);
+    rmService.cancelDelegationToken(cancelRequest);
+  }
+
@Test (timeout = 30000)
@SuppressWarnings ("rawtypes")
public void testAppSubmit() throws Exception {
@@ -647,7 +826,8 @@ public class TestClientRMService {
ApplicationId[] appIds =
{getApplicationId(101), getApplicationId(102), getApplicationId(103)};
List<String> tags = Arrays.asList("Tag1", "Tag2", "Tag3");
-
+
+ long[] submitTimeMillis = new long[3];
// Submit applications
for (int i = 0; i < appIds.length; i++) {
ApplicationId appId = appIds[i];
@@ -657,6 +837,7 @@ public class TestClientRMService {
appId, appNames[i], queues[i % queues.length],
new HashSet<String>(tags.subList(0, i + 1)));
rmService.submitApplication(submitRequest);
+ submitTimeMillis[i] = System.currentTimeMillis();
}
// Test different cases of ClientRMService#getApplications()
@@ -668,6 +849,24 @@ public class TestClientRMService {
request.setLimit(1L);
assertEquals("Failed to limit applications", 1,
rmService.getApplications(request).getApplicationList().size());
+
+ // Check start range
+ request = GetApplicationsRequest.newInstance();
+ request.setStartRange(submitTimeMillis[0], System.currentTimeMillis());
+
+ // 2 applications are submitted after the first timeMillis
+ assertEquals("Incorrect number of matching start range",
+ 2, rmService.getApplications(request).getApplicationList().size());
+
+ // 1 application is submitted after the second timeMillis
+ request.setStartRange(submitTimeMillis[1], System.currentTimeMillis());
+ assertEquals("Incorrect number of matching start range",
+ 1, rmService.getApplications(request).getApplicationList().size());
+
+ // no application is submitted after the third timeMillis
+ request.setStartRange(submitTimeMillis[2], System.currentTimeMillis());
+ assertEquals("Incorrect number of matching start range",
+ 0, rmService.getApplications(request).getApplicationList().size());
// Check queue
request = GetApplicationsRequest.newInstance();
@@ -945,6 +1144,8 @@ public class TestClientRMService {
Arrays.asList(getApplicationAttemptId(101), getApplicationAttemptId(102)));
when(yarnScheduler.getAppsInQueue(QUEUE_2)).thenReturn(
Arrays.asList(getApplicationAttemptId(103)));
+ ApplicationAttemptId attemptId = getApplicationAttemptId(1);
+ when(yarnScheduler.getAppResourceUsageReport(attemptId)).thenReturn(null);
return yarnScheduler;
}
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java Tue Aug 19 23:49:39 2014
@@ -36,6 +36,7 @@ import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
+import org.apache.hadoop.net.NetUtils;
import org.junit.Assert;
import org.apache.commons.logging.Log;
@@ -235,8 +236,8 @@ public class TestClientRMTokens {
@Test
public void testShortCircuitRenewCancel()
throws IOException, InterruptedException {
- InetSocketAddress addr =
- new InetSocketAddress(InetAddress.getLocalHost(), 123);
+ InetSocketAddress addr = NetUtils.createSocketAddr(
+ InetAddress.getLocalHost().getHostName(), 123, null);
checkShortCircuitRenewCancel(addr, addr, true);
}
@@ -244,17 +245,19 @@ public class TestClientRMTokens {
public void testShortCircuitRenewCancelWildcardAddress()
throws IOException, InterruptedException {
InetSocketAddress rmAddr = new InetSocketAddress(123);
+ InetSocketAddress serviceAddr = NetUtils.createSocketAddr(
+ InetAddress.getLocalHost().getHostName(), rmAddr.getPort(), null);
checkShortCircuitRenewCancel(
rmAddr,
- new InetSocketAddress(InetAddress.getLocalHost(), rmAddr.getPort()),
+ serviceAddr,
true);
}
@Test
public void testShortCircuitRenewCancelSameHostDifferentPort()
throws IOException, InterruptedException {
- InetSocketAddress rmAddr =
- new InetSocketAddress(InetAddress.getLocalHost(), 123);
+ InetSocketAddress rmAddr = NetUtils.createSocketAddr(
+ InetAddress.getLocalHost().getHostName(), 123, null);
checkShortCircuitRenewCancel(
rmAddr,
new InetSocketAddress(rmAddr.getAddress(), rmAddr.getPort()+1),
@@ -264,8 +267,8 @@ public class TestClientRMTokens {
@Test
public void testShortCircuitRenewCancelDifferentHostSamePort()
throws IOException, InterruptedException {
- InetSocketAddress rmAddr =
- new InetSocketAddress(InetAddress.getLocalHost(), 123);
+ InetSocketAddress rmAddr = NetUtils.createSocketAddr(
+ InetAddress.getLocalHost().getHostName(), 123, null);
checkShortCircuitRenewCancel(
rmAddr,
new InetSocketAddress("1.1.1.1", rmAddr.getPort()),
@@ -275,8 +278,8 @@ public class TestClientRMTokens {
@Test
public void testShortCircuitRenewCancelDifferentHostDifferentPort()
throws IOException, InterruptedException {
- InetSocketAddress rmAddr =
- new InetSocketAddress(InetAddress.getLocalHost(), 123);
+ InetSocketAddress rmAddr = NetUtils.createSocketAddr(
+ InetAddress.getLocalHost().getHostName(), 123, null);
checkShortCircuitRenewCancel(
rmAddr,
new InetSocketAddress("1.1.1.1", rmAddr.getPort()+1),
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java Tue Aug 19 23:49:39 2014
@@ -18,16 +18,16 @@
package org.apache.hadoop.yarn.server.resourcemanager;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import org.junit.Assert;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -52,21 +53,21 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
public class TestFifoScheduler {
private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class);
private final int GB = 1024;
private static YarnConfiguration conf;
-
+
@BeforeClass
public static void setup() {
conf = new YarnConfiguration();
@@ -76,12 +77,12 @@ public class TestFifoScheduler {
@Test (timeout = 30000)
public void testConfValidation() throws Exception {
- ResourceScheduler scheduler = new FifoScheduler();
+ FifoScheduler scheduler = new FifoScheduler();
Configuration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024);
try {
- scheduler.reinitialize(conf, null);
+ scheduler.serviceInit(conf);
fail("Exception is expected because the min memory allocation is" +
" larger than the max memory allocation.");
} catch (YarnRuntimeException e) {
@@ -213,6 +214,35 @@ public class TestFifoScheduler {
rm.stop();
}
+ @Test
+ public void testNodeUpdateBeforeAppAttemptInit() throws Exception {
+ FifoScheduler scheduler = new FifoScheduler();
+ MockRM rm = new MockRM(conf);
+ scheduler.setRMContext(rm.getRMContext());
+ scheduler.init(conf);
+ scheduler.start();
+ scheduler.reinitialize(conf, rm.getRMContext());
+
+ RMNode node = MockNodes.newNodeInfo(1,
+ Resources.createResource(1024, 4), 1, "127.0.0.1");
+ scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+ ApplicationId appId = ApplicationId.newInstance(0, 1);
+ scheduler.addApplication(appId, "queue1", "user1", false);
+
+ NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
+ try {
+ scheduler.handle(updateEvent);
+ } catch (NullPointerException e) {
+ Assert.fail();
+ }
+
+ ApplicationAttemptId attId = ApplicationAttemptId.newInstance(appId, 1);
+ scheduler.addApplicationAttempt(attId, false, false);
+
+ rm.stop();
+ }
+
private void testMinimumAllocation(YarnConfiguration conf, int testAlloc)
throws Exception {
MockRM rm = new MockRM(conf);
@@ -266,7 +296,12 @@ public class TestFifoScheduler {
conf.setQueues("default", new String[] {"default"});
conf.setCapacity("default", 100);
FifoScheduler fs = new FifoScheduler();
+ fs.init(conf);
+ fs.start();
+ // mock rmContext to avoid NPE.
+ RMContext context = mock(RMContext.class);
fs.reinitialize(conf, null);
+ fs.setRMContext(context);
RMNode n1 =
MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, "127.0.0.2");
@@ -286,6 +321,7 @@ public class TestFifoScheduler {
fs.handle(new NodeUpdateSchedulerEvent(n1));
Assert.assertEquals(4 * GB, fs.getRootQueueMetrics().getAvailableMB());
+ fs.stop();
}
@Test (timeout = 50000)
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java Tue Aug 19 23:49:39 2014
@@ -43,10 +43,11 @@ import org.junit.Test;
public class TestMoveApplication {
private ResourceManager resourceManager = null;
private static boolean failMove;
-
+ private Configuration conf;
+
@Before
public void setUp() throws Exception {
- Configuration conf = new YarnConfiguration();
+ conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoSchedulerWithMove.class,
FifoSchedulerWithMove.class);
conf.set(YarnConfiguration.YARN_ADMIN_ACL, " ");
@@ -119,28 +120,23 @@ public class TestMoveApplication {
}
}
- @Test (timeout = 5000)
- public void testMoveSuccessful() throws Exception {
- // Submit application
- Application application = new Application("user1", resourceManager);
- ApplicationId appId = application.getApplicationId();
- application.submit();
-
- // Wait for app to be accepted
- RMApp app = resourceManager.rmContext.getRMApps().get(appId);
- while (app.getState() != RMAppState.ACCEPTED) {
- Thread.sleep(100);
- }
-
- ClientRMService clientRMService = resourceManager.getClientRMService();
+ @Test (timeout = 10000)
+ public
+ void testMoveSuccessful() throws Exception {
+ MockRM rm1 = new MockRM(conf);
+ rm1.start();
+ RMApp app = rm1.submitApp(1024);
+ ClientRMService clientRMService = rm1.getClientRMService();
// FIFO scheduler does not support moves
- clientRMService.moveApplicationAcrossQueues(
- MoveApplicationAcrossQueuesRequest.newInstance(appId, "newqueue"));
-
- RMApp rmApp = resourceManager.getRMContext().getRMApps().get(appId);
+ clientRMService
+ .moveApplicationAcrossQueues(MoveApplicationAcrossQueuesRequest
+ .newInstance(app.getApplicationId(), "newqueue"));
+
+ RMApp rmApp = rm1.getRMContext().getRMApps().get(app.getApplicationId());
assertEquals("newqueue", rmApp.getQueue());
+ rm1.stop();
}
-
+
@Test
public void testMoveRejectedByPermissions() throws Exception {
failMove = true;
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java Tue Aug 19 23:49:39 2014
@@ -28,6 +28,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.junit.After;
import org.junit.Assert;
import org.apache.commons.logging.Log;
@@ -78,7 +81,14 @@ public class TestRM {
// Milliseconds to sleep for when waiting for something to happen
private final static int WAIT_SLEEP_MS = 100;
-
+
+ @After
+ public void tearDown() {
+ ClusterMetrics.destroy();
+ QueueMetrics.clearQueueMetrics();
+ DefaultMetricsSystem.shutdown();
+ }
+
@Test
public void testGetNewAppId() throws Exception {
Logger rootLogger = LogManager.getRootLogger();
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java Tue Aug 19 23:49:39 2014
@@ -348,14 +348,14 @@ public class TestRMAdminService {
rm.adminService.refreshSuperUserGroupsConfiguration(
RefreshSuperUserGroupsConfigurationRequest.newInstance());
- Assert.assertTrue(ProxyUsers.getProxyGroups()
+ Assert.assertTrue(ProxyUsers.getDefaultImpersonationProvider().getProxyGroups()
.get("hadoop.proxyuser.test.groups").size() == 1);
- Assert.assertTrue(ProxyUsers.getProxyGroups()
+ Assert.assertTrue(ProxyUsers.getDefaultImpersonationProvider().getProxyGroups()
.get("hadoop.proxyuser.test.groups").contains("test_groups"));
- Assert.assertTrue(ProxyUsers.getProxyHosts()
+ Assert.assertTrue(ProxyUsers.getDefaultImpersonationProvider().getProxyHosts()
.get("hadoop.proxyuser.test.hosts").size() == 1);
- Assert.assertTrue(ProxyUsers.getProxyHosts()
+ Assert.assertTrue(ProxyUsers.getDefaultImpersonationProvider().getProxyHosts()
.get("hadoop.proxyuser.test.hosts").contains("test_hosts"));
}
@@ -708,14 +708,14 @@ public class TestRMAdminService {
aclsString);
// verify ProxyUsers and ProxyHosts
- Assert.assertTrue(ProxyUsers.getProxyGroups()
+ Assert.assertTrue(ProxyUsers.getDefaultImpersonationProvider().getProxyGroups()
.get("hadoop.proxyuser.test.groups").size() == 1);
- Assert.assertTrue(ProxyUsers.getProxyGroups()
+ Assert.assertTrue(ProxyUsers.getDefaultImpersonationProvider().getProxyGroups()
.get("hadoop.proxyuser.test.groups").contains("test_groups"));
- Assert.assertTrue(ProxyUsers.getProxyHosts()
+ Assert.assertTrue(ProxyUsers.getDefaultImpersonationProvider().getProxyHosts()
.get("hadoop.proxyuser.test.hosts").size() == 1);
- Assert.assertTrue(ProxyUsers.getProxyHosts()
+ Assert.assertTrue(ProxyUsers.getDefaultImpersonationProvider().getProxyHosts()
.get("hadoop.proxyuser.test.hosts").contains("test_hosts"));
// verify UserToGroupsMappings
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java Tue Aug 19 23:49:39 2014
@@ -28,7 +28,7 @@ import java.net.InetSocketAddress;
import javax.ws.rs.core.MediaType;
-import junit.framework.Assert;
+import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -37,6 +37,8 @@ import org.apache.hadoop.ha.HAServicePro
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import org.apache.hadoop.ha.HealthCheckFailedException;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.service.AbstractService;
@@ -92,6 +94,9 @@ public class TestRMHA {
// Enable webapp to test web-services also
configuration.setBoolean(MockRM.ENABLE_WEBAPP, true);
configuration.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
+ ClusterMetrics.destroy();
+ QueueMetrics.clearQueueMetrics();
+ DefaultMetricsSystem.shutdown();
}
private void checkMonitorHealth() throws IOException {
@@ -326,6 +331,10 @@ public class TestRMHA {
rm.adminService.transitionToStandby(requestInfo);
rm.adminService.transitionToActive(requestInfo);
rm.adminService.transitionToStandby(requestInfo);
+
+ MyCountingDispatcher dispatcher =
+ (MyCountingDispatcher) rm.getRMContext().getDispatcher();
+ assertTrue(!dispatcher.isStopped());
rm.adminService.transitionToActive(requestInfo);
assertEquals(errorMessageForEventHandler, expectedEventHandlerCount,
@@ -334,6 +343,11 @@ public class TestRMHA {
assertEquals(errorMessageForService, expectedServiceCount,
rm.getServices().size());
+
+ // Keep the dispatcher reference before transitioning to standby
+ dispatcher = (MyCountingDispatcher) rm.getRMContext().getDispatcher();
+
+
rm.adminService.transitionToStandby(requestInfo);
assertEquals(errorMessageForEventHandler, expectedEventHandlerCount,
((MyCountingDispatcher) rm.getRMContext().getDispatcher())
@@ -341,6 +355,8 @@ public class TestRMHA {
assertEquals(errorMessageForService, expectedServiceCount,
rm.getServices().size());
+ assertTrue(dispatcher.isStopped());
+
rm.stop();
}
@@ -375,7 +391,19 @@ public class TestRMHA {
}
@Test
- public void testHAWithRMHostName() {
+ public void testHAWithRMHostName() throws Exception {
+ innerTestHAWithRMHostName(false);
+ configuration.clear();
+ setUp();
+ innerTestHAWithRMHostName(true);
+ }
+
+ public void innerTestHAWithRMHostName(boolean includeBindHost) {
+ //this is run two times, with and without a bind host configured
+ if (includeBindHost) {
+ configuration.set(YarnConfiguration.RM_BIND_HOST, "9.9.9.9");
+ }
+
//test if both RM_HOSTBANE_{rm_id} and RM_RPCADDRESS_{rm_id} are set
//We should only read rpc addresses from RM_RPCADDRESS_{rm_id} configuration
configuration.set(HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME,
@@ -395,6 +423,15 @@ public class TestRMHA {
RM2_ADDRESS, conf.get(HAUtil.addSuffix(confKey, RM2_NODE_ID)));
assertEquals("RPC address not set for " + confKey,
RM3_ADDRESS, conf.get(HAUtil.addSuffix(confKey, RM3_NODE_ID)));
+ if (includeBindHost) {
+ assertEquals("Web address misconfigured WITH bind-host",
+ rm.webAppAddress.substring(0, 7), "9.9.9.9");
+ } else {
+ //YarnConfiguration tries to figure out which rm host it's on by binding to it,
+ //which doesn't happen for any of these fake addresses, so we end up with 0.0.0.0
+ assertEquals("Web address misconfigured WITHOUT bind-host",
+ rm.webAppAddress.substring(0, 7), "0.0.0.0");
+ }
}
} catch (YarnRuntimeException e) {
fail("Should not throw any exceptions.");
@@ -466,6 +503,8 @@ public class TestRMHA {
private int eventHandlerCount;
+ private volatile boolean stopped = false;
+
public MyCountingDispatcher() {
super("MyCountingDispatcher");
this.eventHandlerCount = 0;
@@ -484,5 +523,15 @@ public class TestRMHA {
public int getEventHandlerCount() {
return this.eventHandlerCount;
}
+
+ @Override
+ protected void serviceStop() throws Exception {
+ this.stopped = true;
+ super.serviceStop();
+ }
+
+ public boolean isStopped() {
+ return this.stopped;
+ }
}
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java Tue Aug 19 23:49:39 2014
@@ -21,15 +21,14 @@ import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import org.junit.Assert;
-
import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -49,6 +48,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
@@ -160,7 +161,7 @@ public class TestRMNodeTransitions {
@Test (timeout = 5000)
public void testExpiredContainer() {
// Start the node
- node.handle(new RMNodeEvent(null, RMNodeEventType.STARTED));
+ node.handle(new RMNodeStartedEvent(null, null, null));
verify(scheduler).handle(any(NodeAddedSchedulerEvent.class));
// Expire a container
@@ -188,11 +189,11 @@ public class TestRMNodeTransitions {
@Test (timeout = 5000)
public void testContainerUpdate() throws InterruptedException{
//Start the node
- node.handle(new RMNodeEvent(null,RMNodeEventType.STARTED));
+ node.handle(new RMNodeStartedEvent(null, null, null));
NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1);
RMNodeImpl node2 = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
- node2.handle(new RMNodeEvent(null,RMNodeEventType.STARTED));
+ node2.handle(new RMNodeStartedEvent(null, null, null));
ContainerId completedContainerIdFromNode1 = BuilderUtils.newContainerId(
BuilderUtils.newApplicationAttemptId(
@@ -248,7 +249,7 @@ public class TestRMNodeTransitions {
@Test (timeout = 5000)
public void testStatusChange(){
//Start the node
- node.handle(new RMNodeEvent(null,RMNodeEventType.STARTED));
+ node.handle(new RMNodeStartedEvent(null, null, null));
//Add info to the queue first
node.setNextHeartBeat(false);
@@ -455,12 +456,16 @@ public class TestRMNodeTransitions {
}
private RMNodeImpl getRunningNode() {
+ return getRunningNode(null);
+ }
+
+ private RMNodeImpl getRunningNode(String nmVersion) {
NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
Resource capability = Resource.newInstance(4096, 4);
RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0,
null, ResourceOption.newInstance(capability,
- RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), null);
- node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.STARTED));
+ RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), nmVersion);
+ node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null));
Assert.assertEquals(NodeState.RUNNING, node.getState());
return node;
}
@@ -491,7 +496,7 @@ public class TestRMNodeTransitions {
int initialUnhealthy = cm.getUnhealthyNMs();
int initialDecommissioned = cm.getNumDecommisionedNMs();
int initialRebooted = cm.getNumRebootedNMs();
- node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.STARTED));
+ node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null));
Assert.assertEquals("Active Nodes", initialActive + 1, cm.getNumActiveNMs());
Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
Assert.assertEquals("Unhealthy Nodes",
@@ -515,7 +520,7 @@ public class TestRMNodeTransitions {
int initialUnhealthy = cm.getUnhealthyNMs();
int initialDecommissioned = cm.getNumDecommisionedNMs();
int initialRebooted = cm.getNumRebootedNMs();
- node.handle(new RMNodeReconnectEvent(node.getNodeID(), node));
+ node.handle(new RMNodeReconnectEvent(node.getNodeID(), node, null));
Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
Assert.assertEquals("Unhealthy Nodes",
@@ -530,4 +535,15 @@ public class TestRMNodeTransitions {
nodesListManagerEvent.getType());
}
+ @Test
+ public void testReconnnectUpdate() {
+ final String nmVersion1 = "nm version 1";
+ final String nmVersion2 = "nm version 2";
+ RMNodeImpl node = getRunningNode(nmVersion1);
+ Assert.assertEquals(nmVersion1, node.getNodeManagerVersion());
+ RMNodeImpl reconnectingNode = getRunningNode(nmVersion2);
+ node.handle(new RMNodeReconnectEvent(node.getNodeID(), reconnectingNode,
+ null));
+ Assert.assertEquals(nmVersion2, node.getNodeManagerVersion());
+ }
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Tue Aug 19 23:49:39 2014
@@ -30,6 +30,7 @@ import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
@@ -67,13 +68,15 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
@@ -82,8 +85,8 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -100,7 +103,6 @@ import org.junit.Before;
import org.junit.Test;
public class TestRMRestart {
-
private final static File TEMP_DIR = new File(System.getProperty(
"test.build.data", "/tmp"), "decommision");
private File hostFile = new File(TEMP_DIR + File.separator + "hostFile.txt");
@@ -287,11 +289,11 @@ public class TestRMRestart {
// verify old AM is not accepted
// change running AM to talk to new RM
- am1.setAMRMProtocol(rm2.getApplicationMasterService());
+ am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
AllocateResponse allocResponse = am1.allocate(
new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>());
- Assert.assertTrue(allocResponse.getAMCommand() == AMCommand.AM_RESYNC);
+ Assert.assertEquals(AMCommand.AM_SHUTDOWN, allocResponse.getAMCommand());
// NM should be rebooted on heartbeat, even first heartbeat for nm2
NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
@@ -303,13 +305,11 @@ public class TestRMRestart {
nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
nm2 = new MockNM("127.0.0.2:5678", 15120, rm2.getResourceTrackerService());
- List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
- ContainerStatus containerStatus =
- BuilderUtils.newContainerStatus(BuilderUtils.newContainerId(loadedApp1
- .getCurrentAppAttempt().getAppAttemptId(), 1),
- ContainerState.COMPLETE, "Killed AM container", 143);
- containerStatuses.add(containerStatus);
- nm1.registerNode(containerStatuses);
+ NMContainerStatus status =
+ TestRMRestart
+ .createNMContainerStatus(loadedApp1.getCurrentAppAttempt()
+ .getAppAttemptId(), 1, ContainerState.COMPLETE);
+ nm1.registerNode(Arrays.asList(status), null);
nm2.registerNode();
rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED);
@@ -392,7 +392,7 @@ public class TestRMRestart {
// completed apps are not removed immediately after app finish
// And finished app is also loaded back.
Assert.assertEquals(4, rmAppState.size());
- }
+ }
@Test (timeout = 60000)
public void testRMRestartAppRunningAMFailed() throws Exception {
@@ -510,14 +510,11 @@ public class TestRMRestart {
Assert.assertEquals(RMAppAttemptState.LAUNCHED,
rmApp.getAppAttempts().get(am2.getApplicationAttemptId())
.getAppAttemptState());
-
- List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
- ContainerStatus containerStatus =
- BuilderUtils.newContainerStatus(
- BuilderUtils.newContainerId(am2.getApplicationAttemptId(), 1),
- ContainerState.COMPLETE, "Killed AM container", 143);
- containerStatuses.add(containerStatus);
- nm1.registerNode(containerStatuses);
+
+ NMContainerStatus status =
+ TestRMRestart.createNMContainerStatus(
+ am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+ nm1.registerNode(Arrays.asList(status), null);
rm2.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED);
launchAM(rmApp, rm2, nm1);
Assert.assertEquals(3, rmApp.getAppAttempts().size());
@@ -615,7 +612,7 @@ public class TestRMRestart {
@Override
public void updateApplicationStateInternal(ApplicationId appId,
- ApplicationStateDataPBImpl appStateData) throws Exception {
+ ApplicationStateData appStateData) throws Exception {
if (count == 0) {
// do nothing; simulate app final state is not saved.
LOG.info(appId + " final state is not saved.");
@@ -763,14 +760,14 @@ public class TestRMRestart {
@Override
public synchronized void storeApplicationAttemptStateInternal(
ApplicationAttemptId attemptId,
- ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
+ ApplicationAttemptStateData attemptStateData) throws Exception {
// ignore attempt saving request.
}
@Override
public synchronized void updateApplicationAttemptStateInternal(
ApplicationAttemptId attemptId,
- ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
+ ApplicationAttemptStateData attemptStateData) throws Exception {
// ignore attempt saving request.
}
};
@@ -1211,18 +1208,13 @@ public class TestRMRestart {
Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1),
attemptState.getMasterContainer().getId());
- // the appToken and clientTokenMasterKey that are generated when
+ // the clientTokenMasterKey that is generated when
// RMAppAttempt is created,
- HashSet<Token<?>> tokenSet = new HashSet<Token<?>>();
- tokenSet.add(attempt1.getAMRMToken());
byte[] clientTokenMasterKey =
attempt1.getClientTokenMasterKey().getEncoded();
// assert application credentials are saved
Credentials savedCredentials = attemptState.getAppAttemptCredentials();
- HashSet<Token<?>> savedTokens = new HashSet<Token<?>>();
- savedTokens.addAll(savedCredentials.getAllTokens());
- Assert.assertEquals(tokenSet, savedTokens);
Assert.assertArrayEquals("client token master key not saved",
clientTokenMasterKey, savedCredentials.getSecretKey(
RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME));
@@ -1235,11 +1227,8 @@ public class TestRMRestart {
rm2.getRMContext().getRMApps().get(app1.getApplicationId());
RMAppAttempt loadedAttempt1 = loadedApp1.getRMAppAttempt(attemptId1);
- // assert loaded attempt recovered attempt tokens
+ // assert loaded attempt is recovered
Assert.assertNotNull(loadedAttempt1);
- savedTokens.clear();
- savedTokens.add(loadedAttempt1.getAMRMToken());
- Assert.assertEquals(tokenSet, savedTokens);
// assert client token master key is recovered back to api-versioned
// client token master key
@@ -1638,14 +1627,20 @@ public class TestRMRestart {
// create app that gets launched and does allocate before RM restart
RMApp app1 = rm1.submitApp(200);
- assertQueueMetrics(qm1, 1, 1, 0, 0);
- nm1.nodeHeartbeat(true);
+ // Need to wait first for AppAttempt to be started (RMAppState.ACCEPTED)
+ // and then for it to reach RMAppAttemptState.SCHEDULED
+ // in order to ensure appsPending metric is incremented
+ rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
ApplicationAttemptId attemptId1 = attempt1.getAppAttemptId();
+ rm1.waitForState(attemptId1, RMAppAttemptState.SCHEDULED);
+ assertQueueMetrics(qm1, 1, 1, 0, 0);
+
+ nm1.nodeHeartbeat(true);
rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
MockAM am1 = rm1.sendAMLaunched(attempt1.getAppAttemptId());
am1.registerAppAttempt();
- am1.allocate("127.0.0.1" , 1000, 1, new ArrayList<ContainerId>());
+ am1.allocate("127.0.0.1" , 1000, 1, new ArrayList<ContainerId>());
nm1.nodeHeartbeat(true);
List<Container> conts = am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
@@ -1660,24 +1655,25 @@ public class TestRMRestart {
// PHASE 2: create new RM and start from old state
// create new RM to represent restart and recover state
MockRM rm2 = new MockRM(conf, memStore);
- rm2.start();
- nm1.setResourceTrackerService(rm2.getResourceTrackerService());
QueueMetrics qm2 = rm2.getResourceScheduler().getRootQueueMetrics();
resetQueueMetrics(qm2);
assertQueueMetrics(qm2, 0, 0, 0, 0);
+
+ rm2.start();
+ nm1.setResourceTrackerService(rm2.getResourceTrackerService());
// recover app
RMApp loadedApp1 = rm2.getRMContext().getRMApps().get(app1.getApplicationId());
- am1.setAMRMProtocol(rm2.getApplicationMasterService());
+ am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
nm1.nodeHeartbeat(true);
nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
- List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
- ContainerStatus containerStatus =
- BuilderUtils.newContainerStatus(BuilderUtils.newContainerId(loadedApp1
- .getCurrentAppAttempt().getAppAttemptId(), 1),
- ContainerState.COMPLETE, "Killed AM container", 143);
- containerStatuses.add(containerStatus);
- nm1.registerNode(containerStatuses);
+
+ NMContainerStatus status =
+ TestRMRestart
+ .createNMContainerStatus(loadedApp1.getCurrentAppAttempt()
+ .getAppAttemptId(), 1, ContainerState.COMPLETE);
+ nm1.registerNode(Arrays.asList(status), null);
+
while (loadedApp1.getAppAttempts().size() != 2) {
Thread.sleep(200);
}
@@ -1801,12 +1797,10 @@ public class TestRMRestart {
// ResourceTrackerService is started.
super.serviceStart();
nm1.setResourceTrackerService(getResourceTrackerService());
- List<ContainerStatus> status = new ArrayList<ContainerStatus>();
- ContainerId amContainer =
- ContainerId.newInstance(am0.getApplicationAttemptId(), 1);
- status.add(ContainerStatus.newInstance(amContainer,
- ContainerState.COMPLETE, "AM container exit", 143));
- nm1.registerNode(status);
+ NMContainerStatus status =
+ TestRMRestart.createNMContainerStatus(
+ am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+ nm1.registerNode(Arrays.asList(status), null);
}
};
}
@@ -1845,6 +1839,16 @@ public class TestRMRestart {
}
}
+ public static NMContainerStatus createNMContainerStatus(
+ ApplicationAttemptId appAttemptId, int id, ContainerState containerState) {
+ ContainerId containerId = ContainerId.newInstance(appAttemptId, id);
+ NMContainerStatus containerReport =
+ NMContainerStatus.newInstance(containerId, containerState,
+ Resource.newInstance(1024, 1), "recover container", 0,
+ Priority.newInstance(0), 0);
+ return containerReport;
+ }
+
public class TestMemoryRMStateStore extends MemoryRMStateStore {
int count = 0;
public int updateApp = 0;
@@ -1852,7 +1856,7 @@ public class TestRMRestart {
@Override
public void updateApplicationStateInternal(ApplicationId appId,
- ApplicationStateDataPBImpl appStateData) throws Exception {
+ ApplicationStateData appStateData) throws Exception {
updateApp = ++count;
super.updateApplicationStateInternal(appId, appStateData);
}
@@ -1861,7 +1865,7 @@ public class TestRMRestart {
public synchronized void
updateApplicationAttemptStateInternal(
ApplicationAttemptId attemptId,
- ApplicationAttemptStateDataPBImpl attemptStateData)
+ ApplicationAttemptStateData attemptStateData)
throws Exception {
updateAttempt = ++count;
super.updateApplicationAttemptStateInternal(attemptId,
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java Tue Aug 19 23:49:39 2014
@@ -27,7 +27,10 @@ import java.util.Collection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.http.lib.StaticUserWebFilter;
import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.security.AuthenticationFilterInitializer;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -39,8 +42,10 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilterInitializer;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -235,4 +240,75 @@ public class TestResourceManager {
}
}
+ @Test(timeout = 50000)
+ public void testFilterOverrides() throws Exception {
+ String filterInitializerConfKey = "hadoop.http.filter.initializers";
+ String[] filterInitializers =
+ {
+ AuthenticationFilterInitializer.class.getName(),
+ RMAuthenticationFilterInitializer.class.getName(),
+ AuthenticationFilterInitializer.class.getName() + ","
+ + RMAuthenticationFilterInitializer.class.getName(),
+ AuthenticationFilterInitializer.class.getName() + ", "
+ + RMAuthenticationFilterInitializer.class.getName(),
+ AuthenticationFilterInitializer.class.getName() + ", "
+ + this.getClass().getName() };
+ for (String filterInitializer : filterInitializers) {
+ resourceManager = new ResourceManager();
+ Configuration conf = new YarnConfiguration();
+ conf.set(filterInitializerConfKey, filterInitializer);
+ conf.set("hadoop.security.authentication", "kerberos");
+ conf.set("hadoop.http.authentication.type", "kerberos");
+ try {
+ try {
+ UserGroupInformation.setConfiguration(conf);
+ } catch (Exception e) {
+ // ignore; we just care about getting true for
+ // isSecurityEnabled()
+ LOG.info("Got expected exception");
+ }
+ resourceManager.init(conf);
+ resourceManager.startWepApp();
+ } catch (RuntimeException e) {
+ // Exceptions are expected because we didn't set up everything;
+ // we just want to test filter settings
+ String tmp = resourceManager.getConfig().get(filterInitializerConfKey);
+ if (filterInitializer.contains(this.getClass().getName())) {
+ Assert.assertEquals(RMAuthenticationFilterInitializer.class.getName()
+ + "," + this.getClass().getName(), tmp);
+ } else {
+ Assert.assertEquals(
+ RMAuthenticationFilterInitializer.class.getName(), tmp);
+ }
+ resourceManager.stop();
+ }
+ }
+
+ // simple mode overrides
+ String[] simpleFilterInitializers =
+ { "", StaticUserWebFilter.class.getName() };
+ for (String filterInitializer : simpleFilterInitializers) {
+ resourceManager = new ResourceManager();
+ Configuration conf = new YarnConfiguration();
+ conf.set(filterInitializerConfKey, filterInitializer);
+ try {
+ UserGroupInformation.setConfiguration(conf);
+ resourceManager.init(conf);
+ resourceManager.startWepApp();
+ } catch (RuntimeException e) {
+ // Exceptions are expected because we didn't set up everything;
+ // we just want to test filter settings
+ String tmp = resourceManager.getConfig().get(filterInitializerConfKey);
+ if (filterInitializer.equals(StaticUserWebFilter.class.getName())) {
+ Assert.assertEquals(RMAuthenticationFilterInitializer.class.getName()
+ + "," + StaticUserWebFilter.class.getName(), tmp);
+ } else {
+ Assert.assertEquals(
+ RMAuthenticationFilterInitializer.class.getName(), tmp);
+ }
+ resourceManager.stop();
+ }
+ }
+ }
+
}