You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xk...@apache.org on 2018/08/01 17:06:55 UTC
[39/50] hadoop git commit: YARN-8418. App local logs could leaked if
log aggregation fails to initialize for the app. (Bibin A Chundatt via
wangda)
YARN-8418. App local logs could leaked if log aggregation fails to initialize for the app. (Bibin A Chundatt via wangda)
Change-Id: I29a23ca4b219b48c92e7975cd44cddb8b0e04104
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4b540bbf
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4b540bbf
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4b540bbf
Branch: refs/heads/HDFS-12943
Commit: 4b540bbfcf02d828052999215c6135603d98f5db
Parents: 8aa93a5
Author: Wangda Tan <wa...@apache.org>
Authored: Tue Jul 31 12:07:51 2018 -0700
Committer: Wangda Tan <wa...@apache.org>
Committed: Tue Jul 31 12:08:00 2018 -0700
----------------------------------------------------------------------
.../LogAggregationFileController.java | 7 ++
.../nodemanager/NodeStatusUpdaterImpl.java | 1 +
.../containermanager/ContainerManager.java | 1 +
.../containermanager/ContainerManagerImpl.java | 13 ++-
.../logaggregation/AppLogAggregator.java | 8 ++
.../logaggregation/AppLogAggregatorImpl.java | 15 ++++
.../logaggregation/LogAggregationService.java | 83 ++++++++++++++++----
.../containermanager/loghandler/LogHandler.java | 7 ++
.../loghandler/NonAggregatingLogHandler.java | 9 +++
.../loghandler/event/LogHandlerEventType.java | 4 +-
.../event/LogHandlerTokenUpdatedEvent.java | 26 ++++++
.../nodemanager/DummyContainerManager.java | 7 ++
.../TestLogAggregationService.java | 34 +++++---
13 files changed, 187 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b540bbf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
index b047b1c..6b3c9a4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
@@ -43,11 +43,14 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.webapp.View.ViewContext;
@@ -365,6 +368,10 @@ public abstract class LogAggregationFileController {
}
});
} catch (Exception e) {
+ if (e instanceof RemoteException) {
+ throw new YarnRuntimeException(((RemoteException) e)
+ .unwrapRemoteException(SecretManager.InvalidToken.class));
+ }
throw new YarnRuntimeException(e);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b540bbf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 8154723..faf7adb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -1135,6 +1135,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
if (systemCredentials != null && !systemCredentials.isEmpty()) {
((NMContext) context).setSystemCrendentialsForApps(
parseCredentials(systemCredentials));
+ context.getContainerManager().handleCredentialUpdate();
}
List<org.apache.hadoop.yarn.api.records.Container>
containersToUpdate = response.getContainersToUpdate();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b540bbf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java
index 2aeb245..356c2e0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java
@@ -44,4 +44,5 @@ public interface ContainerManager extends ServiceStateChangeListener,
ContainerScheduler getContainerScheduler();
+ void handleCredentialUpdate();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b540bbf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index ce240bc..8b35258 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.UpdateContainerTokenEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerTokenUpdatedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -170,7 +171,6 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -214,6 +214,7 @@ public class ContainerManagerImpl extends CompositeService implements
protected final AsyncDispatcher dispatcher;
private final DeletionService deletionService;
+ private LogHandler logHandler;
private boolean serviceStopped = false;
private final ReadLock readLock;
private final WriteLock writeLock;
@@ -292,7 +293,7 @@ public class ContainerManagerImpl extends CompositeService implements
@Override
public void serviceInit(Configuration conf) throws Exception {
- LogHandler logHandler =
+ logHandler =
createLogHandler(conf, this.context, this.deletionService);
addIfService(logHandler);
dispatcher.register(LogHandlerEventType.class, logHandler);
@@ -1904,4 +1905,12 @@ public class ContainerManagerImpl extends CompositeService implements
public ContainerScheduler getContainerScheduler() {
return this.containerScheduler;
}
+
+ @Override
+ public void handleCredentialUpdate() {
+ Set<ApplicationId> invalidApps = logHandler.getInvalidTokenApps();
+ if (!invalidApps.isEmpty()) {
+ dispatcher.getEventHandler().handle(new LogHandlerTokenUpdatedEvent());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b540bbf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java
index 0178699..93436fa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.server.api.ContainerLogContext;
public interface AppLogAggregator extends Runnable {
@@ -29,4 +31,10 @@ public interface AppLogAggregator extends Runnable {
void finishLogAggregation();
void disableLogAggregation();
+
+ void enableLogAggregation();
+
+ boolean isAggregationEnabled();
+
+ UserGroupInformation updateCredentials(Credentials cred);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b540bbf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index 6630ba6..04503ef 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -561,6 +561,16 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
this.logAggregationDisabled = true;
}
+ @Override
+ public void enableLogAggregation() {
+ this.logAggregationDisabled = false;
+ }
+
+ @Override
+ public boolean isAggregationEnabled() {
+ return !logAggregationDisabled;
+ }
+
@Private
@VisibleForTesting
// This is only used for testing.
@@ -643,6 +653,11 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
return this.userUgi;
}
+ public UserGroupInformation updateCredentials(Credentials cred) {
+ this.userUgi.addCredentials(cred);
+ return userUgi;
+ }
+
@Private
@VisibleForTesting
public int getLogAggregationTimes() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b540bbf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
index dcc165f..d8db967 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
@@ -20,10 +20,14 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregatio
import java.io.IOException;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.security.token.SecretManager;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,6 +62,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.eve
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -83,6 +88,9 @@ public class LogAggregationService extends AbstractService implements
private final ConcurrentMap<ApplicationId, AppLogAggregator> appLogAggregators;
+ // Holds applications whose aggregation is disable due to invalid Token
+ private final Set<ApplicationId> invalidTokenApps;
+
@VisibleForTesting
ExecutorService threadPool;
@@ -95,6 +103,7 @@ public class LogAggregationService extends AbstractService implements
this.dirsHandler = dirsHandler;
this.appLogAggregators =
new ConcurrentHashMap<ApplicationId, AppLogAggregator>();
+ this.invalidTokenApps = ConcurrentHashMap.newKeySet();
}
protected void serviceInit(Configuration conf) throws Exception {
@@ -224,8 +233,8 @@ public class LogAggregationService extends AbstractService implements
userUgi.addCredentials(credentials);
}
- LogAggregationFileController logAggregationFileController
- = getLogAggregationFileController(getConfig());
+ LogAggregationFileController logAggregationFileController =
+ getLogAggregationFileController(getConfig());
logAggregationFileController.verifyAndCreateRemoteLogDir();
// New application
final AppLogAggregator appLogAggregator =
@@ -245,14 +254,16 @@ public class LogAggregationService extends AbstractService implements
logAggregationFileController.createAppDir(user, appId, userUgi);
} catch (Exception e) {
appLogAggregator.disableLogAggregation();
+
+ // add to disabled aggregators if due to InvalidToken
+ if (e.getCause() instanceof SecretManager.InvalidToken) {
+ invalidTokenApps.add(appId);
+ }
if (!(e instanceof YarnRuntimeException)) {
appDirException = new YarnRuntimeException(e);
} else {
appDirException = (YarnRuntimeException)e;
}
- appLogAggregators.remove(appId);
- closeFileSystems(userUgi);
- throw appDirException;
}
// TODO Get the user configuration for the list of containers that need log
@@ -270,6 +281,10 @@ public class LogAggregationService extends AbstractService implements
}
};
this.threadPool.execute(aggregatorWrapper);
+
+ if (appDirException != null) {
+ throw appDirException;
+ }
}
protected void closeFileSystems(final UserGroupInformation userUgi) {
@@ -307,17 +322,20 @@ public class LogAggregationService extends AbstractService implements
// App is complete. Finish up any containers' pending log aggregation and
// close the application specific logFile.
-
- AppLogAggregator aggregator = this.appLogAggregators.get(appId);
- if (aggregator == null) {
- LOG.warn("Log aggregation is not initialized for " + appId
- + ", did it fail to start?");
- this.dispatcher.getEventHandler().handle(
- new ApplicationEvent(appId,
- ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED));
- return;
+ try {
+ AppLogAggregator aggregator = this.appLogAggregators.get(appId);
+ if (aggregator == null) {
+ LOG.warn("Log aggregation is not initialized for " + appId
+ + ", did it fail to start?");
+ this.dispatcher.getEventHandler().handle(new ApplicationEvent(appId,
+ ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED));
+ return;
+ }
+ aggregator.finishLogAggregation();
+ } finally {
+ // Remove invalid Token Apps
+ invalidTokenApps.remove(appId);
}
- aggregator.finishLogAggregation();
}
@Override
@@ -344,12 +362,47 @@ public class LogAggregationService extends AbstractService implements
(LogHandlerAppFinishedEvent) event;
stopApp(appFinishedEvent.getApplicationId());
break;
+ case LOG_AGG_TOKEN_UPDATE:
+ checkAndEnableAppAggregators();
+ break;
default:
; // Ignore
}
}
+ private void checkAndEnableAppAggregators() {
+ for (ApplicationId appId : invalidTokenApps) {
+ try {
+ AppLogAggregator aggregator = appLogAggregators.get(appId);
+ if (aggregator != null) {
+ Credentials credentials =
+ context.getSystemCredentialsForApps().get(appId);
+ if (credentials != null) {
+ // Create the app dir again with
+ LogAggregationFileController logAggregationFileController =
+ getLogAggregationFileController(getConfig());
+ UserGroupInformation userUgi =
+ aggregator.updateCredentials(credentials);
+ logAggregationFileController
+ .createAppDir(userUgi.getShortUserName(), appId, userUgi);
+ aggregator.enableLogAggregation();
+ }
+ invalidTokenApps.remove(appId);
+ LOG.info("LogAggregation enabled for application {}", appId);
+ }
+ } catch (Exception e) {
+ //Ignore exception
+ LOG.warn("Enable aggregators failed {}", appId);
+ }
+ }
+ }
+
+ @Override
+ public Set<ApplicationId> getInvalidTokenApps() {
+ return invalidTokenApps;
+ }
+
@VisibleForTesting
public ConcurrentMap<ApplicationId, AppLogAggregator> getAppLogAggregators() {
return this.appLogAggregators;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b540bbf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/LogHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/LogHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/LogHandler.java
index 6eb3fb4..459fdf4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/LogHandler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/LogHandler.java
@@ -18,9 +18,16 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
+
+
+import java.util.Set;
+
public interface LogHandler extends EventHandler<LogHandlerEvent> {
public void handle(LogHandlerEvent event);
+
+ public Set<ApplicationId> getInvalidTokenApps();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b540bbf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
index 9c43dde..d66aa12 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
@@ -19,8 +19,12 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
@@ -204,6 +208,11 @@ public class NonAggregatingLogHandler extends AbstractService implements
}
}
+ @Override
+ public Set<ApplicationId> getInvalidTokenApps() {
+ return Collections.emptySet();
+ }
+
ScheduledThreadPoolExecutor createScheduledThreadPoolExecutor(
Configuration conf) {
ThreadFactory tf =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b540bbf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerEventType.java
index 684d6b2..ec477c2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerEventType.java
@@ -19,5 +19,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event;
public enum LogHandlerEventType {
- APPLICATION_STARTED, CONTAINER_FINISHED, APPLICATION_FINISHED
+ APPLICATION_STARTED,
+ CONTAINER_FINISHED,
+ APPLICATION_FINISHED, LOG_AGG_TOKEN_UPDATE
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b540bbf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerTokenUpdatedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerTokenUpdatedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerTokenUpdatedEvent.java
new file mode 100644
index 0000000..772a463
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerTokenUpdatedEvent.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event;
+
+public class LogHandlerTokenUpdatedEvent extends LogHandlerEvent {
+
+ public LogHandlerTokenUpdatedEvent() {
+ super(LogHandlerEventType.LOG_AGG_TOKEN_UPDATE);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b540bbf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
index b5cb43b..feabeb1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
@@ -24,6 +24,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -187,6 +189,11 @@ public class DummyContainerManager extends ContainerManagerImpl {
// Ignore
}
}
+
+ @Override
+ public Set<ApplicationId> getInvalidTokenApps() {
+ return Collections.emptySet();
+ }
};
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b540bbf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
index 6268ad9..8b2e3cc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
@@ -73,6 +73,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@@ -128,6 +129,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.Tes
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerTokenUpdatedEvent;
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -823,7 +825,8 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
.getFileControllerForWrite();
LogAggregationFileController spyLogAggregationFileFormat =
spy(logAggregationFileFormat);
- Exception e = new RuntimeException("KABOOM!");
+ Exception e =
+ new YarnRuntimeException(new SecretManager.InvalidToken("KABOOM!"));
doThrow(e).when(spyLogAggregationFileFormat)
.createAppDir(any(String.class), any(ApplicationId.class),
any(UserGroupInformation.class));
@@ -862,29 +865,40 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
};
checkEvents(appEventHandler, expectedEvents, false,
"getType", "getApplicationID", "getDiagnostic");
-
+ Assert.assertEquals(logAggregationService.getInvalidTokenApps().size(), 1);
// verify trying to collect logs for containers/apps we don't know about
// doesn't blow up and tear down the NM
logAggregationService.handle(new LogHandlerContainerFinishedEvent(
BuilderUtils.newContainerId(4, 1, 1, 1),
ContainerType.APPLICATION_MASTER, 0));
dispatcher.await();
+
+ AppLogAggregator appAgg =
+ logAggregationService.getAppLogAggregators().get(appId);
+ Assert.assertFalse("Aggregation should be disabled",
+ appAgg.isAggregationEnabled());
+
+ // Enabled aggregation
+ logAggregationService.handle(new LogHandlerTokenUpdatedEvent());
+ dispatcher.await();
+
+ appAgg =
+ logAggregationService.getAppLogAggregators().get(appId);
+ Assert.assertFalse("Aggregation should be enabled",
+ appAgg.isAggregationEnabled());
+
+ // Check disabled apps are cleared
+ Assert.assertEquals(0, logAggregationService.getInvalidTokenApps().size());
+
logAggregationService.handle(new LogHandlerAppFinishedEvent(
BuilderUtils.newApplicationId(1, 5)));
dispatcher.await();
logAggregationService.stop();
assertEquals(0, logAggregationService.getNumAggregators());
- // local log dir shouldn't be deleted given log aggregation cannot
- // continue due to aggregated log dir creation failure on remoteFS.
- FileDeletionTask deletionTask = new FileDeletionTask(spyDelSrvc, user,
- null, null);
- verify(spyDelSrvc, never()).delete(deletionTask);
+ verify(spyDelSrvc).delete(any(FileDeletionTask.class));
verify(logAggregationService).closeFileSystems(
any(UserGroupInformation.class));
- // make sure local log dir is not deleted in case log aggregation
- // service cannot be initiated.
- assertTrue(appLogDir.exists());
}
private void writeContainerLogs(File appLogDir, ContainerId containerId,
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org