You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2014/11/02 00:33:15 UTC
git commit: YARN-2790. Fixed a NodeManager bug that was causing
log-aggregation to fail beyond HDFS delegation-token expiry even when RM is a
proxy-user (YARN-2704). Contributed by Jian He.
Repository: hadoop
Updated Branches:
refs/heads/trunk 36ccf097a -> 5c0381c96
YARN-2790. Fixed a NodeManager bug that was causing log-aggregation to fail beyond HDFS delegation-token expiry even when RM is a proxy-user (YARN-2704). Contributed by Jian He.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5c0381c9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5c0381c9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5c0381c9
Branch: refs/heads/trunk
Commit: 5c0381c96aa79196829edbca497c649eb6776944
Parents: 36ccf09
Author: Vinod Kumar Vavilapalli <vi...@apache.org>
Authored: Sat Nov 1 16:32:35 2014 -0700
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Sat Nov 1 16:32:35 2014 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 4 ++
.../yarn/server/nodemanager/NodeManager.java | 2 +-
.../nodemanager/NodeStatusUpdaterImpl.java | 2 +-
.../localizer/ResourceLocalizationService.java | 4 +-
.../logaggregation/AppLogAggregatorImpl.java | 26 ++++++-
.../logaggregation/LogAggregationService.java | 13 ----
.../TestLogAggregationService.java | 72 +++++++++++++++++++-
7 files changed, 100 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c0381c9/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index f9931cf..25b1635 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -832,6 +832,10 @@ Release 2.6.0 - UNRELEASED
YARN-2711. Fixed TestDefaultContainerExecutor#testContainerLaunchError failure on
Windows. (Varun Vasudev via zjshen)
+ YARN-2790. Fixed a NodeManager bug that was causing log-aggregation to fail
+ beyond HFDS delegation-token expiry even when RM is a proxy-user (YARN-2704).
+ (Jian He via vinodkv)
+
Release 2.5.1 - 2014-09-05
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c0381c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 22057f4..4f90bf5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -433,7 +433,7 @@ public class NodeManager extends CompositeService
return systemCredentials;
}
- public void setSystemCrendentials(
+ public void setSystemCrendentialsForApps(
Map<ApplicationId, Credentials> systemCredentials) {
this.systemCredentials = systemCredentials;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c0381c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 1c3ac5c..ebbe503 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -626,7 +626,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
response.getSystemCredentialsForApps();
if (systemCredentials != null && !systemCredentials.isEmpty()) {
((NMContext) context)
- .setSystemCrendentials(parseCredentials(systemCredentials));
+ .setSystemCrendentialsForApps(parseCredentials(systemCredentials));
}
} catch (ConnectException e) {
//catch and throw the exception if tried MAX wait time to connect RM
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c0381c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index 08ed3a1..cb56d67 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -1122,9 +1122,9 @@ public class ResourceLocalizationService extends CompositeService
if (systemCredentials == null) {
return null;
}
- LOG.info("Adding new framework tokens from RM for " + appId);
for (Token<?> token : systemCredentials.getAllTokens()) {
- LOG.info("Adding new application-token for localization: " + token);
+ LOG.info("Adding new framework-token for " + appId
+ + " for localization: " + token);
}
return systemCredentials;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c0381c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index 43cd7b5..3174c5c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregatio
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
@@ -43,19 +42,21 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
@@ -197,6 +198,19 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
return;
}
+ if (UserGroupInformation.isSecurityEnabled()) {
+ Credentials systemCredentials =
+ context.getSystemCredentialsForApps().get(appId);
+ if (systemCredentials != null) {
+ for (Token<?> token : systemCredentials.getAllTokens()) {
+ LOG.info("Adding new framework-token for " + appId
+ + " for log-aggregation: " + token + " user=" + userUgi);
+ }
+ // this will replace old token
+ userUgi.addCredentials(systemCredentials);
+ }
+ }
+
// Create a set of Containers whose logs will be uploaded in this cycle.
// It includes:
// a) all containers in pendingContainers: those containers are finished
@@ -538,4 +552,10 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
return logValue.getCurrentUpLoadedFilesPath();
}
}
+
+ // only for test
+ @VisibleForTesting
+ public UserGroupInformation getUgi() {
+ return this.userUgi;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c0381c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
index cc717d7..bd3e847 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -344,18 +343,6 @@ public class LogAggregationService extends AbstractService implements
Map<ApplicationAccessType, String> appAcls,
LogAggregationContext logAggregationContext) {
- if (UserGroupInformation.isSecurityEnabled()) {
- Credentials systemCredentials =
- context.getSystemCredentialsForApps().get(appId);
- if (systemCredentials != null) {
- LOG.info("Adding new framework tokens from RM for " + appId);
- for (Token<?> token : systemCredentials.getAllTokens()) {
- LOG.info("Adding new application-token for log-aggregation: " + token);
- }
- credentials = systemCredentials;
- }
- }
-
// Get user's FileSystem credentials
final UserGroupInformation userUgi =
UserGroupInformation.createRemoteUser(user);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c0381c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
index cea71fa..419de88 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
@@ -55,11 +55,11 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import org.junit.Assert;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AbstractFileSystem;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -67,8 +67,11 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -93,10 +96,11 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
-import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
+import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
@@ -107,19 +111,22 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerM
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.TestNonAggregatingLogHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.TestNonAggregatingLogHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mortbay.util.MultiException;
+import com.google.common.base.Supplier;
+
//@Ignore
public class TestLogAggregationService extends BaseContainerManagerTest {
@@ -152,6 +159,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
dispatcher = createDispatcher();
appEventHandler = mock(EventHandler.class);
dispatcher.register(ApplicationEventType.class, appEventHandler);
+ UserGroupInformation.setConfiguration(conf);
}
@Override
@@ -1424,6 +1432,64 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
dispatcher.stop();
}
+
+ @Test (timeout = 20000)
+ public void testAddNewTokenSentFromRMForLogAggregation() throws Exception {
+ Configuration conf = new YarnConfiguration();
+ conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+ "kerberos");
+ UserGroupInformation.setConfiguration(conf);
+ DrainDispatcher dispatcher = createDispatcher();
+ dispatcher.register(ApplicationEventType.class, appEventHandler);
+
+ ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1);
+ Application mockApp = mock(Application.class);
+ when(mockApp.getContainers()).thenReturn(
+ new HashMap<ContainerId, Container>());
+ this.context.getApplications().put(application1, mockApp);
+ @SuppressWarnings("resource")
+ LogAggregationService logAggregationService =
+ new LogAggregationService(dispatcher, this.context, this.delSrvc,
+ super.dirsHandler);
+ logAggregationService.init(this.conf);
+ logAggregationService.start();
+ logAggregationService.handle(new LogHandlerAppStartedEvent(application1,
+ this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls,
+ Records.newRecord(LogAggregationContext.class)));
+
+ // Inject new token for log-aggregation after app log-aggregator init
+ Text userText1 = new Text("user1");
+ RMDelegationTokenIdentifier dtId1 =
+ new RMDelegationTokenIdentifier(userText1, new Text("renewer1"),
+ userText1);
+ final Token<RMDelegationTokenIdentifier> token1 =
+ new Token<RMDelegationTokenIdentifier>(dtId1.getBytes(),
+ "password1".getBytes(), dtId1.getKind(), new Text("service1"));
+ Credentials credentials = new Credentials();
+ credentials.addToken(userText1, token1);
+ this.context.getSystemCredentialsForApps().put(application1, credentials);
+
+ logAggregationService.handle(new LogHandlerAppFinishedEvent(application1));
+
+ final UserGroupInformation ugi =
+ ((AppLogAggregatorImpl) logAggregationService.getAppLogAggregators()
+ .get(application1)).getUgi();
+
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ public Boolean get() {
+ boolean hasNewToken = false;
+ for (Token<?> token : ugi.getCredentials().getAllTokens()) {
+ if (token.equals(token1)) {
+ hasNewToken = true;
+ }
+ }
+ return hasNewToken;
+ }
+ }, 1000, 20000);
+ logAggregationService.stop();
+ dispatcher.stop();
+ }
+
private int numOfLogsAvailable(LogAggregationService logAggregationService,
ApplicationId appId, boolean sizeLimited, String lastLogFile)
throws IOException {