You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2014/11/02 00:33:15 UTC

git commit: YARN-2790. Fixed a NodeManager bug that was causing log-aggregation to fail beyond HFDS delegation-token expiry even when RM is a proxy-user (YARN-2704). Contributed by Jian He.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 36ccf097a -> 5c0381c96


YARN-2790. Fixed a NodeManager bug that was causing log-aggregation to fail beyond HFDS delegation-token expiry even when RM is a proxy-user (YARN-2704). Contributed by Jian He.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5c0381c9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5c0381c9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5c0381c9

Branch: refs/heads/trunk
Commit: 5c0381c96aa79196829edbca497c649eb6776944
Parents: 36ccf09
Author: Vinod Kumar Vavilapalli <vi...@apache.org>
Authored: Sat Nov 1 16:32:35 2014 -0700
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Sat Nov 1 16:32:35 2014 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  4 ++
 .../yarn/server/nodemanager/NodeManager.java    |  2 +-
 .../nodemanager/NodeStatusUpdaterImpl.java      |  2 +-
 .../localizer/ResourceLocalizationService.java  |  4 +-
 .../logaggregation/AppLogAggregatorImpl.java    | 26 ++++++-
 .../logaggregation/LogAggregationService.java   | 13 ----
 .../TestLogAggregationService.java              | 72 +++++++++++++++++++-
 7 files changed, 100 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c0381c9/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index f9931cf..25b1635 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -832,6 +832,10 @@ Release 2.6.0 - UNRELEASED
     YARN-2711. Fixed TestDefaultContainerExecutor#testContainerLaunchError failure on
     Windows. (Varun Vasudev via zjshen)
 
+    YARN-2790. Fixed a NodeManager bug that was causing log-aggregation to fail
+    beyond HFDS delegation-token expiry even when RM is a proxy-user (YARN-2704).
+    (Jian He via vinodkv)
+
 Release 2.5.1 - 2014-09-05
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c0381c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 22057f4..4f90bf5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -433,7 +433,7 @@ public class NodeManager extends CompositeService
       return systemCredentials;
     }
 
-    public void setSystemCrendentials(
+    public void setSystemCrendentialsForApps(
         Map<ApplicationId, Credentials> systemCredentials) {
       this.systemCredentials = systemCredentials;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c0381c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 1c3ac5c..ebbe503 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -626,7 +626,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
                 response.getSystemCredentialsForApps();
             if (systemCredentials != null && !systemCredentials.isEmpty()) {
               ((NMContext) context)
-                .setSystemCrendentials(parseCredentials(systemCredentials));
+                .setSystemCrendentialsForApps(parseCredentials(systemCredentials));
             }
           } catch (ConnectException e) {
             //catch and throw the exception if tried MAX wait time to connect RM

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c0381c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index 08ed3a1..cb56d67 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -1122,9 +1122,9 @@ public class ResourceLocalizationService extends CompositeService
       if (systemCredentials == null) {
         return null;
       }
-      LOG.info("Adding new framework tokens from RM for " + appId);
       for (Token<?> token : systemCredentials.getAllTokens()) {
-        LOG.info("Adding new application-token for localization: " + token);
+        LOG.info("Adding new framework-token for " + appId
+            + " for localization: " + token);
       }
       return systemCredentials;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c0381c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index 43cd7b5..3174c5c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregatio
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
@@ -43,19 +42,21 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
 import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
@@ -197,6 +198,19 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
       return;
     }
 
+    if (UserGroupInformation.isSecurityEnabled()) {
+      Credentials systemCredentials =
+          context.getSystemCredentialsForApps().get(appId);
+      if (systemCredentials != null) {
+        for (Token<?> token : systemCredentials.getAllTokens()) {
+          LOG.info("Adding new framework-token for " + appId
+              + " for log-aggregation: " + token + " user=" + userUgi);
+        }
+        // this will replace old token
+        userUgi.addCredentials(systemCredentials);
+      }
+    }
+
     // Create a set of Containers whose logs will be uploaded in this cycle.
     // It includes:
     // a) all containers in pendingContainers: those containers are finished
@@ -538,4 +552,10 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
       return logValue.getCurrentUpLoadedFilesPath();
     }
   }
