You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by sz...@apache.org on 2014/08/21 07:22:21 UTC
svn commit: r1619293 [2/2] - in
/hadoop/common/branches/HDFS-6584/hadoop-yarn-project: ./ hadoop-yarn/bin/
hadoop-yarn/conf/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/
hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/...
Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java Thu Aug 21 05:22:10 2014
@@ -171,7 +171,6 @@ public class TestApplicationMasterServic
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
am1.registerAppAttempt();
- am1.setAMRMProtocol(rm.getApplicationMasterService());
AllocateRequestPBImpl allocateRequest = new AllocateRequestPBImpl();
List<ContainerId> release = new ArrayList<ContainerId>();
Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java Thu Aug 21 05:22:10 2014
@@ -331,6 +331,10 @@ public class TestRMHA {
rm.adminService.transitionToStandby(requestInfo);
rm.adminService.transitionToActive(requestInfo);
rm.adminService.transitionToStandby(requestInfo);
+
+ MyCountingDispatcher dispatcher =
+ (MyCountingDispatcher) rm.getRMContext().getDispatcher();
+ assertTrue(!dispatcher.isStopped());
rm.adminService.transitionToActive(requestInfo);
assertEquals(errorMessageForEventHandler, expectedEventHandlerCount,
@@ -339,6 +343,11 @@ public class TestRMHA {
assertEquals(errorMessageForService, expectedServiceCount,
rm.getServices().size());
+
+ // Keep the dispatcher reference before transitioning to standby
+ dispatcher = (MyCountingDispatcher) rm.getRMContext().getDispatcher();
+
+
rm.adminService.transitionToStandby(requestInfo);
assertEquals(errorMessageForEventHandler, expectedEventHandlerCount,
((MyCountingDispatcher) rm.getRMContext().getDispatcher())
@@ -346,6 +355,8 @@ public class TestRMHA {
assertEquals(errorMessageForService, expectedServiceCount,
rm.getServices().size());
+ assertTrue(dispatcher.isStopped());
+
rm.stop();
}
@@ -492,6 +503,8 @@ public class TestRMHA {
private int eventHandlerCount;
+ private volatile boolean stopped = false;
+
public MyCountingDispatcher() {
super("MyCountingDispatcher");
this.eventHandlerCount = 0;
@@ -510,5 +523,15 @@ public class TestRMHA {
public int getEventHandlerCount() {
return this.eventHandlerCount;
}
+
+ @Override
+ protected void serviceStop() throws Exception {
+ this.stopped = true;
+ super.serviceStop();
+ }
+
+ public boolean isStopped() {
+ return this.stopped;
+ }
}
}
Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Thu Aug 21 05:22:10 2014
@@ -289,7 +289,7 @@ public class TestRMRestart {
// verify old AM is not accepted
// change running AM to talk to new RM
- am1.setAMRMProtocol(rm2.getApplicationMasterService());
+ am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
AllocateResponse allocResponse = am1.allocate(
new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>());
@@ -1663,7 +1663,7 @@ public class TestRMRestart {
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
// recover app
RMApp loadedApp1 = rm2.getRMContext().getRMApps().get(app1.getApplicationId());
- am1.setAMRMProtocol(rm2.getApplicationMasterService());
+ am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
nm1.nodeHeartbeat(true);
nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java Thu Aug 21 05:22:10 2014
@@ -33,10 +33,13 @@ import java.util.Set;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
@@ -72,6 +75,9 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import com.google.common.base.Supplier;
+
+
@SuppressWarnings({"rawtypes", "unchecked"})
@RunWith(value = Parameterized.class)
public class TestWorkPreservingRMRestart {
@@ -572,8 +578,8 @@ public class TestWorkPreservingRMRestart
rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED);
rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.LAUNCHED);
- am0.setAMRMProtocol(rm2.getApplicationMasterService());
- am0.registerAppAttempt(false);
+ am0.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
+ am0.registerAppAttempt(true);
rm2.waitForState(app0.getApplicationId(), RMAppState.RUNNING);
rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
@@ -646,6 +652,69 @@ public class TestWorkPreservingRMRestart
waitForNumContainersToRecover(2, rm2, am0.getApplicationAttemptId());
}
+ // Test that if the RM, on recovery, receives a container release request from
+ // the AM before it receives that container's status reported by the NM for
+ // recovery, the container is not recovered.
+ @Test (timeout = 30000)
+ public void testReleasedContainerNotRecovered() throws Exception {
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+ rm1 = new MockRM(conf, memStore);
+ MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
+ nm1.registerNode();
+ rm1.start();
+
+ RMApp app1 = rm1.submitApp(1024);
+ final MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ // Re-start RM
+ conf.setInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 8000);
+ rm2 = new MockRM(conf, memStore);
+ rm2.start();
+ nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+ rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+ am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
+ am1.registerAppAttempt(true);
+
+ // try to release a container before the container is actually recovered.
+ final ContainerId runningContainer =
+ ContainerId.newInstance(am1.getApplicationAttemptId(), 2);
+ am1.allocate(null, Arrays.asList(runningContainer));
+
+ // send container statuses to recover the containers
+ List<NMContainerStatus> containerStatuses =
+ createNMContainerStatusForApp(am1);
+ nm1.registerNode(containerStatuses, null);
+
+ // only the am container should be recovered.
+ waitForNumContainersToRecover(1, rm2, am1.getApplicationAttemptId());
+
+ final AbstractYarnScheduler scheduler =
+ (AbstractYarnScheduler) rm2.getResourceScheduler();
+ // The cached release request should be cleaned up; this is checked by the waitFor at the end of the test.
+ // assertFalse(scheduler.getPendingRelease().contains(runningContainer));
+
+ AllocateResponse response = am1.allocate(null, null);
+ // AM gets notified of the completed container.
+ boolean receivedCompletedContainer = false;
+ for (ContainerStatus status : response.getCompletedContainersStatuses()) {
+ if (status.getContainerId().equals(runningContainer)) {
+ receivedCompletedContainer = true;
+ }
+ }
+ assertTrue(receivedCompletedContainer);
+
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ public Boolean get() {
+ // release cache is cleaned up and previous running container is not
+ // recovered
+ return scheduler.getApplicationAttempt(am1.getApplicationAttemptId())
+ .getPendingRelease().isEmpty()
+ && scheduler.getRMContainer(runningContainer) == null;
+ }
+ }, 1000, 20000);
+ }
+
private void asserteMetrics(QueueMetrics qm, int appsSubmitted,
int appsPending, int appsRunning, int appsCompleted,
int allocatedContainers, int availableMB, int availableVirtualCores,
@@ -661,7 +730,7 @@ public class TestWorkPreservingRMRestart
assertEquals(allocatedVirtualCores, qm.getAllocatedVirtualCores());
}
- private void waitForNumContainersToRecover(int num, MockRM rm,
+ public static void waitForNumContainersToRecover(int num, MockRM rm,
ApplicationAttemptId attemptId) throws Exception {
AbstractYarnScheduler scheduler =
(AbstractYarnScheduler) rm.getResourceScheduler();
@@ -674,7 +743,9 @@ public class TestWorkPreservingRMRestart
attempt = scheduler.getApplicationAttempt(attemptId);
}
while (attempt.getLiveContainers().size() < num) {
- System.out.println("Wait for " + num + " containers to recover.");
+ System.out.println("Wait for " + num
+ + " containers to recover. currently: "
+ + attempt.getLiveContainers().size());
Thread.sleep(200);
}
}