You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ma...@apache.org on 2011/11/30 00:28:20 UTC
svn commit: r1208135 [2/3] - in
/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ conf/
hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduce-client-jobcli...
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1208135&r1=1208134&r2=1208135&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Tue Nov 29 23:28:16 2011
@@ -57,7 +57,6 @@ import static org.apache.hadoop.fs.Creat
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -68,7 +67,6 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.net.NetUtils;
@@ -81,6 +79,7 @@ import org.apache.hadoop.yarn.factory.pr
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
@@ -125,19 +124,18 @@ public class ResourceLocalizationService
private InetSocketAddress localizationServerAddress;
private long cacheTargetSize;
private long cacheCleanupPeriod;
- private List<Path> logDirs;
- private List<Path> localDirs;
- private List<Path> sysDirs;
+
private final ContainerExecutor exec;
protected final Dispatcher dispatcher;
private final DeletionService delService;
private LocalizerTracker localizerTracker;
private RecordFactory recordFactory;
- private final LocalDirAllocator localDirsSelector;
private final ScheduledExecutorService cacheCleanup;
private final LocalResourcesTracker publicRsrc;
-
+
+ private LocalDirsHandlerService dirsHandler;
+
/**
* Map of LocalResourceTrackers keyed by username, for private
* resources.
@@ -153,12 +151,15 @@ public class ResourceLocalizationService
new ConcurrentHashMap<String,LocalResourcesTracker>();
public ResourceLocalizationService(Dispatcher dispatcher,
- ContainerExecutor exec, DeletionService delService) {
+ ContainerExecutor exec, DeletionService delService,
+ LocalDirsHandlerService dirsHandler) {
+
super(ResourceLocalizationService.class.getName());
this.exec = exec;
this.dispatcher = dispatcher;
this.delService = delService;
- this.localDirsSelector = new LocalDirAllocator(YarnConfiguration.NM_LOCAL_DIRS);
+ this.dirsHandler = dirsHandler;
+
this.publicRsrc = new LocalResourcesTrackerImpl(null, dispatcher);
this.cacheCleanup = new ScheduledThreadPoolExecutor(1,
new ThreadFactoryBuilder()
@@ -177,41 +178,31 @@ public class ResourceLocalizationService
@Override
public void init(Configuration conf) {
this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
+
try {
// TODO queue deletions here, rather than NM init?
FileContext lfs = getLocalFileContext(conf);
- String[] sLocalDirs =
- conf.getStrings(YarnConfiguration.NM_LOCAL_DIRS, YarnConfiguration.DEFAULT_NM_LOCAL_DIRS);
-
- localDirs = new ArrayList<Path>(sLocalDirs.length);
- logDirs = new ArrayList<Path>(sLocalDirs.length);
- sysDirs = new ArrayList<Path>(sLocalDirs.length);
- for (String sLocaldir : sLocalDirs) {
- Path localdir = new Path(sLocaldir);
- localDirs.add(localdir);
+ List<String> localDirs = dirsHandler.getLocalDirs();
+ for (String localDir : localDirs) {
// $local/usercache
- Path userdir = new Path(localdir, ContainerLocalizer.USERCACHE);
- lfs.mkdir(userdir, null, true);
+ Path userDir = new Path(localDir, ContainerLocalizer.USERCACHE);
+ lfs.mkdir(userDir, null, true);
// $local/filecache
- Path filedir = new Path(localdir, ContainerLocalizer.FILECACHE);
- lfs.mkdir(filedir, null, true);
+ Path fileDir = new Path(localDir, ContainerLocalizer.FILECACHE);
+ lfs.mkdir(fileDir, null, true);
// $local/nmPrivate
- Path sysdir = new Path(localdir, NM_PRIVATE_DIR);
- lfs.mkdir(sysdir, NM_PRIVATE_PERM, true);
- sysDirs.add(sysdir);
- }
- String[] sLogdirs = conf.getStrings(YarnConfiguration.NM_LOG_DIRS, YarnConfiguration.DEFAULT_NM_LOG_DIRS);
- for (String sLogdir : sLogdirs) {
- Path logdir = new Path(sLogdir);
- logDirs.add(logdir);
- lfs.mkdir(logdir, null, true);
+ Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
+ lfs.mkdir(sysDir, NM_PRIVATE_PERM, true);
+ }
+
+ List<String> logDirs = dirsHandler.getLogDirs();
+ for (String logDir : logDirs) {
+ lfs.mkdir(new Path(logDir), null, true);
}
} catch (IOException e) {
throw new YarnException("Failed to initialize LocalizationService", e);
}
- localDirs = Collections.unmodifiableList(localDirs);
- logDirs = Collections.unmodifiableList(logDirs);
- sysDirs = Collections.unmodifiableList(sysDirs);
+
cacheTargetSize =
conf.getLong(YarnConfiguration.NM_LOCALIZER_CACHE_TARGET_SIZE_MB, YarnConfiguration.DEFAULT_NM_LOCALIZER_CACHE_TARGET_SIZE_MB) << 20;
cacheCleanupPeriod =
@@ -391,7 +382,7 @@ public class ResourceLocalizationService
String containerIDStr = c.toString();
String appIDStr = ConverterUtils.toString(
c.getContainerID().getApplicationAttemptId().getApplicationId());
- for (Path localDir : localDirs) {
+ for (String localDir : dirsHandler.getLocalDirs()) {
// Delete the user-owned container-dir
Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
@@ -428,7 +419,7 @@ public class ResourceLocalizationService
// Delete the application directories
userName = application.getUser();
appIDStr = application.toString();
- for (Path localDir : localDirs) {
+ for (String localDir : dirsHandler.getLocalDirs()) {
// Delete the user-owned app-dir
Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
@@ -574,12 +565,9 @@ public class ResourceLocalizationService
class PublicLocalizer extends Thread {
- static final String PUBCACHE_CTXT = "public.cache.dirs";
-
final FileContext lfs;
final Configuration conf;
final ExecutorService threadPool;
- final LocalDirAllocator publicDirs;
final CompletionService<Path> queue;
final Map<Future<Path>,LocalizerResourceRequestEvent> pending;
// TODO hack to work around broken signaling
@@ -601,13 +589,23 @@ public class ResourceLocalizationService
this.conf = conf;
this.pending = pending;
this.attempts = attempts;
- String[] publicFilecache = new String[localDirs.size()];
- for (int i = 0, n = localDirs.size(); i < n; ++i) {
- publicFilecache[i] =
- new Path(localDirs.get(i), ContainerLocalizer.FILECACHE).toString();
- }
- conf.setStrings(PUBCACHE_CTXT, publicFilecache);
- this.publicDirs = new LocalDirAllocator(PUBCACHE_CTXT);
+// List<String> localDirs = dirsHandler.getLocalDirs();
+// String[] publicFilecache = new String[localDirs.size()];
+// for (int i = 0, n = localDirs.size(); i < n; ++i) {
+// publicFilecache[i] =
+// new Path(localDirs.get(i), ContainerLocalizer.FILECACHE).toString();
+// }
+// conf.setStrings(PUBCACHE_CTXT, publicFilecache);
+
+// this.publicDirDestPath = new LocalDirAllocator(PUBCACHE_CTXT).getLocalPathForWrite(pathStr, conf);
+// List<String> localDirs = dirsHandler.getLocalDirs();
+// String[] publicFilecache = new String[localDirs.size()];
+// int i = 0;
+// for (String localDir : localDirs) {
+// publicFilecache[i++] =
+// new Path(localDir, ContainerLocalizer.FILECACHE).toString();
+// }
+
this.threadPool = threadPool;
this.queue = new ExecutorCompletionService<Path>(threadPool);
}
@@ -619,11 +617,19 @@ public class ResourceLocalizationService
synchronized (attempts) {
List<LocalizerResourceRequestEvent> sigh = attempts.get(key);
if (null == sigh) {
- pending.put(queue.submit(new FSDownload(
- lfs, null, conf, publicDirs,
- request.getResource().getRequest(), new Random())),
- request);
- attempts.put(key, new LinkedList<LocalizerResourceRequestEvent>());
+ LocalResource resource = request.getResource().getRequest();
+ try {
+ Path publicDirDestPath = dirsHandler.getLocalPathForWrite(
+ "." + Path.SEPARATOR + ContainerLocalizer.FILECACHE,
+ ContainerLocalizer.getEstimatedSize(resource), true);
+ pending.put(queue.submit(new FSDownload(
+ lfs, null, conf, publicDirDestPath, resource, new Random())),
+ request);
+ attempts.put(key, new LinkedList<LocalizerResourceRequestEvent>());
+ } catch (IOException e) {
+ LOG.error("Local path for public localization is not found. "
+ + " May be disks failed.", e);
+ }
} else {
sigh.add(request);
}
@@ -844,24 +850,30 @@ public class ResourceLocalizationService
public void run() {
Path nmPrivateCTokensPath = null;
try {
- // Use LocalDirAllocator to get nmPrivateDir
+ // Get nmPrivateDir
nmPrivateCTokensPath =
- localDirsSelector.getLocalPathForWrite(
- NM_PRIVATE_DIR
- + Path.SEPARATOR
+ dirsHandler.getLocalPathForWrite(
+ NM_PRIVATE_DIR + Path.SEPARATOR
+ String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT,
- localizerId), getConfig());
+ localizerId));
// 0) init queue, etc.
// 1) write credentials to private dir
writeCredentials(nmPrivateCTokensPath);
// 2) exec initApplication and wait
- exec.startLocalizer(nmPrivateCTokensPath, localizationServerAddress,
- context.getUser(),
- ConverterUtils.toString(
- context.getContainerId().
- getApplicationAttemptId().getApplicationId()),
- localizerId, localDirs);
+ List<String> localDirs = dirsHandler.getLocalDirs();
+ List<String> logDirs = dirsHandler.getLogDirs();
+ if (dirsHandler.areDisksHealthy()) {
+ exec.startLocalizer(nmPrivateCTokensPath, localizationServerAddress,
+ context.getUser(),
+ ConverterUtils.toString(
+ context.getContainerId().
+ getApplicationAttemptId().getApplicationId()),
+ localizerId, localDirs, logDirs);
+ } else {
+ throw new IOException("All disks failed. "
+ + dirsHandler.getDisksHealthReport());
+ }
// TODO handle ExitCodeException separately?
} catch (Exception e) {
LOG.info("Localizer failed", e);
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java?rev=1208135&r1=1208134&r2=1208135&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java Tue Nov 29 23:28:16 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.no
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -31,6 +32,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -40,10 +42,12 @@ import org.apache.hadoop.yarn.logaggrega
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.util.ConverterUtils;
+
public class AppLogAggregatorImpl implements AppLogAggregator {
private static final Log LOG = LogFactory
@@ -51,6 +55,7 @@ public class AppLogAggregatorImpl implem
private static final int THREAD_SLEEP_TIME = 1000;
private static final String TMP_FILE_SUFFIX = ".tmp";
+ private final LocalDirsHandlerService dirsHandler;
private final Dispatcher dispatcher;
private final ApplicationId appId;
private final String applicationId;
@@ -58,7 +63,6 @@ public class AppLogAggregatorImpl implem
private final Configuration conf;
private final DeletionService delService;
private final UserGroupInformation userUgi;
- private final String[] rootLogDirs;
private final Path remoteNodeLogFileForApp;
private final Path remoteNodeTmpLogFileForApp;
private final ContainerLogsRetentionPolicy retentionPolicy;
@@ -72,7 +76,7 @@ public class AppLogAggregatorImpl implem
public AppLogAggregatorImpl(Dispatcher dispatcher,
DeletionService deletionService, Configuration conf, ApplicationId appId,
- UserGroupInformation userUgi, String[] localRootLogDirs,
+ UserGroupInformation userUgi, LocalDirsHandlerService dirsHandler,
Path remoteNodeLogFileForApp,
ContainerLogsRetentionPolicy retentionPolicy,
Map<ApplicationAccessType, String> appAcls) {
@@ -82,7 +86,7 @@ public class AppLogAggregatorImpl implem
this.appId = appId;
this.applicationId = ConverterUtils.toString(appId);
this.userUgi = userUgi;
- this.rootLogDirs = localRootLogDirs;
+ this.dirsHandler = dirsHandler;
this.remoteNodeLogFileForApp = remoteNodeLogFileForApp;
this.remoteNodeTmpLogFileForApp = getRemoteNodeTmpLogFileForApp();
this.retentionPolicy = retentionPolicy;
@@ -115,9 +119,11 @@ public class AppLogAggregatorImpl implem
}
}
- LOG.info("Uploading logs for container " + containerId);
+ LOG.info("Uploading logs for container " + containerId
+ + ". Current good log dirs are "
+ + StringUtils.join(",", dirsHandler.getLogDirs()));
LogKey logKey = new LogKey(containerId);
- LogValue logValue = new LogValue(this.rootLogDirs, containerId);
+ LogValue logValue = new LogValue(dirsHandler.getLogDirs(), containerId);
try {
this.writer.append(logKey, logValue);
} catch (IOException e) {
@@ -150,9 +156,10 @@ public class AppLogAggregatorImpl implem
}
// Remove the local app-log-dirs
- Path[] localAppLogDirs = new Path[this.rootLogDirs.length];
+ List<String> rootLogDirs = dirsHandler.getLogDirs();
+ Path[] localAppLogDirs = new Path[rootLogDirs.size()];
int index = 0;
- for (String rootLogDir : this.rootLogDirs) {
+ for (String rootLogDir : rootLogDirs) {
localAppLogDirs[index] = new Path(rootLogDir, this.applicationId);
index++;
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java?rev=1208135&r1=1208134&r2=1208135&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java Tue Nov 29 23:28:16 2011
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.logaggrega
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
@@ -85,7 +86,7 @@ public class LogAggregationService exten
private final DeletionService deletionService;
private final Dispatcher dispatcher;
- private String[] localRootLogDirs;
+ private LocalDirsHandlerService dirsHandler;
Path remoteRootLogDir;
String remoteRootLogDirSuffix;
private NodeId nodeId;
@@ -95,11 +96,12 @@ public class LogAggregationService exten
private final ExecutorService threadPool;
public LogAggregationService(Dispatcher dispatcher, Context context,
- DeletionService deletionService) {
+ DeletionService deletionService, LocalDirsHandlerService dirsHandler) {
super(LogAggregationService.class.getName());
this.dispatcher = dispatcher;
this.context = context;
this.deletionService = deletionService;
+ this.dirsHandler = dirsHandler;
this.appLogAggregators =
new ConcurrentHashMap<ApplicationId, AppLogAggregator>();
this.threadPool = Executors.newCachedThreadPool(
@@ -109,9 +111,6 @@ public class LogAggregationService exten
}
public synchronized void init(Configuration conf) {
- this.localRootLogDirs =
- conf.getStrings(YarnConfiguration.NM_LOG_DIRS,
- YarnConfiguration.DEFAULT_NM_LOG_DIRS);
this.remoteRootLogDir =
new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
@@ -291,9 +290,10 @@ public class LogAggregationService exten
// New application
AppLogAggregator appLogAggregator =
- new AppLogAggregatorImpl(this.dispatcher, this.deletionService, getConfig(), appId,
- userUgi, this.localRootLogDirs,
- getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy, appAcls);
+ new AppLogAggregatorImpl(this.dispatcher, this.deletionService,
+ getConfig(), appId, userUgi, dirsHandler,
+ getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy,
+ appAcls);
if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {
throw new YarnException("Duplicate initApp for " + appId);
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java?rev=1208135&r1=1208134&r2=1208135&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java Tue Nov 29 23:28:16 2011
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -31,6 +32,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
@@ -53,15 +55,16 @@ public class NonAggregatingLogHandler ex
private final DeletionService delService;
private final Map<ApplicationId, String> appOwners;
- private String[] rootLogDirs;
+ private final LocalDirsHandlerService dirsHandler;
private long deleteDelaySeconds;
private ScheduledThreadPoolExecutor sched;
public NonAggregatingLogHandler(Dispatcher dispatcher,
- DeletionService delService) {
+ DeletionService delService, LocalDirsHandlerService dirsHandler) {
super(NonAggregatingLogHandler.class.getName());
this.dispatcher = dispatcher;
this.delService = delService;
+ this.dirsHandler = dirsHandler;
this.appOwners = new ConcurrentHashMap<ApplicationId, String>();
}
@@ -70,9 +73,6 @@ public class NonAggregatingLogHandler ex
// Default 3 hours.
this.deleteDelaySeconds =
conf.getLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 3 * 60 * 60);
- this.rootLogDirs =
- conf.getStrings(YarnConfiguration.NM_LOG_DIRS,
- YarnConfiguration.DEFAULT_NM_LOG_DIRS);
sched = createScheduledThreadPoolExecutor(conf);
super.init(conf);
}
@@ -145,10 +145,11 @@ public class NonAggregatingLogHandler ex
@Override
@SuppressWarnings("unchecked")
public void run() {
- Path[] localAppLogDirs =
- new Path[NonAggregatingLogHandler.this.rootLogDirs.length];
+ List<String> rootLogDirs =
+ NonAggregatingLogHandler.this.dirsHandler.getLogDirs();
+ Path[] localAppLogDirs = new Path[rootLogDirs.size()];
int index = 0;
- for (String rootLogDir : NonAggregatingLogHandler.this.rootLogDirs) {
+ for (String rootLogDir : rootLogDirs) {
localAppLogDirs[index] = new Path(rootLogDir, applicationId.toString());
index++;
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java?rev=1208135&r1=1208134&r2=1208135&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java Tue Nov 29 23:28:16 2011
@@ -34,15 +34,14 @@ import java.util.EnumSet;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
@@ -87,17 +86,18 @@ public class ContainerLogsPage extends N
public static class ContainersLogsBlock extends HtmlBlock implements
YarnWebParams {
private final Configuration conf;
- private final LocalDirAllocator logsSelector;
private final Context nmContext;
private final ApplicationACLsManager aclsManager;
+ private final LocalDirsHandlerService dirsHandler;
@Inject
public ContainersLogsBlock(Configuration conf, Context context,
- ApplicationACLsManager aclsManager) {
+ ApplicationACLsManager aclsManager,
+ LocalDirsHandlerService dirsHandler) {
this.conf = conf;
- this.logsSelector = new LocalDirAllocator(YarnConfiguration.NM_LOG_DIRS);
this.nmContext = context;
this.aclsManager = aclsManager;
+ this.dirsHandler = dirsHandler;
}
@Override
@@ -198,11 +198,10 @@ public class ContainerLogsPage extends N
File logFile = null;
try {
logFile =
- new File(this.logsSelector
- .getLocalPathToRead(
- ContainerLaunch.getRelativeContainerLogDir(
- applicationId.toString(), containerId.toString())
- + Path.SEPARATOR + $(CONTAINER_LOG_TYPE), this.conf)
+ new File(this.dirsHandler.getLogPathToRead(
+ ContainerLaunch.getRelativeContainerLogDir(
+ applicationId.toString(), containerId.toString())
+ + Path.SEPARATOR + $(CONTAINER_LOG_TYPE))
.toUri().getPath());
} catch (Exception e) {
html.h1("Cannot find this log on the local disk.");
@@ -272,8 +271,8 @@ public class ContainerLogsPage extends N
}
} else {
// Just print out the log-types
- List<File> containerLogsDirs =
- getContainerLogDirs(this.conf, containerId);
+ List<File> containerLogsDirs = getContainerLogDirs(containerId,
+ dirsHandler);
boolean foundLogFile = false;
for (File containerLogsDir : containerLogsDirs) {
for (File logFile : containerLogsDir.listFiles()) {
@@ -293,11 +292,10 @@ public class ContainerLogsPage extends N
return;
}
- static List<File>
- getContainerLogDirs(Configuration conf, ContainerId containerId) {
- String[] logDirs = conf.getStrings(YarnConfiguration.NM_LOG_DIRS,
- YarnConfiguration.DEFAULT_NM_LOG_DIRS);
- List<File> containerLogDirs = new ArrayList<File>(logDirs.length);
+ static List<File> getContainerLogDirs(ContainerId containerId,
+ LocalDirsHandlerService dirsHandler) {
+ List<String> logDirs = dirsHandler.getLogDirs();
+ List<File> containerLogDirs = new ArrayList<File>(logDirs.size());
for (String logDir : logDirs) {
String appIdStr =
ConverterUtils.toString(
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java?rev=1208135&r1=1208134&r2=1208135&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java Tue Nov 29 23:28:16 2011
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.service.AbstractService;
@@ -42,10 +43,11 @@ public class WebServer extends AbstractS
private WebApp webApp;
public WebServer(Context nmContext, ResourceView resourceView,
- ApplicationACLsManager aclsManager) {
+ ApplicationACLsManager aclsManager,
+ LocalDirsHandlerService dirsHandler) {
super(WebServer.class.getName());
this.nmContext = nmContext;
- this.nmWebApp = new NMWebApp(resourceView, aclsManager);
+ this.nmWebApp = new NMWebApp(resourceView, aclsManager, dirsHandler);
}
@Override
@@ -81,17 +83,21 @@ public class WebServer extends AbstractS
private final ResourceView resourceView;
private final ApplicationACLsManager aclsManager;
+ private final LocalDirsHandlerService dirsHandler;
public NMWebApp(ResourceView resourceView,
- ApplicationACLsManager aclsManager) {
+ ApplicationACLsManager aclsManager,
+ LocalDirsHandlerService dirsHandler) {
this.resourceView = resourceView;
this.aclsManager = aclsManager;
+ this.dirsHandler = dirsHandler;
}
@Override
public void setup() {
bind(ResourceView.class).toInstance(this.resourceView);
bind(ApplicationACLsManager.class).toInstance(this.aclsManager);
+ bind(LocalDirsHandlerService.class).toInstance(dirsHandler);
route("/", NMController.class, "info");
route("/node", NMController.class, "node");
route("/allApplications", NMController.class, "allApplications");
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/configuration.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/configuration.c?rev=1208135&r1=1208134&r2=1208135&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/configuration.c (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/configuration.c Tue Nov 29 23:28:16 2011
@@ -261,8 +261,15 @@ char * get_value(const char* key) {
* Value delimiter is assumed to be a comma.
*/
char ** get_values(const char * key) {
- char ** toPass = NULL;
char *value = get_value(key);
+ return extract_values(value);
+}
+
+/**
+ * Extracts array of values from the comma separated list of values.
+ */
+char ** extract_values(char *value) {
+ char ** toPass = NULL;
char *tempTok = NULL;
char *tempstr = NULL;
int size = 0;
@@ -276,8 +283,7 @@ char ** get_values(const char * key) {
toPass[size++] = tempTok;
if(size == toPassSize) {
toPassSize += MAX_SIZE;
- toPass = (char **) realloc(toPass,(sizeof(char *) *
- (MAX_SIZE * toPassSize)));
+ toPass = (char **) realloc(toPass,(sizeof(char *) * toPassSize));
}
tempTok = strtok_r(NULL, ",", &tempstr);
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/configuration.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/configuration.h?rev=1208135&r1=1208134&r2=1208135&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/configuration.h (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/configuration.h Tue Nov 29 23:28:16 2011
@@ -34,6 +34,9 @@ char *get_value(const char* key);
//comma separated strings.
char ** get_values(const char* key);
+// Extracts array of values from the comma separated list of values.
+char ** extract_values(char *value);
+
// free the memory returned by get_values
void free_values(char** values);
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c?rev=1208135&r1=1208134&r2=1208135&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c Tue Nov 29 23:28:16 2011
@@ -357,7 +357,7 @@ int mkdirs(const char* path, mode_t perm
* It creates the container work and log directories.
*/
static int create_container_directories(const char* user, const char *app_id,
- const char *container_id) {
+ const char *container_id, char* const* local_dir, char* const* log_dir) {
// create dirs as 0750
const mode_t perms = S_IRWXU | S_IRGRP | S_IXGRP;
if (app_id == NULL || container_id == NULL || user == NULL) {
@@ -367,20 +367,11 @@ static int create_container_directories(
}
int result = -1;
-
- char **local_dir = get_values(NM_SYS_DIR_KEY);
-
- if (local_dir == NULL) {
- fprintf(LOGFILE, "%s is not configured.\n", NM_SYS_DIR_KEY);
- return -1;
- }
-
- char **local_dir_ptr;
+ char* const* local_dir_ptr;
for(local_dir_ptr = local_dir; *local_dir_ptr != NULL; ++local_dir_ptr) {
char *container_dir = get_container_work_directory(*local_dir_ptr, user, app_id,
container_id);
if (container_dir == NULL) {
- free_values(local_dir);
return -1;
}
if (mkdirs(container_dir, perms) == 0) {
@@ -390,7 +381,6 @@ static int create_container_directories(
free(container_dir);
}
- free_values(local_dir);
if (result != 0) {
return result;
}
@@ -404,19 +394,11 @@ static int create_container_directories(
} else {
sprintf(combined_name, "%s/%s", app_id, container_id);
- char **log_dir = get_values(NM_LOG_DIR_KEY);
- if (log_dir == NULL) {
- free(combined_name);
- fprintf(LOGFILE, "%s is not configured.\n", NM_LOG_DIR_KEY);
- return -1;
- }
-
- char **log_dir_ptr;
+ char* const* log_dir_ptr;
for(log_dir_ptr = log_dir; *log_dir_ptr != NULL; ++log_dir_ptr) {
char *container_log_dir = get_app_log_directory(*log_dir_ptr, combined_name);
if (container_log_dir == NULL) {
free(combined_name);
- free_values(log_dir);
return -1;
} else if (mkdirs(container_log_dir, perms) != 0) {
free(container_log_dir);
@@ -426,7 +408,6 @@ static int create_container_directories(
}
}
free(combined_name);
- free_values(log_dir);
}
return result;
}
@@ -660,17 +641,12 @@ static int copy_file(int input, const ch
/**
* Function to initialize the user directories of a user.
*/
-int initialize_user(const char *user) {
- char **local_dir = get_values(NM_SYS_DIR_KEY);
- if (local_dir == NULL) {
- fprintf(LOGFILE, "%s is not configured.\n", NM_SYS_DIR_KEY);
- return INVALID_NM_ROOT_DIRS;
- }
+int initialize_user(const char *user, char* const* local_dirs) {
char *user_dir;
- char **local_dir_ptr = local_dir;
+ char* const* local_dir_ptr;
int failed = 0;
- for(local_dir_ptr = local_dir; *local_dir_ptr != 0; ++local_dir_ptr) {
+ for(local_dir_ptr = local_dirs; *local_dir_ptr != 0; ++local_dir_ptr) {
user_dir = get_user_directory(*local_dir_ptr, user);
if (user_dir == NULL) {
fprintf(LOGFILE, "Couldn't get userdir directory for %s.\n", user);
@@ -682,32 +658,29 @@ int initialize_user(const char *user) {
}
free(user_dir);
}
- free_values(local_dir);
return failed ? INITIALIZE_USER_FAILED : 0;
}
/**
* Function to prepare the application directories for the container.
*/
-int initialize_app(const char *user, const char *app_id,
- const char* nmPrivate_credentials_file, char* const* args) {
+int initialize_app(const char *user, const char *app_id,
+ const char* nmPrivate_credentials_file,
+ char* const* local_dirs, char* const* log_roots,
+ char* const* args) {
if (app_id == NULL || user == NULL) {
fprintf(LOGFILE, "Either app_id is null or the user passed is null.\n");
return INVALID_ARGUMENT_NUMBER;
}
// create the user directory on all disks
- int result = initialize_user(user);
+ int result = initialize_user(user, local_dirs);
if (result != 0) {
return result;
}
////////////// create the log directories for the app on all disks
- char **log_roots = get_values(NM_LOG_DIR_KEY);
- if (log_roots == NULL) {
- return INVALID_CONFIG_FILE;
- }
- char **log_root;
+ char* const* log_root;
char *any_one_app_log_dir = NULL;
for(log_root=log_roots; *log_root != NULL; ++log_root) {
char *app_log_dir = get_app_log_directory(*log_root, app_id);
@@ -722,7 +695,7 @@ int initialize_app(const char *user, con
free(app_log_dir);
}
}
- free_values(log_roots);
+
if (any_one_app_log_dir == NULL) {
fprintf(LOGFILE, "Did not create any app-log directories\n");
return -1;
@@ -743,15 +716,9 @@ int initialize_app(const char *user, con
// 750
mode_t permissions = S_IRWXU | S_IRGRP | S_IXGRP;
- char **nm_roots = get_values(NM_SYS_DIR_KEY);
-
- if (nm_roots == NULL) {
- return INVALID_CONFIG_FILE;
- }
-
- char **nm_root;
+ char* const* nm_root;
char *primary_app_dir = NULL;
- for(nm_root=nm_roots; *nm_root != NULL; ++nm_root) {
+ for(nm_root=local_dirs; *nm_root != NULL; ++nm_root) {
char *app_dir = get_app_directory(*nm_root, user, app_id);
if (app_dir == NULL) {
// try the next one
@@ -763,7 +730,7 @@ int initialize_app(const char *user, con
free(app_dir);
}
}
- free_values(nm_roots);
+
if (primary_app_dir == NULL) {
fprintf(LOGFILE, "Did not create any app directories\n");
return -1;
@@ -805,9 +772,10 @@ int initialize_app(const char *user, con
}
int launch_container_as_user(const char *user, const char *app_id,
- const char *container_id, const char *work_dir,
- const char *script_name, const char *cred_file,
- const char* pid_file) {
+ const char *container_id, const char *work_dir,
+ const char *script_name, const char *cred_file,
+ const char* pid_file, char* const* local_dirs,
+ char* const* log_dirs) {
int exit_code = -1;
char *script_file_dest = NULL;
char *cred_file_dest = NULL;
@@ -854,7 +822,8 @@ int launch_container_as_user(const char
goto cleanup;
}
- if (create_container_directories(user, app_id, container_id) != 0) {
+ if (create_container_directories(user, app_id, container_id, local_dirs,
+ log_dirs) != 0) {
fprintf(LOGFILE, "Could not create container dirs");
goto cleanup;
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.h?rev=1208135&r1=1208134&r2=1208135&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.h (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.h Tue Nov 29 23:28:16 2011
@@ -61,8 +61,6 @@ enum errorcodes {
#define NM_APP_DIR_PATTERN USER_DIR_PATTERN "/appcache/%s"
#define CONTAINER_DIR_PATTERN NM_APP_DIR_PATTERN "/%s"
#define CONTAINER_SCRIPT "launch_container.sh"
-#define NM_SYS_DIR_KEY "yarn.nodemanager.local-dirs"
-#define NM_LOG_DIR_KEY "yarn.nodemanager.log-dirs"
#define CREDENTIALS_FILENAME "container_tokens"
#define MIN_USERID_KEY "min.user.id"
#define BANNED_USERS_KEY "banned.users"
@@ -92,12 +90,13 @@ int check_executor_permissions(char *exe
// initialize the application directory
int initialize_app(const char *user, const char *app_id,
- const char *credentials, char* const* args);
+ const char *credentials, char* const* local_dirs,
+ char* const* log_dirs, char* const* args);
/*
* Function used to launch a container as the provided user. It does the following :
* 1) Creates container work dir and log dir to be accessible by the child
- * 2) Copies the script file from the TT to the work directory
+ * 2) Copies the script file from the NM to the work directory
* 3) Sets up the environment
* 4) Does an execlp on the same in order to replace the current image with
* container image.
@@ -109,12 +108,15 @@ int initialize_app(const char *user, con
* @param cred_file the credentials file that needs to be copied to the
* working directory.
* @param pid_file file where pid of process should be written to
+ * @param local_dirs nodemanager-local-directories to be used
+ * @param log_dirs nodemanager-log-directories to be used
* @return -1 or errorcode enum value on error (should never return on success).
*/
int launch_container_as_user(const char * user, const char *app_id,
const char *container_id, const char *work_dir,
const char *script_name, const char *cred_file,
- const char *pid_file);
+ const char *pid_file, char* const* local_dirs,
+ char* const* log_dirs);
/**
* Function used to signal a container launched by the user.
@@ -181,7 +183,7 @@ int mkdirs(const char* path, mode_t perm
/**
* Function to initialize the user directories of a user.
*/
-int initialize_user(const char *user);
+int initialize_user(const char *user, char* const* local_dirs);
/**
* Create a top level directory for the user.
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/main.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/main.c?rev=1208135&r1=1208134&r2=1208135&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/main.c (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/main.c Tue Nov 29 23:28:16 2011
@@ -43,10 +43,11 @@ void display_usage(FILE *stream) {
fprintf(stream,
"Usage: container-executor user command command-args\n");
fprintf(stream, "Commands:\n");
- fprintf(stream, " initialize container: %2d appid tokens cmd app...\n",
- INITIALIZE_CONTAINER);
+ fprintf(stream, " initialize container: %2d appid tokens " \
+ "nm-local-dirs nm-log-dirs cmd app...\n", INITIALIZE_CONTAINER);
fprintf(stream,
- " launch container: %2d appid containerid workdir container-script tokens pidfile\n",
+ " launch container: %2d appid containerid workdir "\
+ "container-script tokens pidfile nm-local-dirs nm-log-dirs\n",
LAUNCH_CONTAINER);
fprintf(stream, " signal container: %2d container-pid signal\n",
SIGNAL_CONTAINER);
@@ -96,6 +97,7 @@ int main(int argc, char **argv) {
char *orig_conf_file = STRINGIFY(HADOOP_CONF_DIR) "/" CONF_FILENAME;
char *conf_file = realpath(orig_conf_file, NULL);
+ char *local_dirs, *log_dirs;
if (conf_file == NULL) {
fprintf(ERRORFILE, "Configuration file %s not found.\n", orig_conf_file);
@@ -158,20 +160,23 @@ int main(int argc, char **argv) {
switch (command) {
case INITIALIZE_CONTAINER:
- if (argc < 6) {
- fprintf(ERRORFILE, "Too few arguments (%d vs 6) for initialize container\n",
+ if (argc < 8) {
+ fprintf(ERRORFILE, "Too few arguments (%d vs 8) for initialize container\n",
argc);
fflush(ERRORFILE);
return INVALID_ARGUMENT_NUMBER;
}
app_id = argv[optind++];
cred_file = argv[optind++];
+ local_dirs = argv[optind++];// good local dirs as a comma separated list
+ log_dirs = argv[optind++];// good log dirs as a comma separated list
exit_code = initialize_app(user_detail->pw_name, app_id, cred_file,
- argv + optind);
+ extract_values(local_dirs),
+ extract_values(log_dirs), argv + optind);
break;
case LAUNCH_CONTAINER:
- if (argc < 9) {
- fprintf(ERRORFILE, "Too few arguments (%d vs 9) for launch container\n",
+ if (argc != 11) {
+ fprintf(ERRORFILE, "Too few arguments (%d vs 11) for launch container\n",
argc);
fflush(ERRORFILE);
return INVALID_ARGUMENT_NUMBER;
@@ -182,13 +187,17 @@ int main(int argc, char **argv) {
script_file = argv[optind++];
cred_file = argv[optind++];
pid_file = argv[optind++];
- exit_code = launch_container_as_user(user_detail->pw_name, app_id, container_id,
- current_dir, script_file, cred_file, pid_file);
+ local_dirs = argv[optind++];// good local dirs as a comma separated list
+ log_dirs = argv[optind++];// good log dirs as a comma separated list
+ exit_code = launch_container_as_user(user_detail->pw_name, app_id,
+ container_id, current_dir, script_file, cred_file,
+ pid_file, extract_values(local_dirs),
+ extract_values(log_dirs));
break;
case SIGNAL_CONTAINER:
- if (argc < 5) {
- fprintf(ERRORFILE, "Too few arguments (%d vs 5) for signal container\n",
- argc);
+ if (argc != 5) {
+ fprintf(ERRORFILE, "Wrong number of arguments (%d vs 5) for " \
+ "signal container\n", argc);
fflush(ERRORFILE);
return INVALID_ARGUMENT_NUMBER;
} else {
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/test/test-container-executor.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/test/test-container-executor.c?rev=1208135&r1=1208134&r2=1208135&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/test/test-container-executor.c (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/test/test-container-executor.c Tue Nov 29 23:28:16 2011
@@ -28,10 +28,17 @@
#include <sys/stat.h>
#include <sys/wait.h>
-#define TEST_ROOT "/tmp/test-container-controller"
+#define TEST_ROOT "/tmp/test-container-executor"
#define DONT_TOUCH_FILE "dont-touch-me"
+#define NM_LOCAL_DIRS TEST_ROOT "/local-1," TEST_ROOT "/local-2," \
+ TEST_ROOT "/local-3," TEST_ROOT "/local-4," TEST_ROOT "/local-5"
+#define NM_LOG_DIRS TEST_ROOT "/logdir_1," TEST_ROOT "/logdir_2," \
+ TEST_ROOT "/logdir_3," TEST_ROOT "/logdir_4"
+#define ARRAY_SIZE 1000
static char* username = NULL;
+static char* local_dirs = NULL;
+static char* log_dirs = NULL;
/**
* Run the command using the effective user id.
@@ -84,40 +91,33 @@ void run(const char *cmd) {
int write_config_file(char *file_name) {
FILE *file;
- int i = 0;
file = fopen(file_name, "w");
if (file == NULL) {
printf("Failed to open %s.\n", file_name);
return EXIT_FAILURE;
}
- fprintf(file, "yarn.nodemanager.local-dirs=" TEST_ROOT "/local-1");
- for(i=2; i < 5; ++i) {
- fprintf(file, "," TEST_ROOT "/local-%d", i);
- }
- fprintf(file, "\n");
- fprintf(file, "yarn.nodemanager.log-dirs=" TEST_ROOT "/logs\n");
+ fprintf(file, "banned.users=bannedUser\n");
+ fprintf(file, "min.user.id=1000\n");
fclose(file);
return 0;
}
-void create_nm_roots() {
- char** nm_roots = get_values(NM_SYS_DIR_KEY);
+void create_nm_roots(char ** nm_roots) {
char** nm_root;
for(nm_root=nm_roots; *nm_root != NULL; ++nm_root) {
if (mkdir(*nm_root, 0755) != 0) {
printf("FAIL: Can't create directory %s - %s\n", *nm_root,
- strerror(errno));
+ strerror(errno));
exit(1);
}
char buffer[100000];
sprintf(buffer, "%s/usercache", *nm_root);
if (mkdir(buffer, 0755) != 0) {
printf("FAIL: Can't create directory %s - %s\n", buffer,
- strerror(errno));
+ strerror(errno));
exit(1);
}
}
- free_values(nm_roots);
}
void test_get_user_directory() {
@@ -209,7 +209,7 @@ void test_check_configuration_permission
}
void test_delete_container() {
- if (initialize_user(username)) {
+ if (initialize_user(username, extract_values(local_dirs))) {
printf("FAIL: failed to initialize user %s\n", username);
exit(1);
}
@@ -504,7 +504,8 @@ void test_init_app() {
exit(1);
} else if (child == 0) {
char *final_pgm[] = {"touch", "my-touch-file", 0};
- if (initialize_app(username, "app_4", TEST_ROOT "/creds.txt", final_pgm) != 0) {
+ if (initialize_app(username, "app_4", TEST_ROOT "/creds.txt", final_pgm,
+ extract_values(local_dirs), extract_values(log_dirs)) != 0) {
printf("FAIL: failed in child\n");
exit(42);
}
@@ -598,7 +599,8 @@ void test_run_container() {
exit(1);
} else if (child == 0) {
if (launch_container_as_user(username, "app_4", "container_1",
- container_dir, script_name, TEST_ROOT "/creds.txt", pid_file) != 0) {
+ container_dir, script_name, TEST_ROOT "/creds.txt", pid_file,
+ extract_values(local_dirs), extract_values(log_dirs)) != 0) {
printf("FAIL: failed in child\n");
exit(42);
}
@@ -677,7 +679,12 @@ int main(int argc, char **argv) {
}
read_config(TEST_ROOT "/test.cfg");
- create_nm_roots();
+ local_dirs = (char *) malloc (sizeof(char) * ARRAY_SIZE);
+ strcpy(local_dirs, NM_LOCAL_DIRS);
+ log_dirs = (char *) malloc (sizeof(char) * ARRAY_SIZE);
+ strcpy(log_dirs, NM_LOG_DIRS);
+
+ create_nm_roots(extract_values(local_dirs));
if (getuid() == 0 && argc == 2) {
username = argv[1];
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java?rev=1208135&r1=1208134&r2=1208135&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java Tue Nov 29 23:28:16 2011
@@ -60,16 +60,18 @@ public class DummyContainerManager exten
DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
NodeManagerMetrics metrics,
ContainerTokenSecretManager containerTokenSecretManager,
- ApplicationACLsManager applicationACLsManager) {
+ ApplicationACLsManager applicationACLsManager,
+ LocalDirsHandlerService dirsHandler) {
super(context, exec, deletionContext, nodeStatusUpdater, metrics,
- containerTokenSecretManager, applicationACLsManager);
+ containerTokenSecretManager, applicationACLsManager, dirsHandler);
}
@Override
@SuppressWarnings("unchecked")
- protected ResourceLocalizationService createResourceLocalizationService(ContainerExecutor exec,
- DeletionService deletionContext) {
- return new ResourceLocalizationService(super.dispatcher, exec, deletionContext) {
+ protected ResourceLocalizationService createResourceLocalizationService(
+ ContainerExecutor exec, DeletionService deletionContext) {
+ return new ResourceLocalizationService(super.dispatcher, exec,
+ deletionContext, super.dirsHandler) {
@Override
public void handle(LocalizationEvent event) {
switch (event.getType()) {
@@ -125,7 +127,8 @@ public class DummyContainerManager exten
@SuppressWarnings("unchecked")
protected ContainersLauncher createContainersLauncher(Context context,
ContainerExecutor exec) {
- return new ContainersLauncher(context, super.dispatcher, exec) {
+ return new ContainersLauncher(context, super.dispatcher, exec,
+ super.dirsHandler) {
@Override
public void handle(ContainersLauncherEvent event) {
Container container = event.getContainer();
@@ -139,7 +142,8 @@ public class DummyContainerManager exten
case CLEANUP_CONTAINER:
dispatcher.getEventHandler().handle(
new ContainerExitEvent(containerId,
- ContainerEventType.CONTAINER_KILLED_ON_REQUEST, 0));
+ ContainerEventType.CONTAINER_KILLED_ON_REQUEST, 0,
+ "Container exited with exit code 0."));
break;
}
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java?rev=1208135&r1=1208134&r2=1208135&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java Tue Nov 29 23:28:16 2011
@@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.server.no
import java.io.File;
import java.io.IOException;
-import org.apache.hadoop.NodeHealthCheckerService;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@@ -80,9 +79,12 @@ public class TestEventFlow {
ContainerExecutor exec = new DefaultContainerExecutor();
exec.setConf(conf);
+
DeletionService del = new DeletionService(exec);
Dispatcher dispatcher = new AsyncDispatcher();
- NodeHealthCheckerService healthChecker = null;
+ NodeHealthCheckerService healthChecker = new NodeHealthCheckerService();
+ healthChecker.init(conf);
+ LocalDirsHandlerService dirsHandler = healthChecker.getDiskHandler();
NodeManagerMetrics metrics = NodeManagerMetrics.create();
ContainerTokenSecretManager containerTokenSecretManager = new ContainerTokenSecretManager();
NodeStatusUpdater nodeStatusUpdater =
@@ -100,7 +102,8 @@ public class TestEventFlow {
DummyContainerManager containerManager = new DummyContainerManager(
context, exec, del, nodeStatusUpdater, metrics,
- containerTokenSecretManager, new ApplicationACLsManager(conf));
+ containerTokenSecretManager, new ApplicationACLsManager(conf),
+ dirsHandler);
containerManager.init(conf);
containerManager.start();
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java?rev=1208135&r1=1208134&r2=1208135&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java Tue Nov 29 23:28:16 2011
@@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -63,8 +64,6 @@ import org.junit.Test;
* config values.
* <br><pre><code>
* > cat /etc/hadoop/container-executor.cfg
- * yarn.nodemanager.local-dirs=/tmp/hadoop/nm-local/
- * yarn.nodemanager.log-dirs=/tmp/hadoop/nm-log
* yarn.nodemanager.linux-container-executor.group=mapred
* #depending on the user id of the application.submitter option
* min.user.id=1
@@ -72,7 +71,7 @@ import org.junit.Test;
* > sudo chmod 444 /etc/hadoop/container-executor.cfg
* </code></pre>
*
- * <li>iMove the binary and set proper permissions on it. It needs to be owned
+ * <li>Move the binary and set proper permissions on it. It needs to be owned
* by root, the group needs to be the group configured in container-executor.cfg,
* and it needs the setuid bit set. (The build will also overwrite it so you
* need to move it to a place that you can support it.
@@ -98,14 +97,22 @@ public class TestLinuxContainerExecutor
private LinuxContainerExecutor exec = null;
private String appSubmitter = null;
+ private LocalDirsHandlerService dirsHandler;
@Before
public void setup() throws Exception {
- FileContext.getLocalFSFileContext().mkdir(
- new Path(workSpace.getAbsolutePath()), null, true);
+ FileContext files = FileContext.getLocalFSFileContext();
+ Path workSpacePath = new Path(workSpace.getAbsolutePath());
+ files.mkdir(workSpacePath, null, true);
workSpace.setReadable(true, false);
workSpace.setExecutable(true, false);
workSpace.setWritable(true, false);
+ File localDir = new File(workSpace.getAbsoluteFile(), "localDir");
+ files.mkdir(new Path(localDir.getAbsolutePath()),
+ new FsPermission("777"), false);
+ File logDir = new File(workSpace.getAbsoluteFile(), "logDir");
+ files.mkdir(new Path(logDir.getAbsolutePath()),
+ new FsPermission("777"), false);
String exec_path = System.getProperty("container-executor.path");
if(exec_path != null && !exec_path.isEmpty()) {
Configuration conf = new Configuration(false);
@@ -114,6 +121,10 @@ public class TestLinuxContainerExecutor
conf.set(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH, exec_path);
exec = new LinuxContainerExecutor();
exec.setConf(conf);
+ conf.set(YarnConfiguration.NM_LOCAL_DIRS, localDir.getAbsolutePath());
+ conf.set(YarnConfiguration.NM_LOG_DIRS, logDir.getAbsolutePath());
+ dirsHandler = new LocalDirsHandlerService();
+ dirsHandler.init(conf);
}
appSubmitter = System.getProperty("application.submitter");
if(appSubmitter == null || appSubmitter.isEmpty()) {
@@ -189,7 +200,8 @@ public class TestLinuxContainerExecutor
exec.activateContainer(cId, pidFile);
return exec.launchContainer(container, scriptPath, tokensPath,
- appSubmitter, appId, workDir);
+ appSubmitter, appId, workDir, dirsHandler.getLocalDirs(),
+ dirsHandler.getLogDirs());
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java?rev=1208135&r1=1208134&r2=1208135&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java Tue Nov 29 23:28:16 2011
@@ -35,6 +35,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -51,6 +52,7 @@ public class TestLinuxContainerExecutorW
private LinuxContainerExecutor mockExec = null;
private final File mockParamFile = new File("./params.txt");
+ private LocalDirsHandlerService dirsHandler;
private void deleteMockParamFile() {
if(mockParamFile.exists()) {
@@ -80,6 +82,8 @@ public class TestLinuxContainerExecutorW
Configuration conf = new Configuration();
conf.set(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH, executorPath);
mockExec = new LinuxContainerExecutor();
+ dirsHandler = new LocalDirsHandlerService();
+ dirsHandler.init(conf);
mockExec.setConf(conf);
}
@@ -114,10 +118,13 @@ public class TestLinuxContainerExecutorW
mockExec.activateContainer(cId, pidFile);
int ret = mockExec.launchContainer(container, scriptPath, tokensPath,
- appSubmitter, appId, workDir);
+ appSubmitter, appId, workDir, dirsHandler.getLocalDirs(),
+ dirsHandler.getLogDirs());
assertEquals(0, ret);
assertEquals(Arrays.asList(appSubmitter, cmd, appId, containerId,
- workDir.toString(), "/bin/echo", "/dev/null", pidFile.toString()),
+ workDir.toString(), "/bin/echo", "/dev/null", pidFile.toString(),
+ StringUtils.join(",", dirsHandler.getLocalDirs()),
+ StringUtils.join(",", dirsHandler.getLogDirs())),
readMockParams());
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1208135&r1=1208134&r2=1208135&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Tue Nov 29 23:28:16 2011
@@ -29,7 +29,6 @@ import java.util.concurrent.ConcurrentMa
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.NodeHealthCheckerService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
@@ -440,10 +439,11 @@ public class TestNodeStatusUpdater {
ContainerExecutor exec, DeletionService del,
NodeStatusUpdater nodeStatusUpdater,
ContainerTokenSecretManager containerTokenSecretManager,
- ApplicationACLsManager aclsManager) {
+ ApplicationACLsManager aclsManager,
+ LocalDirsHandlerService diskhandler) {
return new ContainerManagerImpl(context, exec, del,
nodeStatusUpdater, metrics, containerTokenSecretManager,
- aclsManager) {
+ aclsManager, diskhandler) {
@Override
public void start() {
// Simulating failure of starting RPC server
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java?rev=1208135&r1=1208134&r2=1208135&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java Tue Nov 29 23:28:16 2011
@@ -45,7 +45,9 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.LocalRMInterface;
+import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
@@ -94,6 +96,8 @@ public abstract class BaseContainerManag
protected ContainerExecutor exec;
protected DeletionService delSrvc;
protected String user = "nobody";
+ protected NodeHealthCheckerService nodeHealthChecker;
+ protected LocalDirsHandlerService dirsHandler;
protected NodeStatusUpdater nodeStatusUpdater = new NodeStatusUpdaterImpl(
context, new AsyncDispatcher(), null, metrics, this.containerTokenSecretManager) {
@@ -147,9 +151,12 @@ public abstract class BaseContainerManag
delSrvc.init(conf);
exec = createContainerExecutor();
+ nodeHealthChecker = new NodeHealthCheckerService();
+ nodeHealthChecker.init(conf);
+ dirsHandler = nodeHealthChecker.getDiskHandler();
containerManager = new ContainerManagerImpl(context, exec, delSrvc,
nodeStatusUpdater, metrics, this.containerTokenSecretManager,
- new ApplicationACLsManager(conf));
+ new ApplicationACLsManager(conf), dirsHandler);
containerManager.init(conf);
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java?rev=1208135&r1=1208134&r2=1208135&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java Tue Nov 29 23:28:16 2011
@@ -383,11 +383,12 @@ public class TestContainerManager extend
// Real del service
delSrvc = new DeletionService(exec);
delSrvc.init(conf);
+
ContainerTokenSecretManager containerTokenSecretManager = new
ContainerTokenSecretManager();
containerManager = new ContainerManagerImpl(context, exec, delSrvc,
nodeStatusUpdater, metrics, containerTokenSecretManager,
- new ApplicationACLsManager(conf));
+ new ApplicationACLsManager(conf), dirsHandler);
containerManager.init(conf);
containerManager.start();
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java?rev=1208135&r1=1208134&r2=1208135&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java Tue Nov 29 23:28:16 2011
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.AbstractMap.SimpleEntry;
@@ -649,7 +650,8 @@ public class TestContainer {
public void containerFailed(int exitCode) {
c.handle(new ContainerExitEvent(cId,
- ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, exitCode));
+ ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, exitCode,
+ "Container completed with exit code " + exitCode));
drainDispatcherEvents();
}
@@ -659,9 +661,10 @@ public class TestContainer {
}
public void containerKilledOnRequest() {
+ int exitCode = ExitCode.FORCE_KILLED.getExitCode();
c.handle(new ContainerExitEvent(cId,
- ContainerEventType.CONTAINER_KILLED_ON_REQUEST, ExitCode.FORCE_KILLED
- .getExitCode()));
+ ContainerEventType.CONTAINER_KILLED_ON_REQUEST, exitCode,
+ "Container completed with exit code " + exitCode));
drainDispatcherEvents();
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java?rev=1208135&r1=1208134&r2=1208135&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java Tue Nov 29 23:28:16 2011
@@ -59,6 +59,8 @@ import org.apache.hadoop.yarn.event.Drai
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
+import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
@@ -109,19 +111,23 @@ public class TestResourceLocalizationSer
doNothing().when(spylfs).mkdir(
isA(Path.class), isA(FsPermission.class), anyBoolean());
+ List<Path> localDirs = new ArrayList<Path>();
+ String[] sDirs = new String[4];
+ for (int i = 0; i < 4; ++i) {
+ localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
+ sDirs[i] = localDirs.get(i).toString();
+ }
+ conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
+ LocalDirsHandlerService diskhandler = new LocalDirsHandlerService();
+ diskhandler.init(conf);
+
ResourceLocalizationService locService =
- spy(new ResourceLocalizationService(dispatcher, exec, delService));
+ spy(new ResourceLocalizationService(dispatcher, exec, delService,
+ diskhandler));
doReturn(lfs)
.when(locService).getLocalFileContext(isA(Configuration.class));
try {
dispatcher.start();
- List<Path> localDirs = new ArrayList<Path>();
- String[] sDirs = new String[4];
- for (int i = 0; i < 4; ++i) {
- localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
- sDirs[i] = localDirs.get(i).toString();
- }
- conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
// initialize ResourceLocalizationService
locService.init(conf);
@@ -176,12 +182,16 @@ public class TestResourceLocalizationSer
dispatcher.register(LocalizerEventType.class, localizerBus);
ContainerExecutor exec = mock(ContainerExecutor.class);
+ LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
+ dirsHandler.init(conf);
+
DeletionService delService = new DeletionService(exec);
delService.init(null);
delService.start();
ResourceLocalizationService rawService =
- new ResourceLocalizationService(dispatcher, exec, delService);
+ new ResourceLocalizationService(dispatcher, exec, delService,
+ dirsHandler);
ResourceLocalizationService spyService = spy(rawService);
doReturn(ignore).when(spyService).createServer();
doReturn(mockLocallilzerTracker).when(spyService).createLocalizerTracker(
@@ -356,13 +366,17 @@ public class TestResourceLocalizationSer
dispatcher.register(ContainerEventType.class, containerBus);
ContainerExecutor exec = mock(ContainerExecutor.class);
+ LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
+ dirsHandler.init(conf);
+
DeletionService delServiceReal = new DeletionService(exec);
DeletionService delService = spy(delServiceReal);
delService.init(null);
delService.start();
ResourceLocalizationService rawService =
- new ResourceLocalizationService(dispatcher, exec, delService);
+ new ResourceLocalizationService(dispatcher, exec, delService,
+ dirsHandler);
ResourceLocalizationService spyService = spy(rawService);
doReturn(ignore).when(spyService).createServer();
doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration.class));
@@ -414,8 +428,9 @@ public class TestResourceLocalizationSer
String appStr = ConverterUtils.toString(appId);
String ctnrStr = c.getContainerID().toString();
ArgumentCaptor<Path> tokenPathCaptor = ArgumentCaptor.forClass(Path.class);
- verify(exec).startLocalizer(tokenPathCaptor.capture(), isA(InetSocketAddress.class),
- eq("user0"), eq(appStr), eq(ctnrStr), isA(List.class));
+ verify(exec).startLocalizer(tokenPathCaptor.capture(),
+ isA(InetSocketAddress.class), eq("user0"), eq(appStr), eq(ctnrStr),
+ isA(List.class), isA(List.class));
Path localizationTokenPath = tokenPathCaptor.getValue();
// heartbeat from localizer
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java?rev=1208135&r1=1208134&r2=1208135&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java Tue Nov 29 23:28:16 2011
@@ -122,7 +122,8 @@ public class TestLogAggregationService e
dispatcher.register(ApplicationEventType.class, appEventHandler);
LogAggregationService logAggregationService =
- new LogAggregationService(dispatcher, this.context, this.delSrvc);
+ new LogAggregationService(dispatcher, this.context, this.delSrvc,
+ super.dirsHandler);
logAggregationService.init(this.conf);
logAggregationService.start();
@@ -189,7 +190,8 @@ public class TestLogAggregationService e
dispatcher.register(ApplicationEventType.class, appEventHandler);
LogAggregationService logAggregationService =
- new LogAggregationService(dispatcher, this.context, this.delSrvc);
+ new LogAggregationService(dispatcher, this.context, this.delSrvc,
+ super.dirsHandler);
logAggregationService.init(this.conf);
logAggregationService.start();
@@ -237,7 +239,8 @@ public class TestLogAggregationService e
dispatcher.register(ApplicationEventType.class, appEventHandler);
LogAggregationService logAggregationService =
- new LogAggregationService(dispatcher, this.context, this.delSrvc);
+ new LogAggregationService(dispatcher, this.context, this.delSrvc,
+ super.dirsHandler);
logAggregationService.init(this.conf);
logAggregationService.start();
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java?rev=1208135&r1=1208134&r2=1208135&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java Tue Nov 29 23:28:16 2011
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.event.Drai
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
@@ -74,13 +75,16 @@ public class TestNonAggregatingLogHandle
EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
dispatcher.register(ApplicationEventType.class, appEventHandler);
+ LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
+ dirsHandler.init(conf);
+
ApplicationId appId1 = BuilderUtils.newApplicationId(1234, 1);
ApplicationAttemptId appAttemptId1 =
BuilderUtils.newApplicationAttemptId(appId1, 1);
ContainerId container11 = BuilderUtils.newContainerId(appAttemptId1, 1);
NonAggregatingLogHandler logHandler =
- new NonAggregatingLogHandler(dispatcher, delService);
+ new NonAggregatingLogHandler(dispatcher, delService, dirsHandler);
logHandler.init(conf);
logHandler.start();
@@ -146,13 +150,17 @@ public class TestNonAggregatingLogHandle
EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
dispatcher.register(ApplicationEventType.class, appEventHandler);
+ LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
+ dirsHandler.init(conf);
+
ApplicationId appId1 = BuilderUtils.newApplicationId(1234, 1);
ApplicationAttemptId appAttemptId1 =
BuilderUtils.newApplicationAttemptId(appId1, 1);
ContainerId container11 = BuilderUtils.newContainerId(appAttemptId1, 1);
NonAggregatingLogHandler logHandler =
- new NonAggregatingLogHandlerWithMockExecutor(dispatcher, delService);
+ new NonAggregatingLogHandlerWithMockExecutor(dispatcher, delService,
+ dirsHandler);
logHandler.init(conf);
logHandler.start();
@@ -182,8 +190,8 @@ public class TestNonAggregatingLogHandle
private ScheduledThreadPoolExecutor mockSched;
public NonAggregatingLogHandlerWithMockExecutor(Dispatcher dispatcher,
- DeletionService delService) {
- super(dispatcher, delService);
+ DeletionService delService, LocalDirsHandlerService dirsHandler) {
+ super(dispatcher, delService, dirsHandler);
}
@Override