+
+  // only for test
+  @VisibleForTesting
+  public UserGroupInformation getUgi() {
+    return this.userUgi;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c0381c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
index cc717d7..bd3e847 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -344,18 +343,6 @@ public class LogAggregationService extends AbstractService implements
       Map<ApplicationAccessType, String> appAcls,
       LogAggregationContext logAggregationContext) {
 
-    if (UserGroupInformation.isSecurityEnabled()) {
-      Credentials systemCredentials =
-          context.getSystemCredentialsForApps().get(appId);
-      if (systemCredentials != null) {
-        LOG.info("Adding new framework tokens from RM for " + appId);
-        for (Token<?> token : systemCredentials.getAllTokens()) {
-          LOG.info("Adding new application-token for log-aggregation: " + token);
-        }
-        credentials = systemCredentials;
-      }
-    }
-
     // Get user's FileSystem credentials
     final UserGroupInformation userUgi =
         UserGroupInformation.createRemoteUser(user);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c0381c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
index cea71fa..419de88 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
@@ -55,11 +55,11 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
-import org.junit.Assert;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.AbstractFileSystem;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -67,8 +67,11 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -93,10 +96,11 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
-import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
 import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
+import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
@@ -107,19 +111,22 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerM
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.TestNonAggregatingLogHandler;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.TestNonAggregatingLogHandler;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 import org.mortbay.util.MultiException;
 
+import com.google.common.base.Supplier;
+
 //@Ignore
 public class TestLogAggregationService extends BaseContainerManagerTest {
 
@@ -152,6 +159,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     dispatcher = createDispatcher();
     appEventHandler = mock(EventHandler.class);
     dispatcher.register(ApplicationEventType.class, appEventHandler);
+    UserGroupInformation.setConfiguration(conf);
   }
 
   @Override
@@ -1424,6 +1432,64 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     dispatcher.stop();
   }
 
+
+  @Test (timeout = 20000)
+  public void testAddNewTokenSentFromRMForLogAggregation() throws Exception {
+    Configuration conf = new YarnConfiguration();
+    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+      "kerberos");
+    UserGroupInformation.setConfiguration(conf);
+    DrainDispatcher dispatcher = createDispatcher();
+    dispatcher.register(ApplicationEventType.class, appEventHandler);
+
+    ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1);
+    Application mockApp = mock(Application.class);
+    when(mockApp.getContainers()).thenReturn(
+      new HashMap<ContainerId, Container>());
+    this.context.getApplications().put(application1, mockApp);
+    @SuppressWarnings("resource")
+    LogAggregationService logAggregationService =
+        new LogAggregationService(dispatcher, this.context, this.delSrvc,
+          super.dirsHandler);
+    logAggregationService.init(this.conf);
+    logAggregationService.start();
+    logAggregationService.handle(new LogHandlerAppStartedEvent(application1,
+      this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls,
+      Records.newRecord(LogAggregationContext.class)));
+
+    // Inject new token for log-aggregation after app log-aggregator init
+    Text userText1 = new Text("user1");
+    RMDelegationTokenIdentifier dtId1 =
+        new RMDelegationTokenIdentifier(userText1, new Text("renewer1"),
+          userText1);
+    final Token<RMDelegationTokenIdentifier> token1 =
+        new Token<RMDelegationTokenIdentifier>(dtId1.getBytes(),
+          "password1".getBytes(), dtId1.getKind(), new Text("service1"));
+    Credentials credentials = new Credentials();
+    credentials.addToken(userText1, token1);
+    this.context.getSystemCredentialsForApps().put(application1, credentials);
+
+    logAggregationService.handle(new LogHandlerAppFinishedEvent(application1));
+
+    final UserGroupInformation ugi =
+        ((AppLogAggregatorImpl) logAggregationService.getAppLogAggregators()
+          .get(application1)).getUgi();
+
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      public Boolean get() {
+        boolean hasNewToken = false;
+        for (Token<?> token : ugi.getCredentials().getAllTokens()) {
+          if (token.equals(token1)) {
+            hasNewToken = true;
+          }
+        }
+        return hasNewToken;
+      }
+    }, 1000, 20000);
+    logAggregationService.stop();
+    dispatcher.stop();
+  }
+
   private int numOfLogsAvailable(LogAggregationService logAggregationService,
       ApplicationId appId, boolean sizeLimited, String lastLogFile)
       throws IOException {