You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2011/05/10 17:18:59 UTC
svn commit: r1101502 [2/2] - in /hadoop/mapreduce/branches/MR-279: ./
yarn/yarn-server/yarn-server-nodemanager/src/main/c/container-executor/impl/
yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/
yarn/ya...
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Tue May 10 15:18:58 2011
@@ -54,6 +54,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.net.NetUtils;
@@ -67,6 +68,7 @@ import org.apache.hadoop.yarn.factory.pr
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
@@ -102,16 +104,16 @@ public class ResourceLocalizationService
public static final FsPermission NM_PRIVATE_PERM = new FsPermission((short) 0700);
private Server server;
- private InetSocketAddress locAddr;
+ private InetSocketAddress localizationServerAddress;
private List<Path> logDirs;
private List<Path> localDirs;
private List<Path> sysDirs;
private final ContainerExecutor exec;
protected final Dispatcher dispatcher;
private final DeletionService delService;
- private LocalizerTracker localizers;
- private final RecordFactory recordFactory =
- RecordFactoryProvider.getRecordFactory(null);
+ private LocalizerTracker localizerTracker;
+ private RecordFactory recordFactory;
+ private final LocalDirAllocator localDirsSelector;
//private final LocalResourcesTracker publicRsrc;
private final ConcurrentMap<String,LocalResourcesTracker> privateRsrc =
@@ -125,6 +127,7 @@ public class ResourceLocalizationService
this.exec = exec;
this.dispatcher = dispatcher;
this.delService = delService;
+ this.localDirsSelector = new LocalDirAllocator(NMConfig.NM_LOCAL_DIR);
}
FileContext getLocalFileContext(Configuration conf) {
@@ -137,6 +140,7 @@ public class ResourceLocalizationService
@Override
public void init(Configuration conf) {
+ this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
try {
// TODO queue deletions here, rather than NM init?
FileContext lfs = getLocalFileContext(conf);
@@ -172,16 +176,16 @@ public class ResourceLocalizationService
localDirs = Collections.unmodifiableList(localDirs);
logDirs = Collections.unmodifiableList(logDirs);
sysDirs = Collections.unmodifiableList(sysDirs);
- locAddr = NetUtils.createSocketAddr(
+ localizationServerAddress = NetUtils.createSocketAddr(
conf.get(NM_LOCALIZER_BIND_ADDRESS, DEFAULT_NM_LOCALIZER_BIND_ADDRESS));
- localizers = new LocalizerTracker();
- dispatcher.register(LocalizerEventType.class, localizers);
+ localizerTracker = new LocalizerTracker();
+ dispatcher.register(LocalizerEventType.class, localizerTracker);
super.init(conf);
}
@Override
public LocalizerHeartbeatResponse heartbeat(LocalizerStatus status) {
- return localizers.processHeartbeat(status);
+ return localizerTracker.processHeartbeat(status);
}
@Override
@@ -202,8 +206,8 @@ public class ResourceLocalizationService
LocalizerSecurityInfo.class, SecurityInfo.class);
secretManager = new LocalizerTokenSecretManager();
}
- return rpc.getServer(
- LocalizationProtocol.class, this, locAddr, conf, secretManager);
+ return rpc.getServer(LocalizationProtocol.class, this,
+ localizationServerAddress, conf, secretManager);
}
@Override
@@ -211,8 +215,8 @@ public class ResourceLocalizationService
if (server != null) {
server.close();
}
- if (localizers != null) {
- localizers.stop();
+ if (localizerTracker != null) {
+ localizerTracker.stop();
}
super.stop();
}
@@ -233,7 +237,7 @@ public class ResourceLocalizationService
if (null != appRsrc.putIfAbsent(ConverterUtils.toString(app.getAppId()),
new LocalResourcesTrackerImpl(dispatcher))) {
LOG.warn("Initializing application " + app + " already present");
- assert false;
+ assert false; // TODO: FIXME assert doesn't help
}
// 1) Signal container init
dispatcher.getEventHandler().handle(new ApplicationInitedEvent(
@@ -261,6 +265,8 @@ public class ResourceLocalizationService
appRsrc.get(ConverterUtils.toString(c.getContainerID().getAppId()));
break;
}
+ // We get a separate event for each visibility, so all the resources in
+ // this event are of the same visibility.
for (LocalResourceRequest req : rsrcReqs.getRequestedResources()) {
tracker.handle(new ResourceRequestEvent(req, vis, ctxt));
}
@@ -270,24 +276,25 @@ public class ResourceLocalizationService
((ContainerLocalizationEvent)event).getContainer();
// Delete the container directories
- userName = container.getUser();;
+ userName = container.getUser();
String containerIDStr = container.toString();
appIDStr =
ConverterUtils.toString(container.getContainerID().getAppId());
for (Path localDir : localDirs) {
+
+ // Delete the user-owned container-dir
Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
- Path userdir =
- new Path(usersdir, userName);
+ Path userdir = new Path(usersdir, userName);
Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
Path appDir = new Path(allAppsdir, appIDStr);
- Path containerDir =
- new Path(appDir, containerIDStr);
- delService.delete(userName, containerDir, null);
+ Path containerDir = new Path(appDir, containerIDStr);
+ delService.delete(userName, containerDir, new Path[] {});
+ // Delete the nmPrivate container-dir
Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
Path appSysDir = new Path(sysDir, appIDStr);
Path containerSysDir = new Path(appSysDir, containerIDStr);
- delService.delete(null, containerSysDir, null);
+ delService.delete(null, containerSysDir, new Path[] {});
}
dispatcher.getEventHandler().handle(new ContainerEvent(
@@ -298,24 +305,28 @@ public class ResourceLocalizationService
Application application =
((ApplicationLocalizationEvent) event).getApplication();
- if (null == appRsrc.remove(application)) {
+ LocalResourcesTracker appLocalRsrcsTracker = appRsrc.remove(application);
+ if (null == appLocalRsrcsTracker) {
LOG.warn("Removing uninitialized application " + application);
}
+ // TODO: What to do with appLocalRsrcsTracker?
// Delete the application directories
userName = application.getUser();
appIDStr = application.toString();
for (Path localDir : localDirs) {
+
+ // Delete the user-owned app-dir
Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
- Path userdir =
- new Path(usersdir, userName);
+ Path userdir = new Path(usersdir, userName);
Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
Path appDir = new Path(allAppsdir, appIDStr);
- delService.delete(userName, appDir, null);
+ delService.delete(userName, appDir, new Path[] {});
+ // Delete the nmPrivate app-dir
Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
Path appSysDir = new Path(sysDir, appIDStr);
- delService.delete(null, appSysDir, null);
+ delService.delete(null, appSysDir, new Path[] {});
}
// TODO: decrement reference counts of all resources associated with this
@@ -328,109 +339,114 @@ public class ResourceLocalizationService
}
}
+ /**
+ * Sub-component handling the spawning of {@link ContainerLocalizer}s
+ *
+ */
class LocalizerTracker implements EventHandler<LocalizerEvent> {
- private final Map<String,LocalizerRunner> trackers;
+ private final Map<String,LocalizerRunner> localizerRunners;
LocalizerTracker() {
this(new HashMap<String,LocalizerRunner>());
}
LocalizerTracker(Map<String,LocalizerRunner> trackers) {
- this.trackers = trackers;
+ this.localizerRunners = trackers;
}
public LocalizerHeartbeatResponse processHeartbeat(LocalizerStatus status) {
String locId = status.getLocalizerId();
- synchronized (trackers) {
- LocalizerRunner localizer = trackers.get(locId);
- if (null == localizer) {
+ synchronized (localizerRunners) {
+ LocalizerRunner localizerRunner = localizerRunners.get(locId);
+ if (null == localizerRunner) {
// TODO process resources anyway
+ LOG.info("Unknown localizer with localizerId " + locId
+ + " is sending heartbeat. Ordering it to DIE");
LocalizerHeartbeatResponse response =
recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class);
response.setLocalizerAction(LocalizerAction.DIE);
return response;
}
- return localizer.update(status.getResources());
+ return localizerRunner.update(status.getResources());
}
}
public void stop() {
- for (LocalizerRunner localizer : trackers.values()) {
+ for (LocalizerRunner localizer : localizerRunners.values()) {
localizer.interrupt();
}
}
@Override
public void handle(LocalizerEvent event) {
- synchronized (trackers) {
- String locId = event.getLocalizerId();
- LocalizerRunner localizer = trackers.get(locId);
+ synchronized (localizerRunners) {
+ String localizerId = event.getLocalizerId();
+ LocalizerRunner localizerRunner = localizerRunners.get(localizerId);
switch(event.getType()) {
case REQUEST_RESOURCE_LOCALIZATION:
// 0) find running localizer or start new thread
LocalizerResourceRequestEvent req =
(LocalizerResourceRequestEvent)event;
- if (null == localizer) {
- LOG.info("Created localizer for " + req.getLocalizerId());
- // TODO: ROUND_ROBIN below.
- localizer = new LocalizerRunner(req.getContext(),
- sysDirs.get(0), req.getLocalizerId(), logDirs.get(0));
- trackers.put(locId, localizer);
- localizer.start();
+ if (null == localizerRunner) {
+ LOG.info("Created localizerRunner for " + req.getLocalizerId());
+ localizerRunner =
+ new LocalizerRunner(req.getContext(), req.getLocalizerId());
+ localizerRunners.put(localizerId, localizerRunner);
+ localizerRunner.start();
}
// 1) propagate event
- localizer.addResource(req);
+ localizerRunner.addResource(req);
break;
case ABORT_LOCALIZATION:
+ // TODO: Who calls this?
// 0) find running localizer, interrupt and remove
- if (null == localizer) {
+ if (null == localizerRunner) {
return; // ignore; already gone
}
- trackers.remove(locId);
- localizer.interrupt();
+ localizerRunners.remove(localizerId);
+ localizerRunner.interrupt();
break;
}
}
}
}
+ /**
+ * Runs the {@link ContainerLocalizer} itself in a separate process with
+ * access to user's credentials. One {@link LocalizerRunner} per localizerId.
+ *
+ */
class LocalizerRunner extends Thread {
final LocalizerContext context;
final String localizerId;
- final Path nmPrivate;
- final Path rootLogDir;
final Map<LocalResourceRequest,LocalizerResourceRequestEvent> scheduled;
final List<LocalizerResourceRequestEvent> pending;
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
- LocalizerRunner(LocalizerContext context, Path nmPrivate,
- String localizerId, Path logDir) {
- this(context, nmPrivate, localizerId, logDir,
- new ArrayList<LocalizerResourceRequestEvent>(),
- new HashMap<LocalResourceRequest,LocalizerResourceRequestEvent>());
- }
-
- LocalizerRunner(LocalizerContext context, Path nmPrivate,
- String localizerId, Path logDir,
- List<LocalizerResourceRequestEvent> pending,
- Map<LocalResourceRequest,LocalizerResourceRequestEvent> scheduled) {
- this.nmPrivate = nmPrivate;
+ LocalizerRunner(LocalizerContext context, String localizerId) {
this.context = context;
this.localizerId = localizerId;
- this.rootLogDir = logDir;
- this.pending = pending;
- this.scheduled = scheduled;
+ this.pending = new ArrayList<LocalizerResourceRequestEvent>();
+ this.scheduled =
+ new HashMap<LocalResourceRequest, LocalizerResourceRequestEvent>();
}
public void addResource(LocalizerResourceRequestEvent request) {
+ // TODO: Synchronization
pending.add(request);
}
- LocalResource findNextResource() {
+ /**
+ * Find next resource to be given to a spawned localizer.
+ *
+ * @return
+ */
+ private LocalResource findNextResource() {
+ // TODO: Synchronization
for (Iterator<LocalizerResourceRequestEvent> i = pending.iterator();
i.hasNext();) {
LocalizerResourceRequestEvent evt = i.next();
@@ -457,16 +473,19 @@ public class ResourceLocalizationService
// TODO this sucks. Fix it later
LocalizerHeartbeatResponse update(
- List<LocalResourceStatus> stats) {
+ List<LocalResourceStatus> remoteResourceStatuses) {
LocalizerHeartbeatResponse response =
recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class);
- if (stats.isEmpty()) {
+ // The localizer has just spawned. Start giving it resources for
+ // remote-fetching.
+ if (remoteResourceStatuses.isEmpty()) {
LocalResource next = findNextResource();
if (next != null) {
response.setLocalizerAction(LocalizerAction.LIVE);
response.addResource(next);
} else if (pending.isEmpty()) {
+ // TODO: Synchronization
response.setLocalizerAction(LocalizerAction.DIE);
} else {
response.setLocalizerAction(LocalizerAction.LIVE);
@@ -474,7 +493,7 @@ public class ResourceLocalizationService
return response;
}
- for (LocalResourceStatus stat : stats) {
+ for (LocalResourceStatus stat : remoteResourceStatuses) {
LocalResource rsrc = stat.getResource();
LocalResourceRequest req = null;
try {
@@ -498,6 +517,7 @@ public class ResourceLocalizationService
stat.getLocalSize()));
} catch (URISyntaxException e) { }
if (pending.isEmpty()) {
+ // TODO: Synchronization
response.setLocalizerAction(LocalizerAction.DIE);
break;
}
@@ -534,20 +554,28 @@ public class ResourceLocalizationService
@SuppressWarnings("unchecked") // dispatcher not typed
public void run() {
try {
+ // Use LocalDirAllocator to get nmPrivateDir
+ Path nmPrivateCTokensPath =
+ localDirsSelector.getLocalPathForWrite(
+ NM_PRIVATE_DIR
+ + Path.SEPARATOR
+ + String.format(ContainerLocalizer.TOKEN_FILE_FMT,
+ localizerId), getConfig());
// 0) init queue, etc.
// 1) write credentials to private dir
DataOutputStream tokenOut = null;
try {
Credentials credentials = context.getCredentials();
- Path cTokens = new Path(nmPrivate, String.format(
- ContainerLocalizer.TOKEN_FILE_FMT, localizerId));
FileContext lfs = getLocalFileContext(getConfig());
- tokenOut = lfs.create(cTokens, EnumSet.of(CREATE, OVERWRITE));
+ tokenOut =
+ lfs.create(nmPrivateCTokensPath, EnumSet.of(CREATE, OVERWRITE));
LOG.info("Writing credentials to the nmPrivate file "
- + cTokens.toString() + ". Credentials list: ");
- for (Token<? extends TokenIdentifier> tk :
- credentials.getAllTokens()) {
- LOG.info(tk.getService() + " : " + tk.encodeToUrlString());
+ + nmPrivateCTokensPath.toString() + ". Credentials list: ");
+ if (LOG.isDebugEnabled()) {
+ for (Token<? extends TokenIdentifier> tk : credentials
+ .getAllTokens()) {
+ LOG.debug(tk.getService() + " : " + tk.encodeToUrlString());
+ }
}
credentials.writeTokenStorageToStream(tokenOut);
} finally {
@@ -556,9 +584,10 @@ public class ResourceLocalizationService
}
}
// 2) exec initApplication and wait
- exec.startLocalizer(nmPrivate, locAddr, context.getUser(),
+ exec.startLocalizer(nmPrivateCTokensPath, localizationServerAddress,
+ context.getUser(),
ConverterUtils.toString(context.getContainerId().getAppId()),
- localizerId, rootLogDir, localDirs);
+ localizerId, localDirs);
} catch (Exception e) {
// 3) on error, report failure to Container and signal ABORT
// 3.1) notify resource of failed localization
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java Tue May 10 15:18:58 2011
@@ -23,13 +23,20 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
-public class ContainerLocalizationRequestEvent extends ContainerLocalizationEvent {
+public class ContainerLocalizationRequestEvent extends
+ ContainerLocalizationEvent {
private final LocalResourceVisibility vis;
private final Collection<LocalResourceRequest> reqs;
- public ContainerLocalizationRequestEvent(Container c, Collection<LocalResourceRequest> reqs,
- LocalResourceVisibility vis) {
+ /**
+ * Event requesting the localization of the reqs all with visibility vis
+ * @param c
+ * @param reqs
+ * @param vis
+ */
+ public ContainerLocalizationRequestEvent(Container c,
+ Collection<LocalResourceRequest> reqs, LocalResourceVisibility vis) {
super(LocalizationEventType.INIT_CONTAINER_RESOURCES, c);
this.vis = vis;
this.reqs = reqs;
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEvent.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEvent.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEvent.java Tue May 10 15:18:58 2011
@@ -29,7 +29,7 @@ public class ResourceEvent extends Abstr
this.rsrc = rsrc;
}
- public LocalResourceRequest getLocalResource() {
+ public LocalResourceRequest getLocalResourceRequest() {
return rsrc;
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AggregatedLogFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AggregatedLogFormat.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AggregatedLogFormat.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AggregatedLogFormat.java Tue May 10 15:18:58 2011
@@ -78,31 +78,41 @@ public class AggregatedLogFormat {
public static class LogValue {
- private final File containerLogDir;
+ private final String[] rootLogDirs;
+ private final ContainerId containerId;
- public LogValue(File containerLogDir) {
- this.containerLogDir = containerLogDir;
+ public LogValue(String[] rootLogDirs, ContainerId containerId) {
+ this.rootLogDirs = rootLogDirs;
+ this.containerId = containerId;
}
public void write(DataOutputStream out) throws IOException {
- if (!this.containerLogDir.isDirectory()) {
- return; // ContainerDir may have been deleted by the user.
- }
+ for (String rootLogDir : this.rootLogDirs) {
+ File appLogDir =
+ new File(rootLogDir, ConverterUtils.toString(this.containerId
+ .getAppId()));
+ File containerLogDir =
+ new File(appLogDir, ConverterUtils.toString(this.containerId));
+
+ if (!containerLogDir.isDirectory()) {
+ continue; // ContainerDir may have been deleted by the user.
+ }
- for (File logFile : this.containerLogDir.listFiles()) {
+ for (File logFile : containerLogDir.listFiles()) {
- // Write the logFile Type
- out.writeUTF(logFile.getName());
+ // Write the logFile Type
+ out.writeUTF(logFile.getName());
- // Write the log length as UTF so that it is printable
- out.writeUTF(String.valueOf(logFile.length()));
+ // Write the log length as UTF so that it is printable
+ out.writeUTF(String.valueOf(logFile.length()));
- // Write the log itself
- FileInputStream in = new FileInputStream(logFile);
- byte[] buf = new byte[65535];
- int len = 0;
- while ((len = in.read(buf)) != -1) {
- out.write(buf, 0, len);
+ // Write the log itself
+ FileInputStream in = new FileInputStream(logFile);
+ byte[] buf = new byte[65535];
+ int len = 0;
+ while ((len = in.read(buf)) != -1) {
+ out.write(buf, 0, len);
+ }
}
}
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java Tue May 10 15:18:58 2011
@@ -20,8 +20,6 @@ package org.apache.hadoop.yarn.server.no
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -45,12 +43,12 @@ public class AppLogAggregatorImpl implem
.getLog(AppLogAggregatorImpl.class);
private static final int THREAD_SLEEP_TIME = 1000;
- private final ApplicationId applicationId;
+ private final String applicationId;
private boolean logAggregationDisabled = false;
private final Configuration conf;
private final DeletionService delService;
private final UserGroupInformation userUgi;
- private final File localAppLogDir;
+ private final String[] rootLogDirs;
private final Path remoteNodeLogFileForApp;
private final ContainerLogsRetentionPolicy retentionPolicy;
@@ -62,22 +60,18 @@ public class AppLogAggregatorImpl implem
public AppLogAggregatorImpl(DeletionService deletionService,
Configuration conf, ApplicationId appId, UserGroupInformation userUgi,
- File localAppLogDir, Path remoteNodeLogFileForApp,
+ String[] localRootLogDirs, Path remoteNodeLogFileForApp,
ContainerLogsRetentionPolicy retentionPolicy) {
this.conf = conf;
this.delService = deletionService;
- this.applicationId = appId;
+ this.applicationId = ConverterUtils.toString(appId);
this.userUgi = userUgi;
- this.localAppLogDir = localAppLogDir;
+ this.rootLogDirs = localRootLogDirs;
this.remoteNodeLogFileForApp = remoteNodeLogFileForApp;
this.retentionPolicy = retentionPolicy;
this.pendingContainers = new LinkedBlockingQueue<ContainerId>();
}
- private File getLocalContainerLogDir(ContainerId containerId) {
- return new File(this.localAppLogDir, ConverterUtils.toString(containerId));
- }
-
private void uploadLogsForContainer(ContainerId containerId) {
if (this.logAggregationDisabled) {
@@ -99,11 +93,9 @@ public class AppLogAggregatorImpl implem
}
}
- File containerLogDir = getLocalContainerLogDir(containerId);
- LOG.info("Uploading logs for container " + containerId + " from "
- + containerLogDir);
+ LOG.info("Uploading logs for container " + containerId);
LogKey logKey = new LogKey(containerId);
- LogValue logValue = new LogValue(containerLogDir);
+ LogValue logValue = new LogValue(this.rootLogDirs, containerId);
try {
this.writer.append(logKey, logValue);
} catch (IOException e) {
@@ -135,9 +127,12 @@ public class AppLogAggregatorImpl implem
uploadLogsForContainer(containerId);
}
- // Remove the local app-log-dir
- this.delService.delete(this.userUgi.getShortUserName(), new Path(
- this.localAppLogDir.getAbsolutePath()), new Path[] {});
+ // Remove the local app-log-dirs
+ for (String rootLogDir : this.rootLogDirs) {
+ File localAppLogDir = new File(rootLogDir, this.applicationId);
+ this.delService.delete(this.userUgi.getShortUserName(), new Path(
+ localAppLogDir.getAbsolutePath()), new Path[] {});
+ }
if (this.writer != null) {
this.writer.closeWriter();
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java Tue May 10 15:18:58 2011
@@ -21,9 +21,6 @@ package org.apache.hadoop.yarn.server.no
import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_NM_BIND_ADDRESS;
import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_BIND_ADDRESS;
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -33,10 +30,8 @@ import java.util.concurrent.Executors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
@@ -62,7 +57,7 @@ public class LogAggregationService exten
private final DeletionService deletionService;
- private File localRootLogDir;
+ private String[] localRootLogDirs;
Path remoteRootLogDir;
private String nodeFile;
@@ -86,8 +81,8 @@ public class LogAggregationService exten
}
public synchronized void init(Configuration conf) {
- this.localRootLogDir =
- new File(conf.get(NMConfig.NM_LOG_DIR, NMConfig.DEFAULT_NM_LOG_DIR));
+ this.localRootLogDirs =
+ conf.getStrings(NMConfig.NM_LOG_DIR, NMConfig.DEFAULT_NM_LOG_DIR);
this.remoteRootLogDir =
new Path(conf.get(NMConfig.REMOTE_USER_LOG_DIR,
NMConfig.DEFAULT_REMOTE_APP_LOG_DIR));
@@ -119,10 +114,6 @@ public class LogAggregationService exten
return new Path(remoteRootLogDir, ConverterUtils.toString(appId));
}
- File getLocalAppLogDir(ApplicationId appId) {
- return new File(this.localRootLogDir, ConverterUtils.toString(appId));
- }
-
@Override
public synchronized void stop() {
LOG.info(this.getName() + " waiting for pending aggregation during exit");
@@ -148,7 +139,7 @@ public class LogAggregationService exten
// New application
AppLogAggregator appLogAggregator =
new AppLogAggregatorImpl(this.deletionService, getConfig(), appId,
- userUgi, getLocalAppLogDir(appId),
+ userUgi, this.localRootLogDirs,
getRemoteNodeLogFileForApp(appId), logRetentionPolicy);
if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {
throw new YarnException("Duplicate initApp for " + appId);
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java Tue May 10 15:18:58 2011
@@ -24,14 +24,19 @@ import static org.apache.hadoop.yarn.ser
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.EnumSet;
+import java.util.List;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -52,12 +57,14 @@ public class ContainerLogsPage extends N
NMWebParams {
private final Configuration conf;
+ private final LocalDirAllocator logsSelector;
private final Context nmContext;
private final RecordFactory recordFactory;
@Inject
public ContainersLogsBlock(Configuration conf, Context context) {
this.conf = conf;
+ this.logsSelector = new LocalDirAllocator(NMConfig.NM_LOG_DIR);
this.nmContext = context;
this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
}
@@ -81,10 +88,19 @@ public class ContainerLogsPage extends N
container.getContainerState())) {
if (!$(CONTAINER_LOG_TYPE).isEmpty()) {
- // TODO: Get the following from logs' owning component.
- File containerLogsDir =
- getContainerLogDir(this.conf, containerId);
- File logFile = new File(containerLogsDir, $(CONTAINER_LOG_TYPE));
+ File logFile = null;
+ try {
+ logFile =
+ new File(this.logsSelector
+ .getLocalPathToRead(
+ ConverterUtils.toString(containerId.getAppId())
+ + Path.SEPARATOR + $(CONTAINER_ID)
+ + Path.SEPARATOR
+ + $(CONTAINER_LOG_TYPE), this.conf).toUri()
+ .getPath());
+ } catch (Exception e) {
+ div.h1("Cannot find this log on the local disk.")._();
+ }
div.h1(logFile.getName());
long start =
$("start").isEmpty() ? -4 * 1024 : Long.parseLong($("start"));
@@ -130,19 +146,20 @@ public class ContainerLogsPage extends N
div._();
} else {
// Just print out the log-types
- File containerLogsDir =
- getContainerLogDir(this.conf, containerId);
- // TODO: No nested dir structure. Fix MR userlogs.
- for (File logFile : containerLogsDir.listFiles()) {
- div
- .p()
- .a(
- url($(NM_HTTP_URL), "yarn", "containerlogs",
- $(CONTAINER_ID),
- logFile.getName(), "?start=-4076"),
- logFile.getName() + " : Total file length is "
- + logFile.length() + " bytes.")
- ._();
+ List<File> containerLogsDirs =
+ getContainerLogDirs(this.conf, containerId);
+ for (File containerLogsDir : containerLogsDirs) {
+ for (File logFile : containerLogsDir.listFiles()) {
+ div
+ .p()
+ .a(
+ url($(NM_HTTP_URL), "yarn", "containerlogs",
+ $(CONTAINER_ID),
+ logFile.getName(), "?start=-4076"),
+ logFile.getName() + " : Total file length is "
+ + logFile.length() + " bytes.")
+ ._();
+ }
}
div._();
}
@@ -151,16 +168,18 @@ public class ContainerLogsPage extends N
}
}
- static File
- getContainerLogDir(Configuration conf, ContainerId containerId) {
+ static List<File>
+ getContainerLogDirs(Configuration conf, ContainerId containerId) {
String[] logDirs =
conf.getStrings(NM_LOG_DIR, DEFAULT_NM_LOG_DIR);
- File logDir = new File(logDirs[0]); // TODO: In case of ROUND_ROBIN
- String appIdStr = ConverterUtils.toString(containerId.getAppId());
- File appLogDir = new File(logDir, appIdStr);
- String containerIdStr = ConverterUtils.toString(containerId);
- File containerLogDir = new File(appLogDir, containerIdStr);
- return containerLogDir;
+ List<File> containerLogDirs = new ArrayList<File>(logDirs.length);
+ for (String logDir : logDirs) {
+ String appIdStr = ConverterUtils.toString(containerId.getAppId());
+ File appLogDir = new File(logDir, appIdStr);
+ String containerIdStr = ConverterUtils.toString(containerId);
+ containerLogDirs.add(new File(appLogDir, containerIdStr));
+ }
+ return containerLogDirs;
}
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java Tue May 10 15:18:58 2011
@@ -96,9 +96,11 @@ public class TestDeletionService {
System.out.println("SEED: " + seed);
List<Path> dirs = buildDirs(r, base, 20);
createDirs(new Path("."), dirs);
- DeletionService del =
- new DeletionService(new FakeDefaultContainerExecutor());
- del.init(new Configuration());
+ FakeDefaultContainerExecutor exec = new FakeDefaultContainerExecutor();
+ Configuration conf = new Configuration();
+ exec.setConf(conf);
+ DeletionService del = new DeletionService(exec);
+ del.init(conf);
del.start();
try {
for (Path p : dirs) {
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java Tue May 10 15:18:58 2011
@@ -77,6 +77,7 @@ public class TestEventFlow {
conf.set(NMConfig.REMOTE_USER_LOG_DIR, remoteLogDir.getAbsolutePath());
ContainerExecutor exec = new DefaultContainerExecutor();
+ exec.setConf(conf);
DeletionService del = new DeletionService(exec);
Dispatcher dispatcher = new AsyncDispatcher();
NodeHealthCheckerService healthChecker = null;
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java Tue May 10 15:18:58 2011
@@ -90,7 +90,7 @@ public abstract class BaseContainerManag
protected Configuration conf = new YarnConfiguration();
protected Context context = new NMContext();
- protected ContainerExecutor exec = new DefaultContainerExecutor();
+ protected ContainerExecutor exec;
protected DeletionService delSrvc;
protected String user = "nobody";
@@ -111,7 +111,9 @@ public abstract class BaseContainerManag
protected ContainerManagerImpl containerManager = null;
protected ContainerExecutor createContainerExecutor() {
- return new DefaultContainerExecutor();
+ DefaultContainerExecutor exec = new DefaultContainerExecutor();
+ exec.setConf(conf);
+ return exec;
}
@Before
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java Tue May 10 15:18:58 2011
@@ -90,15 +90,13 @@ public class TestContainerLocalizer {
final String appId = "app_RM_0";
final String cId = "container_0";
final InetSocketAddress nmAddr = new InetSocketAddress("foobar", 4344);
- final Path logDir = lfs.makeQualified(new Path(basedir, "logs"));
final List<Path> localDirs = new ArrayList<Path>();
for (int i = 0; i < 4; ++i) {
localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
}
RecordFactory mockRF = getMockLocalizerRecordFactory();
ContainerLocalizer concreteLoc = new ContainerLocalizer(lfs, user,
- appId, cId, logDir, localDirs,
- new HashMap<LocalResource,Future<Path>>(), mockRF);
+ appId, cId, localDirs, mockRF);
ContainerLocalizer localizer = spy(concreteLoc);
// return credential stream instead of opening local file
@@ -166,29 +164,25 @@ public class TestContainerLocalizer {
// run localization
assertEquals(0, localizer.runLocalization(nmAddr));
- // verify created cache, application dirs
+ // verify created cache
for (Path p : localDirs) {
Path base = new Path(new Path(p, ContainerLocalizer.USERCACHE), user);
Path privcache = new Path(base, ContainerLocalizer.FILECACHE);
// $x/usercache/$user/filecache
- verify(spylfs).mkdir(eq(privcache), isA(FsPermission.class), eq(true));
+ verify(spylfs).mkdir(eq(privcache), isA(FsPermission.class), eq(false));
Path appDir =
new Path(base, new Path(ContainerLocalizer.APPCACHE, appId));
// $x/usercache/$user/appcache/$appId/filecache
Path appcache = new Path(appDir, ContainerLocalizer.FILECACHE);
- verify(spylfs).mkdir(eq(appcache), isA(FsPermission.class), eq(true));
+ verify(spylfs).mkdir(eq(appcache), isA(FsPermission.class), eq(false));
// $x/usercache/$user/appcache/$appId/output
Path appOutput = new Path(appDir, ContainerLocalizer.OUTPUTDIR);
- verify(spylfs).mkdir(eq(appOutput), isA(FsPermission.class), eq(true));
+ verify(spylfs).mkdir(eq(appOutput), isA(FsPermission.class), eq(false));
}
// verify tokens read at expected location
verify(spylfs).open(tokenPath);
- // verify log dir creation
- verify(spylfs).mkdir(eq(new Path(logDir, appId)),
- isA(FsPermission.class), anyBoolean());
-
// verify downloaded resources reported to NM
verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcA)));
verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcB)));
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java Tue May 10 15:18:58 2011
@@ -99,7 +99,8 @@ public class TestLogAggregationService e
ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1);
// AppLogDir should be created
- File app1LogDir = logAggregationService.getLocalAppLogDir(application1);
+ File app1LogDir =
+ new File(localLogDir, ConverterUtils.toString(application1));
app1LogDir.mkdir();
logAggregationService
.handle(new LogAggregatorAppStartedEvent(
@@ -141,7 +142,8 @@ public class TestLogAggregationService e
ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1);
// AppLogDir should be created
- File app1LogDir = logAggregationService.getLocalAppLogDir(application1);
+ File app1LogDir =
+ new File(localLogDir, ConverterUtils.toString(application1));
app1LogDir.mkdir();
logAggregationService
.handle(new LogAggregatorAppStartedEvent(
@@ -173,7 +175,8 @@ public class TestLogAggregationService e
ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1);
// AppLogDir should be created
- File app1LogDir = logAggregationService.getLocalAppLogDir(application1);
+ File app1LogDir =
+ new File(localLogDir, ConverterUtils.toString(application1));
app1LogDir.mkdir();
logAggregationService
.handle(new LogAggregatorAppStartedEvent(
@@ -189,7 +192,8 @@ public class TestLogAggregationService e
ApplicationId application2 = BuilderUtils.newApplicationId(1234, 2);
- File app2LogDir = logAggregationService.getLocalAppLogDir(application2);
+ File app2LogDir =
+ new File(localLogDir, ConverterUtils.toString(application2));
app2LogDir.mkdir();
logAggregationService.handle(new LogAggregatorAppStartedEvent(
application2, this.user, null,
@@ -209,7 +213,8 @@ public class TestLogAggregationService e
ApplicationId application3 = BuilderUtils.newApplicationId(1234, 3);
- File app3LogDir = logAggregationService.getLocalAppLogDir(application3);
+ File app3LogDir =
+ new File(localLogDir, ConverterUtils.toString(application3));
app3LogDir.mkdir();
logAggregationService.handle(new LogAggregatorAppStartedEvent(
application3, this.user, null,
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java Tue May 10 15:18:58 2011
@@ -122,8 +122,8 @@ public class TestNMWebServer {
throws IOException {
// ContainerLogDir should be created
File containerLogDir =
- ContainerLogsPage.ContainersLogsBlock.getContainerLogDir(conf,
- containerId);
+ ContainerLogsPage.ContainersLogsBlock.getContainerLogDirs(conf,
+ containerId).get(0);
containerLogDir.mkdirs();
for (String fileType : new String[] { "stdout", "stderr", "syslog" }) {
Writer writer = new FileWriter(new File(containerLogDir, fileType));