You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2016/06/03 20:07:28 UTC
hadoop git commit: YARN-5098. Fixed ResourceManager's
DelegationTokenRenewer to replace expiring system-tokens if RM stops and only
restarts after a long time. Contributed by Jian He.
Repository: hadoop
Updated Branches:
refs/heads/trunk 99cc439e2 -> f10ebc67f
YARN-5098. Fixed ResourceManager's DelegationTokenRenewer to replace expiring system-tokens if RM stops and only restarts after a long time. Contributed by Jian He.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f10ebc67
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f10ebc67
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f10ebc67
Branch: refs/heads/trunk
Commit: f10ebc67f57a4a2e3cc916c41154ab9b6a4635c9
Parents: 99cc439
Author: Vinod Kumar Vavilapalli <vi...@apache.org>
Authored: Fri Jun 3 13:00:07 2016 -0700
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Fri Jun 3 13:00:07 2016 -0700
----------------------------------------------------------------------
.../security/DelegationTokenRenewer.java | 27 ++++--
.../security/TestDelegationTokenRenewer.java | 98 ++++++++++++++++++++
2 files changed, 118 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f10ebc67/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
index fd12f11..4177ee2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import org.apache.hadoop.service.AbstractService;
@@ -459,6 +460,18 @@ public class DelegationTokenRenewer extends AbstractService {
try {
renewToken(dttr);
} catch (IOException ioe) {
+ if (ioe instanceof SecretManager.InvalidToken
+ && dttr.maxDate < Time.now()
+ && evt instanceof DelegationTokenRenewerAppRecoverEvent
+ && token.getKind().equals(HDFS_DELEGATION_KIND)) {
+ LOG.info("Failed to renew hdfs token " + dttr
+ + " on recovery as it expired, requesting new hdfs token for "
+ + applicationId + ", user=" + evt.getUser(), ioe);
+ requestNewHdfsDelegationTokenAsProxyUser(
+ Arrays.asList(applicationId), evt.getUser(),
+ evt.shouldCancelAtEnd());
+ continue;
+ }
throw new IOException("Failed to renew token: " + dttr.token, ioe);
}
}
@@ -485,7 +498,8 @@ public class DelegationTokenRenewer extends AbstractService {
}
if (!hasHdfsToken) {
- requestNewHdfsDelegationToken(Arrays.asList(applicationId), evt.getUser(),
+ requestNewHdfsDelegationTokenAsProxyUser(Arrays.asList(applicationId),
+ evt.getUser(),
shouldCancelAtEnd);
}
}
@@ -586,8 +600,7 @@ public class DelegationTokenRenewer extends AbstractService {
} catch (InterruptedException e) {
throw new IOException(e);
}
- LOG.info("Renewed delegation-token= [" + dttr + "], for "
- + dttr.referringAppIds);
+ LOG.info("Renewed delegation-token= [" + dttr + "]");
}
// Request new hdfs token if the token is about to expire, and remove the old
@@ -625,12 +638,12 @@ public class DelegationTokenRenewer extends AbstractService {
}
}
LOG.info("Token= (" + dttr + ") is expiring, request new token.");
- requestNewHdfsDelegationToken(applicationIds, dttr.user,
+ requestNewHdfsDelegationTokenAsProxyUser(applicationIds, dttr.user,
dttr.shouldCancelAtEnd);
}
}
- private void requestNewHdfsDelegationToken(
+ private void requestNewHdfsDelegationTokenAsProxyUser(
Collection<ApplicationId> referringAppIds,
String user, boolean shouldCancelAtEnd) throws IOException,
InterruptedException {
@@ -912,8 +925,8 @@ public class DelegationTokenRenewer extends AbstractService {
// Setup tokens for renewal during recovery
DelegationTokenRenewer.this.handleAppSubmitEvent(event);
} catch (Throwable t) {
- LOG.warn(
- "Unable to add the application to the delegation token renewer.", t);
+ LOG.warn("Unable to add the application to the delegation token"
+ + " renewer on recovery.", t);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f10ebc67/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
index 1bfac8d..74fe534 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
@@ -43,6 +43,7 @@ import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
@@ -84,6 +85,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -968,6 +970,101 @@ public class TestDelegationTokenRenewer {
Assert.assertTrue(appCredentials.getAllTokens().contains(expectedToken));
}
+
+ // 1. token is expired before app completes.
+ // 2. RM shutdown.
+ // 3. When RM recovers the app, token renewal will fail as token expired.
+ // RM should request a new token and send it to NM for log-aggregation.
+ @Test
+ public void testRMRestartWithExpiredToken() throws Exception {
+ Configuration yarnConf = new YarnConfiguration();
+ yarnConf
+ .setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED, true);
+ yarnConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+ "kerberos");
+ yarnConf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+ yarnConf
+ .set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+ UserGroupInformation.setConfiguration(yarnConf);
+
+ // create Token1:
+ Text userText1 = new Text("user1");
+ DelegationTokenIdentifier dtId1 = new DelegationTokenIdentifier(userText1,
+ new Text("renewer1"), userText1);
+ final Token<DelegationTokenIdentifier> originalToken =
+ new Token<>(dtId1.getBytes(), "password1".getBytes(), dtId1.getKind(),
+ new Text("service1"));
+ Credentials credentials = new Credentials();
+ credentials.addToken(userText1, originalToken);
+
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(yarnConf);
+ MockRM rm1 = new TestSecurityMockRM(yarnConf, memStore);
+ rm1.start();
+ RMApp app = rm1.submitApp(200, "name", "user",
+ new HashMap<ApplicationAccessType, String>(), false, "default", 1,
+ credentials);
+
+ // create token2
+ Text userText2 = new Text("user1");
+ DelegationTokenIdentifier dtId2 =
+ new DelegationTokenIdentifier(userText1, new Text("renewer2"),
+ userText2);
+ final Token<DelegationTokenIdentifier> updatedToken =
+ new Token<DelegationTokenIdentifier>(dtId2.getBytes(),
+ "password2".getBytes(), dtId2.getKind(), new Text("service2"));
+ AtomicBoolean firstRenewInvoked = new AtomicBoolean(false);
+ AtomicBoolean secondRenewInvoked = new AtomicBoolean(false);
+ MockRM rm2 = new TestSecurityMockRM(yarnConf, memStore) {
+ @Override
+ protected DelegationTokenRenewer createDelegationTokenRenewer() {
+ return new DelegationTokenRenewer() {
+
+ @Override
+ protected void renewToken(final DelegationTokenToRenew dttr)
+ throws IOException {
+
+ if (dttr.token.equals(updatedToken)) {
+ secondRenewInvoked.set(true);
+ super.renewToken(dttr);
+ } else if (dttr.token.equals(originalToken)){
+ firstRenewInvoked.set(true);
+ throw new InvalidToken("Failed to renew");
+ } else {
+ throw new IOException("Unexpected");
+ }
+ }
+
+ @Override
+ protected Token<?>[] obtainSystemTokensForUser(String user,
+ final Credentials credentials) throws IOException {
+ credentials.addToken(updatedToken.getService(), updatedToken);
+ return new Token<?>[] { updatedToken };
+ }
+ };
+ }
+ };
+
+ // simulate restarting the RM
+ rm2.start();
+
+ // check that the NM can retrieve the token
+ final MockNM nm1 =
+ new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
+ nm1.registerNode();
+ NodeHeartbeatResponse response = nm1.nodeHeartbeat(true);
+ ByteBuffer tokenBuffer =
+ response.getSystemCredentialsForApps().get(app.getApplicationId());
+ Assert.assertNotNull(tokenBuffer);
+ Credentials appCredentials = new Credentials();
+ DataInputByteBuffer buf = new DataInputByteBuffer();
+ tokenBuffer.rewind();
+ buf.reset(tokenBuffer);
+ appCredentials.readTokenStorageStream(buf);
+ Assert.assertTrue(firstRenewInvoked.get() && secondRenewInvoked.get());
+ Assert.assertTrue(appCredentials.getAllTokens().contains(updatedToken));
+ }
+
// YARN will get the token for the app submitted without the delegation token.
@Test
public void testAppSubmissionWithoutDelegationToken() throws Exception {
@@ -1158,4 +1255,5 @@ public class TestDelegationTokenRenewer {
Assert.assertTrue(dttr.isTimerCancelled());
Assert.assertTrue(Renewer.cancelled);
}
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org