You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by ar...@apache.org on 2013/11/19 18:26:26 UTC
svn commit: r1543510 [2/2] - in
/hadoop/common/branches/HDFS-2832/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/ya...
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java Tue Nov 19 17:26:23 2013
@@ -201,7 +201,8 @@ public class FairSchedulerPage extends R
_("$(function() {",
" $('#cs a span').addClass('ui-corner-all').css('position', 'absolute');",
" $('#cs').bind('loaded.jstree', function (e, data) {",
- " data.inst.open_node('#pq', true);",
+ " var callback = { call:reopenQueryNodes }",
+ " data.inst.open_node('#pq', callback);",
" }).",
" jstree({",
" core: { animation: 188, html_titles: true },",
@@ -217,7 +218,8 @@ public class FairSchedulerPage extends R
" $('#apps').dataTable().fnFilter(q, 3, true);",
" });",
" $('#cs').show();",
- "});")._();
+ "});")._().
+ _(SchedulerPageUtil.QueueBlockUtil.class);
}
@Override protected Class<? extends SubView> content() {
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java Tue Nov 19 17:26:23 2013
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -31,6 +32,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
@@ -98,21 +100,27 @@ public class MockNM {
}
public RegisterNodeManagerResponse registerNode() throws Exception {
+ return registerNode(null);
+ }
+
+ public RegisterNodeManagerResponse registerNode(
+ List<ContainerStatus> containerStatus) throws Exception{
RegisterNodeManagerRequest req = Records.newRecord(
RegisterNodeManagerRequest.class);
req.setNodeId(nodeId);
req.setHttpPort(httpPort);
Resource resource = BuilderUtils.newResource(memory, vCores);
req.setResource(resource);
+ req.setContainerStatuses(containerStatus);
req.setNMVersion(version);
RegisterNodeManagerResponse registrationResponse =
resourceTracker.registerNodeManager(req);
this.currentContainerTokenMasterKey =
registrationResponse.getContainerTokenMasterKey();
this.currentNMTokenMasterKey = registrationResponse.getNMTokenMasterKey();
- return registrationResponse;
+ return registrationResponse;
}
-
+
public NodeHeartbeatResponse nodeHeartbeat(boolean isHealthy) throws Exception {
return nodeHeartbeat(new HashMap<ApplicationId, List<ContainerStatus>>(),
isHealthy, ++responseId);
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java Tue Nov 19 17:26:23 2013
@@ -421,6 +421,10 @@ public class MockRM extends ResourceMana
return this.clientToAMSecretManager;
}
+ public RMAppManager getRMAppManager() {
+ return this.rmAppManager;
+ }
+
@Override
protected void startWepApp() {
// override to disable webapp
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java Tue Nov 19 17:26:23 2013
@@ -172,7 +172,7 @@ public class TestAppManager{
ApplicationSubmissionContext submissionContext, String user)
throws YarnException {
super.submitApplication(submissionContext, System.currentTimeMillis(),
- false, user);
+ user, false, null);
}
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Tue Nov 19 17:26:23 2013
@@ -18,6 +18,11 @@
package org.apache.hadoop.yarn.server.resourcemanager;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
@@ -29,6 +34,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -57,10 +63,14 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@@ -83,6 +93,7 @@ import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mortbay.log.Log;
public class TestRMRestart {
@@ -104,6 +115,7 @@ public class TestRMRestart {
Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1);
}
+ @SuppressWarnings("rawtypes")
@Test (timeout=180000)
public void testRMRestart() throws Exception {
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
@@ -252,11 +264,14 @@ public class TestRMRestart {
.getApplicationId());
// verify state machine kicked into expected states
- rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED);
+ rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.RUNNING);
rm2.waitForState(loadedApp2.getApplicationId(), RMAppState.ACCEPTED);
- // verify new attempts created
- Assert.assertEquals(2, loadedApp1.getAppAttempts().size());
+ // verify attempts for apps
+ // The app for which AM was started will wait for previous am
+ // container finish event to arrive. However for an application for which
+ // no am container was running will start new application attempt.
+ Assert.assertEquals(1, loadedApp1.getAppAttempts().size());
Assert.assertEquals(1, loadedApp2.getAppAttempts().size());
// verify old AM is not accepted
@@ -274,8 +289,20 @@ public class TestRMRestart {
Assert.assertEquals(NodeAction.RESYNC, hbResponse.getNodeAction());
// new NM to represent NM re-register
- nm1 = rm2.registerNode("127.0.0.1:1234", 15120);
- nm2 = rm2.registerNode("127.0.0.2:5678", 15120);
+ nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
+ nm2 = new MockNM("127.0.0.2:5678", 15120, rm2.getResourceTrackerService());
+
+ List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
+ ContainerStatus containerStatus =
+ BuilderUtils.newContainerStatus(BuilderUtils.newContainerId(loadedApp1
+ .getCurrentAppAttempt().getAppAttemptId(), 1),
+ ContainerState.COMPLETE, "Killed AM container", 143);
+ containerStatuses.add(containerStatus);
+ nm1.registerNode(containerStatuses);
+ nm2.registerNode();
+
+ rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED);
+ Assert.assertEquals(2, loadedApp1.getAppAttempts().size());
// verify no more reboot response sent
hbResponse = nm1.nodeHeartbeat(true);
@@ -399,6 +426,157 @@ public class TestRMRestart {
}
@Test
+ public void testRMRestartWaitForPreviousAMToFinish() throws Exception {
+ // testing 3 cases
+ // After RM restarts
+ // 1) New application attempt is not started until previous AM container
+ // finish event is reported back to RM as a part of nm registration.
+ // 2) If previous AM container finish event is never reported back (i.e.
+ // node manager on which this AM container was running also went down) in
+ // that case AMLivenessMonitor should time out previous attempt and start
+ // new attempt.
+ // 3) If all the stored attempts had finished then new attempt should
+ // be started immediately.
+ YarnConfiguration conf = new YarnConfiguration(this.conf);
+ conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 40);
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+ RMState rmState = memStore.getState();
+ Map<ApplicationId, ApplicationState> rmAppState =
+ rmState.getApplicationState();
+
+ // start RM
+ final MockRM rm1 = new MockRM(conf, memStore);
+ rm1.start();
+ MockNM nm1 =
+ new MockNM("127.0.0.1:1234" , 16382, rm1.getResourceTrackerService());
+ nm1.registerNode();
+
+ // submitting app
+ RMApp app1 = rm1.submitApp(200);
+ rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+ MockAM am1 = launchAM(app1, rm1, nm1);
+ nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+ // Fail first AM.
+ am1.waitForState(RMAppAttemptState.FAILED);
+
+ // launch another AM.
+ MockAM am2 = launchAM(app1, rm1, nm1);
+
+ Assert.assertEquals(1, rmAppState.size());
+ Assert.assertEquals(app1.getState(), RMAppState.RUNNING);
+ Assert.assertEquals(app1.getAppAttempts()
+ .get(app1.getCurrentAppAttempt().getAppAttemptId())
+ .getAppAttemptState(), RMAppAttemptState.RUNNING);
+
+ // start new RM.
+ MockRM rm2 = null;
+ rm2 = new MockRM(conf, memStore);
+ rm2.start();
+
+ nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+ NodeHeartbeatResponse res = nm1.nodeHeartbeat(true);
+ Assert.assertEquals(NodeAction.RESYNC, res.getNodeAction());
+
+ RMApp rmApp = rm2.getRMContext().getRMApps().get(app1.getApplicationId());
+ // application should be in running state
+ rm2.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
+
+ Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
+ // new attempt should not be started
+ Assert.assertEquals(2, rmApp.getAppAttempts().size());
+ // am1 attempt should be in FAILED state where as am2 attempt should be in
+ // LAUNCHED state
+ Assert.assertEquals(RMAppAttemptState.FAILED,
+ rmApp.getAppAttempts().get(am1.getApplicationAttemptId())
+ .getAppAttemptState());
+ Assert.assertEquals(RMAppAttemptState.LAUNCHED,
+ rmApp.getAppAttempts().get(am2.getApplicationAttemptId())
+ .getAppAttemptState());
+
+ List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
+ ContainerStatus containerStatus =
+ BuilderUtils.newContainerStatus(
+ BuilderUtils.newContainerId(am2.getApplicationAttemptId(), 1),
+ ContainerState.COMPLETE, "Killed AM container", 143);
+ containerStatuses.add(containerStatus);
+ nm1.registerNode(containerStatuses);
+ rm2.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED);
+ launchAM(rmApp, rm2, nm1);
+ Assert.assertEquals(3, rmApp.getAppAttempts().size());
+ rm2.waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(),
+ RMAppAttemptState.RUNNING);
+ // Now restart RM ...
+ // Setting AMLivelinessMonitor interval to be 10 Secs.
+ conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 10000);
+ MockRM rm3 = null;
+ rm3 = new MockRM(conf, memStore);
+ rm3.start();
+
+ // Wait for RM to process all the events as a part of rm recovery.
+ nm1.setResourceTrackerService(rm3.getResourceTrackerService());
+
+ rmApp = rm3.getRMContext().getRMApps().get(app1.getApplicationId());
+ // application should be in running state
+ rm3.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
+ Assert.assertEquals(rmApp.getState(), RMAppState.RUNNING);
+ // new attempt should not be started
+ Assert.assertEquals(3, rmApp.getAppAttempts().size());
+ // am1 and am2 attempts should be in FAILED state where as am3 should be
+ // in LAUNCHED state
+ Assert.assertEquals(RMAppAttemptState.FAILED,
+ rmApp.getAppAttempts().get(am1.getApplicationAttemptId())
+ .getAppAttemptState());
+ Assert.assertEquals(RMAppAttemptState.FAILED,
+ rmApp.getAppAttempts().get(am2.getApplicationAttemptId())
+ .getAppAttemptState());
+ ApplicationAttemptId latestAppAttemptId =
+ rmApp.getCurrentAppAttempt().getAppAttemptId();
+ Assert.assertEquals(RMAppAttemptState.LAUNCHED,rmApp.getAppAttempts()
+ .get(latestAppAttemptId).getAppAttemptState());
+
+ rm3.waitForState(latestAppAttemptId, RMAppAttemptState.FAILED);
+ rm3.waitForState(rmApp.getApplicationId(), RMAppState.ACCEPTED);
+ Assert.assertEquals(4, rmApp.getAppAttempts().size());
+ Assert.assertEquals(RMAppAttemptState.FAILED,
+ rmApp.getAppAttempts().get(latestAppAttemptId).getAppAttemptState());
+
+ latestAppAttemptId = rmApp.getCurrentAppAttempt().getAppAttemptId();
+
+ // The 4th attempt has started but is not yet saved into RMStateStore
+ // It will be saved only when we launch AM.
+
+ // submitting app but not starting AM for it.
+ RMApp app2 = rm3.submitApp(200);
+ rm3.waitForState(app2.getApplicationId(), RMAppState.ACCEPTED);
+ Assert.assertEquals(1, app2.getAppAttempts().size());
+ Assert.assertEquals(0,
+ memStore.getState().getApplicationState().get(app2.getApplicationId())
+ .getAttemptCount());
+
+ MockRM rm4 = null;
+ rm4 = new MockRM(conf, memStore);
+ rm4.start();
+
+ rmApp = rm4.getRMContext().getRMApps().get(app1.getApplicationId());
+ rm4.waitForState(rmApp.getApplicationId(), RMAppState.ACCEPTED);
+ Assert.assertEquals(4, rmApp.getAppAttempts().size());
+ Assert.assertEquals(RMAppState.ACCEPTED, rmApp.getState());
+ Assert.assertEquals(RMAppAttemptState.SCHEDULED, rmApp.getAppAttempts()
+ .get(latestAppAttemptId).getAppAttemptState());
+
+ // The initial application for which an AM was not started should be in
+ // ACCEPTED state with one application attempt started.
+ app2 = rm4.getRMContext().getRMApps().get(app2.getApplicationId());
+ rm4.waitForState(app2.getApplicationId(), RMAppState.ACCEPTED);
+ Assert.assertEquals(RMAppState.ACCEPTED, app2.getState());
+ Assert.assertEquals(1, app2.getAppAttempts().size());
+ Assert.assertEquals(RMAppAttemptState.SCHEDULED, app2
+ .getCurrentAppAttempt().getAppAttemptState());
+
+ }
+
+ @Test
public void testRMRestartFailedApp() throws Exception {
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
MemoryRMStateStore memStore = new MemoryRMStateStore();
@@ -577,7 +755,14 @@ public class TestRMRestart {
rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.KILLED);
// restart rm
- MockRM rm2 = new MockRM(conf, memStore);
+
+ MockRM rm2 = new MockRM(conf, memStore) {
+ @Override
+ protected RMAppManager createRMAppManager() {
+ return spy(super.createRMAppManager());
+ }
+ };
+
rm2.start();
GetApplicationsRequest request1 =
@@ -620,6 +805,10 @@ public class TestRMRestart {
rm2.getClientRMService().getApplications(request2);
List<ApplicationReport> appList2 = response2.getApplicationList();
Assert.assertTrue(3 == appList2.size());
+
+ // check application summary is logged for the completed apps after RM restart.
+ verify(rm2.getRMAppManager(), times(3)).logApplicationSummary(
+ isA(ApplicationId.class));
}
private MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
@@ -720,6 +909,8 @@ public class TestRMRestart {
Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1),
attemptState.getMasterContainer().getId());
+ // Setting AMLivelinessMonitor interval to be 10 Secs.
+ conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 10000);
// start new RM
MockRM rm2 = new MockRM(conf, memStore);
rm2.start();
@@ -818,6 +1009,10 @@ public class TestRMRestart {
MockRM rm2 = new TestSecurityMockRM(conf, memStore);
rm2.start();
+ // Need to wait for a while as now token renewal happens on another thread
+ // and is asynchronous in nature.
+ waitForTokensToBeRenewed(rm2);
+
// verify tokens are properly populated back to rm2 DelegationTokenRenewer
Assert.assertEquals(tokenSet, rm2.getRMContext()
.getDelegationTokenRenewer().getDelegationTokens());
@@ -827,6 +1022,21 @@ public class TestRMRestart {
rm2.stop();
}
+ private void waitForTokensToBeRenewed(MockRM rm2) throws Exception {
+ int waitCnt = 20;
+ boolean atleastOneAppInNEWState = true;
+ while (waitCnt-- > 0 && atleastOneAppInNEWState) {
+ atleastOneAppInNEWState = false;
+ for (RMApp rmApp : rm2.getRMContext().getRMApps().values()) {
+ if (rmApp.getState() == RMAppState.NEW) {
+ Thread.sleep(1000);
+ atleastOneAppInNEWState = true;
+ break;
+ }
+ }
+ }
+ }
+
@Test
public void testAppAttemptTokensRestoredOnRMRestart() throws Exception {
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
@@ -920,7 +1130,6 @@ public class TestRMRestart {
@Test
public void testRMDelegationTokenRestoredOnRMRestart() throws Exception {
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
-
conf.set(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"kerberos");
@@ -1063,6 +1272,43 @@ public class TestRMRestart {
rm2.stop();
}
+ // This is to test submit an application to the new RM with the old delegation
+ // token got from previous RM.
+ @Test
+ public void testAppSubmissionWithOldDelegationTokenAfterRMRestart()
+ throws Exception {
+ conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
+ conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+ "kerberos");
+ conf.set(YarnConfiguration.RM_ADDRESS, "localhost:8032");
+ UserGroupInformation.setConfiguration(conf);
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+
+ MockRM rm1 = new TestSecurityMockRM(conf, memStore);
+ rm1.start();
+
+ GetDelegationTokenRequest request1 =
+ GetDelegationTokenRequest.newInstance("renewer1");
+ UserGroupInformation.getCurrentUser().setAuthenticationMethod(
+ AuthMethod.KERBEROS);
+ GetDelegationTokenResponse response1 =
+ rm1.getClientRMService().getDelegationToken(request1);
+ Token<RMDelegationTokenIdentifier> token1 =
+ ConverterUtils.convertFromYarn(response1.getRMDelegationToken(), rmAddr);
+
+ // start new RM
+ MockRM rm2 = new TestSecurityMockRM(conf, memStore);
+ rm2.start();
+
+ // submit an app with the old delegation token got from previous RM.
+ Credentials ts = new Credentials();
+ ts.addToken(token1.getService(), token1);
+ RMApp app = rm2.submitApp(200, "name", "user",
+ new HashMap<ApplicationAccessType, String>(), false, "default", 1, ts);
+ rm2.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
+ }
+
@Test
public void testRMStateStoreDispatcherDrainedOnRMStop() throws Exception {
MemoryRMStateStore memStore = new MemoryRMStateStore() {
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java Tue Nov 19 17:26:23 2013
@@ -372,10 +372,10 @@ public class TestRMAppAttemptTransitions
}
/**
- * {@link RMAppAttemptState#RECOVERED}
+ * {@link RMAppAttemptState#LAUNCHED}
*/
private void testAppAttemptRecoveredState() {
- assertEquals(RMAppAttemptState.RECOVERED,
+ assertEquals(RMAppAttemptState.LAUNCHED,
applicationAttempt.getAppAttemptState());
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java Tue Nov 19 17:26:23 2013
@@ -141,6 +141,8 @@ public class TestFifoScheduler {
FifoScheduler schedular = new FifoScheduler();
schedular.reinitialize(new Configuration(), rmContext);
+ QueueMetrics metrics = schedular.getRootQueueMetrics();
+ int beforeAppsSubmitted = metrics.getAppsSubmitted();
ApplicationId appId = BuilderUtils.newApplicationId(200, 1);
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
@@ -155,8 +157,8 @@ public class TestFifoScheduler {
event = new AppAddedSchedulerEvent(appAttemptId, "queue", "user");
schedular.handle(event);
- QueueMetrics metrics = schedular.getRootQueueMetrics();
- Assert.assertEquals(1, metrics.getAppsSubmitted());
+ int afterAppsSubmitted = metrics.getAppsSubmitted();
+ Assert.assertEquals(1, afterAppsSubmitted - beforeAppsSubmitted);
}
@Test(timeout=2000)
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java Tue Nov 19 17:26:23 2013
@@ -31,13 +31,24 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -46,16 +57,29 @@ import org.apache.hadoop.hdfs.security.t
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.security.token.delegation.DelegationKey;
-import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.After;
import org.junit.Before;
@@ -66,14 +90,18 @@ import org.mockito.stubbing.Answer;
/**
* unit test -
- * tests addition/deletion/cancelation of renewals of delegation tokens
+ * tests addition/deletion/cancellation of renewals of delegation tokens
*
*/
+@SuppressWarnings("rawtypes")
public class TestDelegationTokenRenewer {
private static final Log LOG =
LogFactory.getLog(TestDelegationTokenRenewer.class);
private static final Text KIND = new Text("TestDelegationTokenRenewer.Token");
+ private static BlockingQueue<Event> eventQueue;
+ private static volatile AtomicInteger counter;
+ private static AsyncDispatcher dispatcher;
public static class Renewer extends TokenRenewer {
private static int counter = 0;
private static Token<?> lastRenewed = null;
@@ -143,11 +171,20 @@ public class TestDelegationTokenRenewer
@Before
public void setUp() throws Exception {
+ counter = new AtomicInteger(0);
+ conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+ "kerberos");
+ UserGroupInformation.setConfiguration(conf);
+ eventQueue = new LinkedBlockingQueue<Event>();
+ dispatcher = new AsyncDispatcher(eventQueue);
Renewer.reset();
- delegationTokenRenewer = new DelegationTokenRenewer();
+ delegationTokenRenewer = createNewDelegationTokenRenewer(conf, counter);
delegationTokenRenewer.init(conf);
RMContext mockContext = mock(RMContext.class);
ClientRMService mockClientRMService = mock(ClientRMService.class);
+ when(mockContext.getDelegationTokenRenewer()).thenReturn(
+ delegationTokenRenewer);
+ when(mockContext.getDispatcher()).thenReturn(dispatcher);
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
InetSocketAddress sockAddr =
InetSocketAddress.createUnresolved("localhost", 1234);
@@ -285,7 +322,7 @@ public class TestDelegationTokenRenewer
* @throws IOException
* @throws URISyntaxException
*/
- @Test
+ @Test(timeout=60000)
public void testDTRenewal () throws Exception {
MyFS dfs = (MyFS)FileSystem.get(conf);
LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+conf.hashCode());
@@ -316,8 +353,9 @@ public class TestDelegationTokenRenewer
// register the tokens for renewal
ApplicationId applicationId_0 =
BuilderUtils.newApplicationId(0, 0);
- delegationTokenRenewer.addApplication(applicationId_0, ts, true);
-
+ delegationTokenRenewer.addApplication(applicationId_0, ts, true, false);
+ waitForEventsToGetProcessed(delegationTokenRenewer);
+
// first 3 initial renewals + 1 real
int numberOfExpectedRenewals = 3+1;
@@ -355,9 +393,10 @@ public class TestDelegationTokenRenewer
ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
- delegationTokenRenewer.addApplication(applicationId_1, ts, true);
+ delegationTokenRenewer.addApplication(applicationId_1, ts, true, false);
+ waitForEventsToGetProcessed(delegationTokenRenewer);
delegationTokenRenewer.applicationFinished(applicationId_1);
-
+ waitForEventsToGetProcessed(delegationTokenRenewer);
numberOfExpectedRenewals = Renewer.counter; // number of renewals so far
try {
Thread.sleep(6*1000); // sleep 6 seconds, so it has time to renew
@@ -377,8 +416,8 @@ public class TestDelegationTokenRenewer
}
}
- @Test
- public void testInvalidDTWithAddApplication() throws Exception {
+ @Test(timeout=60000)
+ public void testAppRejectionWithCancelledDelegationToken() throws Exception {
MyFS dfs = (MyFS)FileSystem.get(conf);
LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+conf.hashCode());
@@ -390,12 +429,21 @@ public class TestDelegationTokenRenewer
// register the tokens for renewal
ApplicationId appId = BuilderUtils.newApplicationId(0, 0);
- try {
- delegationTokenRenewer.addApplication(appId, ts, true);
- fail("App submission with a cancelled token should have failed");
- } catch (InvalidToken e) {
- // expected
+ delegationTokenRenewer.addApplication(appId, ts, true, false);
+ int waitCnt = 20;
+ while (waitCnt-- >0) {
+ if (!eventQueue.isEmpty()) {
+ Event evt = eventQueue.take();
+ if (evt.getType() == RMAppEventType.APP_REJECTED) {
+ Assert.assertTrue(
+ ((RMAppEvent) evt).getApplicationId().equals(appId));
+ return;
+ }
+ } else {
+ Thread.sleep(500);
+ }
}
+ fail("App submission with a cancelled token should have failed");
}
/**
@@ -408,7 +456,7 @@ public class TestDelegationTokenRenewer
* @throws IOException
* @throws URISyntaxException
*/
- @Test
+ @Test(timeout=60000)
public void testDTRenewalWithNoCancel () throws Exception {
MyFS dfs = (MyFS)FileSystem.get(conf);
LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+conf.hashCode());
@@ -425,9 +473,10 @@ public class TestDelegationTokenRenewer
ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
- delegationTokenRenewer.addApplication(applicationId_1, ts, false);
+ delegationTokenRenewer.addApplication(applicationId_1, ts, false, false);
+ waitForEventsToGetProcessed(delegationTokenRenewer);
delegationTokenRenewer.applicationFinished(applicationId_1);
-
+ waitForEventsToGetProcessed(delegationTokenRenewer);
int numberOfExpectedRenewals = Renewer.counter; // number of renewals so far
try {
Thread.sleep(6*1000); // sleep 6 seconds, so it has time to renew
@@ -454,9 +503,8 @@ public class TestDelegationTokenRenewer
* @throws IOException
* @throws URISyntaxException
*/
- @Test
+ @Test(timeout=60000)
public void testDTKeepAlive1 () throws Exception {
- DelegationTokenRenewer localDtr = new DelegationTokenRenewer();
Configuration lconf = new Configuration(conf);
lconf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
//Keep tokens alive for 6 seconds.
@@ -465,10 +513,15 @@ public class TestDelegationTokenRenewer
lconf.setLong(
YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS,
1000l);
+ DelegationTokenRenewer localDtr =
+ createNewDelegationTokenRenewer(lconf, counter);
localDtr.init(lconf);
RMContext mockContext = mock(RMContext.class);
ClientRMService mockClientRMService = mock(ClientRMService.class);
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
+ when(mockContext.getDelegationTokenRenewer()).thenReturn(
+ localDtr);
+ when(mockContext.getDispatcher()).thenReturn(dispatcher);
InetSocketAddress sockAddr =
InetSocketAddress.createUnresolved("localhost", 1234);
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
@@ -487,16 +540,25 @@ public class TestDelegationTokenRenewer
// register the tokens for renewal
ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0);
- localDtr.addApplication(applicationId_0, ts, true);
+ localDtr.addApplication(applicationId_0, ts, true, false);
+ waitForEventsToGetProcessed(localDtr);
+ if (!eventQueue.isEmpty()){
+ Event evt = eventQueue.take();
+ if (evt instanceof RMAppEvent) {
+ Assert.assertEquals(((RMAppEvent)evt).getType(), RMAppEventType.START);
+ } else {
+ fail("RMAppEvent.START was expected!!");
+ }
+ }
+
localDtr.applicationFinished(applicationId_0);
-
- Thread.sleep(3000l);
+ waitForEventsToGetProcessed(localDtr);
//Token should still be around. Renewal should not fail.
token1.renew(lconf);
//Allow the keepalive time to run out
- Thread.sleep(6000l);
+ Thread.sleep(10000l);
//The token should have been cancelled at this point. Renewal will fail.
try {
@@ -518,9 +580,8 @@ public class TestDelegationTokenRenewer
* @throws IOException
* @throws URISyntaxException
*/
- @Test
+ @Test(timeout=60000)
public void testDTKeepAlive2() throws Exception {
- DelegationTokenRenewer localDtr = new DelegationTokenRenewer();
Configuration lconf = new Configuration(conf);
lconf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
//Keep tokens alive for 6 seconds.
@@ -529,10 +590,15 @@ public class TestDelegationTokenRenewer
lconf.setLong(
YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS,
1000l);
+ DelegationTokenRenewer localDtr =
+ createNewDelegationTokenRenewer(conf, counter);
localDtr.init(lconf);
RMContext mockContext = mock(RMContext.class);
ClientRMService mockClientRMService = mock(ClientRMService.class);
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
+ when(mockContext.getDelegationTokenRenewer()).thenReturn(
+ localDtr);
+ when(mockContext.getDispatcher()).thenReturn(dispatcher);
InetSocketAddress sockAddr =
InetSocketAddress.createUnresolved("localhost", 1234);
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
@@ -551,22 +617,18 @@ public class TestDelegationTokenRenewer
// register the tokens for renewal
ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0);
- localDtr.addApplication(applicationId_0, ts, true);
+ localDtr.addApplication(applicationId_0, ts, true, false);
localDtr.applicationFinished(applicationId_0);
-
- Thread.sleep(4000l);
-
+ waitForEventsToGetProcessed(delegationTokenRenewer);
//Send another keep alive.
localDtr.updateKeepAliveApplications(Collections
.singletonList(applicationId_0));
//Renewal should not fail.
token1.renew(lconf);
-
//Token should be around after this.
Thread.sleep(4500l);
//Renewal should not fail. - ~1.5 seconds for keepalive timeout.
token1.renew(lconf);
-
//Allow the keepalive time to run out
Thread.sleep(3000l);
//The token should have been cancelled at this point. Renewal will fail.
@@ -575,61 +637,127 @@ public class TestDelegationTokenRenewer
fail("Renewal of cancelled token should have failed");
} catch (InvalidToken ite) {}
}
-
- @Test(timeout=20000)
- public void testConncurrentAddApplication()
- throws IOException, InterruptedException, BrokenBarrierException {
- final CyclicBarrier startBarrier = new CyclicBarrier(2);
- final CyclicBarrier endBarrier = new CyclicBarrier(2);
-
- // this token uses barriers to block during renew
- final Credentials creds1 = new Credentials();
- final Token<?> token1 = mock(Token.class);
- creds1.addToken(new Text("token"), token1);
- doReturn(true).when(token1).isManaged();
- doAnswer(new Answer<Long>() {
- public Long answer(InvocationOnMock invocation)
- throws InterruptedException, BrokenBarrierException {
- startBarrier.await();
- endBarrier.await();
- return Long.MAX_VALUE;
- }}).when(token1).renew(any(Configuration.class));
-
- // this dummy token fakes renewing
- final Credentials creds2 = new Credentials();
- final Token<?> token2 = mock(Token.class);
- creds2.addToken(new Text("token"), token2);
- doReturn(true).when(token2).isManaged();
- doReturn(Long.MAX_VALUE).when(token2).renew(any(Configuration.class));
-
- // fire up the renewer
- final DelegationTokenRenewer dtr = new DelegationTokenRenewer();
- dtr.init(conf);
- RMContext mockContext = mock(RMContext.class);
- ClientRMService mockClientRMService = mock(ClientRMService.class);
- when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
- InetSocketAddress sockAddr =
- InetSocketAddress.createUnresolved("localhost", 1234);
- when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
- dtr.setRMContext(mockContext);
- dtr.start();
-
- // submit a job that blocks during renewal
- Thread submitThread = new Thread() {
+
+ private DelegationTokenRenewer createNewDelegationTokenRenewer(
+ Configuration conf, final AtomicInteger counter) {
+ return new DelegationTokenRenewer() {
+
@Override
- public void run() {
- try {
- dtr.addApplication(mock(ApplicationId.class), creds1, false);
- } catch (IOException e) {}
+ protected ThreadPoolExecutor
+ createNewThreadPoolService(Configuration conf) {
+ ThreadPoolExecutor pool =
+ new ThreadPoolExecutor(5, 5, 3L, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>()) {
+
+ @Override
+ protected void afterExecute(Runnable r, Throwable t) {
+ counter.decrementAndGet();
+ super.afterExecute(r, t);
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ counter.incrementAndGet();
+ super.execute(command);
+ }
+ };
+ return pool;
}
};
- submitThread.start();
-
+ }
+
+ private void waitForEventsToGetProcessed(DelegationTokenRenewer dtr)
+ throws InterruptedException {
+ int wait = 40;
+ while (wait-- > 0
+ && counter.get() > 0) {
+ Thread.sleep(200);
+ }
+ }
+
+ @Test(timeout=20000)
+ public void testConcurrentAddApplication()
+ throws IOException, InterruptedException, BrokenBarrierException {
+ final CyclicBarrier startBarrier = new CyclicBarrier(2);
+ final CyclicBarrier endBarrier = new CyclicBarrier(2);
+
+ // this token uses barriers to block during renew
+ final Credentials creds1 = new Credentials();
+ final Token<?> token1 = mock(Token.class);
+ creds1.addToken(new Text("token"), token1);
+ doReturn(true).when(token1).isManaged();
+ doAnswer(new Answer<Long>() {
+ public Long answer(InvocationOnMock invocation)
+ throws InterruptedException, BrokenBarrierException {
+ startBarrier.await();
+ endBarrier.await();
+ return Long.MAX_VALUE;
+ }}).when(token1).renew(any(Configuration.class));
+
+ // this dummy token fakes renewing
+ final Credentials creds2 = new Credentials();
+ final Token<?> token2 = mock(Token.class);
+ creds2.addToken(new Text("token"), token2);
+ doReturn(true).when(token2).isManaged();
+ doReturn(Long.MAX_VALUE).when(token2).renew(any(Configuration.class));
+
+ // fire up the renewer
+ final DelegationTokenRenewer dtr =
+ createNewDelegationTokenRenewer(conf, counter);
+ dtr.init(conf);
+ RMContext mockContext = mock(RMContext.class);
+ ClientRMService mockClientRMService = mock(ClientRMService.class);
+ when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
+ InetSocketAddress sockAddr =
+ InetSocketAddress.createUnresolved("localhost", 1234);
+ when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
+ dtr.setRMContext(mockContext);
+ when(mockContext.getDelegationTokenRenewer()).thenReturn(dtr);
+ dtr.start();
+ // submit a job that blocks during renewal
+ Thread submitThread = new Thread() {
+ @Override
+ public void run() {
+ dtr.addApplication(mock(ApplicationId.class), creds1, false, false);
+ }
+ };
+ submitThread.start();
+
// wait till 1st submit blocks, then submit another
- startBarrier.await();
- dtr.addApplication(mock(ApplicationId.class), creds2, false);
- // signal 1st to complete
- endBarrier.await();
- submitThread.join();
+ startBarrier.await();
+ dtr.addApplication(mock(ApplicationId.class), creds2, false, false);
+ // signal 1st to complete
+ endBarrier.await();
+ submitThread.join();
+ }
+
+ @Test(timeout=20000)
+ public void testAppSubmissionWithInvalidDelegationToken() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(
+ CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+ "kerberos");
+ UserGroupInformation.setConfiguration(conf);
+ MockRM rm = new MockRM(conf);
+ ByteBuffer tokens = ByteBuffer.wrap("BOGUS".getBytes());
+ ContainerLaunchContext amContainer =
+ ContainerLaunchContext.newInstance(
+ new HashMap<String, LocalResource>(), new HashMap<String, String>(),
+ new ArrayList<String>(), new HashMap<String, ByteBuffer>(), tokens,
+ new HashMap<ApplicationAccessType, String>());
+ ApplicationSubmissionContext appSubContext =
+ ApplicationSubmissionContext.newInstance(
+ ApplicationId.newInstance(1234121, 0),
+ "BOGUS", "default", Priority.UNDEFINED, amContainer, false,
+ true, 1, Resource.newInstance(1024, 1), "BOGUS");
+ SubmitApplicationRequest request =
+ SubmitApplicationRequest.newInstance(appSubContext);
+ try {
+ rm.getClientRMService().submitApplication(request);
+ fail("Error was excepted.");
+ } catch (YarnException e) {
+ Assert.assertTrue(e.getMessage().contains(
+ "Bad header found in token storage"));
+ }
}
}