You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2011/10/28 08:45:07 UTC
svn commit: r1190174 [2/3] - in
/hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/or...
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java?rev=1190174&r1=1190173&r2=1190174&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java Fri Oct 28 06:45:04 2011
@@ -29,8 +29,10 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -48,7 +50,6 @@ import org.apache.hadoop.yarn.state.Mult
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
-import org.apache.hadoop.yarn.util.ConverterUtils;
/**
* The state machine for the representation of an Application
@@ -63,6 +64,7 @@ public class ApplicationImpl implements
final ApplicationACLsManager aclsManager;
private final ReadLock readLock;
private final WriteLock writeLock;
+ private final Context context;
private static final Log LOG = LogFactory.getLog(Application.class);
@@ -71,12 +73,13 @@ public class ApplicationImpl implements
public ApplicationImpl(Dispatcher dispatcher,
ApplicationACLsManager aclsManager, String user, ApplicationId appId,
- Credentials credentials) {
+ Credentials credentials, Context context) {
this.dispatcher = dispatcher;
this.user = user.toString();
this.appId = appId;
this.credentials = credentials;
this.aclsManager = aclsManager;
+ this.context = context;
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
@@ -173,7 +176,13 @@ public class ApplicationImpl implements
ApplicationState.FINISHED,
ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP,
new AppCompletelyDoneTransition())
-
+
+ // Transitions from FINISHED state
+ .addTransition(ApplicationState.FINISHED,
+ ApplicationState.FINISHED,
+ ApplicationEventType.APPLICATION_LOG_AGGREGATION_FINISHED,
+ new AppLogsAggregatedTransition())
+
// create the topology tables
.installTopology();
@@ -239,11 +248,15 @@ public class ApplicationImpl implements
@Override
public void transition(ApplicationImpl app, ApplicationEvent event) {
+ Map<ApplicationAccessType, String> appAcls =
+ app.getContainers().values().iterator().next().getLaunchContext()
+ .getApplicationACLs();
+
// Inform the logAggregator
app.dispatcher.getEventHandler().handle(
- new LogAggregatorAppStartedEvent(app.appId, app.user,
- app.credentials,
- ContainerLogsRetentionPolicy.ALL_CONTAINERS)); // TODO: Fix
+ new LogAggregatorAppStartedEvent(app.appId, app.user,
+ app.credentials, ContainerLogsRetentionPolicy.ALL_CONTAINERS,
+ appAcls));
// Start all the containers waiting for ApplicationInit
for (Container container : app.containers.values()) {
@@ -339,13 +352,20 @@ public class ApplicationImpl implements
@Override
public void transition(ApplicationImpl app, ApplicationEvent event) {
- app.aclsManager.removeApplication(app.getAppId());
-
// Inform the logService
app.dispatcher.getEventHandler().handle(
new LogAggregatorAppFinishedEvent(app.appId));
- // TODO: Also make logService write the acls to the aggregated file.
+ }
+ }
+
+ static class AppLogsAggregatedTransition implements
+ SingleArcTransition<ApplicationImpl, ApplicationEvent> {
+ @Override
+ public void transition(ApplicationImpl app, ApplicationEvent event) {
+ ApplicationId appId = event.getApplicationID();
+ app.context.getApplications().remove(appId);
+ app.aclsManager.removeApplication(appId);
}
}
@@ -377,6 +397,6 @@ public class ApplicationImpl implements
@Override
public String toString() {
- return ConverterUtils.toString(appId);
+ return appId.toString();
}
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java?rev=1190174&r1=1190173&r2=1190174&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java Fri Oct 28 06:45:04 2011
@@ -119,11 +119,11 @@ public class ContainerLaunch implements
// /////////////////////////// Variable expansion
// Before the container script gets written out.
List<String> newCmds = new ArrayList<String>(command.size());
- String appIdStr = app.toString();
+ String appIdStr = app.getAppId().toString();
Path containerLogDir =
- this.logDirsSelector.getLocalPathForWrite(appIdStr + Path.SEPARATOR
- + containerIdStr, LocalDirAllocator.SIZE_UNKNOWN, this.conf,
- false);
+ this.logDirsSelector.getLocalPathForWrite(ContainerLaunch
+ .getRelativeContainerLogDir(appIdStr, containerIdStr),
+ LocalDirAllocator.SIZE_UNKNOWN, this.conf, false);
for (String str : command) {
// TODO: Should we instead work via symlinks without this grammar?
newCmds.add(str.replace(ApplicationConstants.LOG_DIR_EXPANSION_VAR,
@@ -384,6 +384,11 @@ public class ContainerLaunch implements
return processId;
}
+ public static String getRelativeContainerLogDir(String appIdStr,
+ String containerIdStr) {
+ return appIdStr + Path.SEPARATOR + containerIdStr;
+ }
+
private String getContainerPrivateDir(String appIdStr, String containerIdStr) {
return getAppPrivateDir(appIdStr) + Path.SEPARATOR + containerIdStr
+ Path.SEPARATOR;
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AggregatedLogFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AggregatedLogFormat.java?rev=1190174&r1=1190173&r2=1190174&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AggregatedLogFormat.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AggregatedLogFormat.java Fri Oct 28 06:45:04 2011
@@ -25,10 +25,16 @@ import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
+import java.io.InputStreamReader;
import java.io.IOException;
+import java.io.Writer;
import java.security.PrivilegedExceptionAction;
import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.commons.io.input.BoundedInputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -41,6 +47,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.file.tfile.TFile;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -48,32 +56,50 @@ import org.apache.hadoop.yarn.util.Conve
public class AggregatedLogFormat {
static final Log LOG = LogFactory.getLog(AggregatedLogFormat.class);
-
+ private static final LogKey APPLICATION_ACL_KEY = new LogKey("APPLICATION_ACL");
+ private static final LogKey APPLICATION_OWNER_KEY = new LogKey("APPLICATION_OWNER");
+ private static final LogKey VERSION_KEY = new LogKey("VERSION");
+ private static final Map<String, LogKey> RESERVED_KEYS;
+ //Maybe write out the retention policy.
+ //Maybe write out a list of containerLogs skipped by the retention policy.
+ private static final int VERSION = 1;
+
+ static {
+ RESERVED_KEYS = new HashMap<String, AggregatedLogFormat.LogKey>();
+ RESERVED_KEYS.put(APPLICATION_ACL_KEY.toString(), APPLICATION_ACL_KEY);
+ RESERVED_KEYS.put(APPLICATION_OWNER_KEY.toString(), APPLICATION_OWNER_KEY);
+ RESERVED_KEYS.put(VERSION_KEY.toString(), VERSION_KEY);
+ }
+
public static class LogKey implements Writable {
- private String containerId;
+ private String keyString;
public LogKey() {
}
public LogKey(ContainerId containerId) {
- this.containerId = ConverterUtils.toString(containerId);
+ this.keyString = containerId.toString();
}
+ public LogKey(String keyString) {
+ this.keyString = keyString;
+ }
+
@Override
public void write(DataOutput out) throws IOException {
- out.writeUTF(this.containerId);
+ out.writeUTF(this.keyString);
}
@Override
public void readFields(DataInput in) throws IOException {
- this.containerId = in.readUTF();
+ this.keyString = in.readUTF();
}
@Override
public String toString() {
- return this.containerId;
+ return this.keyString;
}
}
@@ -81,6 +107,8 @@ public class AggregatedLogFormat {
private final String[] rootLogDirs;
private final ContainerId containerId;
+ // TODO Maybe add a version string here. Instead of changing the version of
+ // the entire k-v format
public LogValue(String[] rootLogDirs, ContainerId containerId) {
this.rootLogDirs = rootLogDirs;
@@ -141,7 +169,8 @@ public class AggregatedLogFormat {
public FSDataOutputStream run() throws Exception {
return FileContext.getFileContext(conf).create(
remoteAppLogFile,
- EnumSet.of(CreateFlag.CREATE), new Options.CreateOpts[] {});
+ EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
+ new Options.CreateOpts[] {});
}
});
} catch (InterruptedException e) {
@@ -154,6 +183,40 @@ public class AggregatedLogFormat {
new TFile.Writer(this.fsDataOStream, 256 * 1024, conf.get(
YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE,
YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE), null, conf);
+ //Write the version string
+ writeVersion();
+ }
+
+ private void writeVersion() throws IOException {
+ DataOutputStream out = this.writer.prepareAppendKey(-1);
+ VERSION_KEY.write(out);
+ out.close();
+ out = this.writer.prepareAppendValue(-1);
+ out.writeInt(VERSION);
+ out.close();
+ this.fsDataOStream.hflush();
+ }
+
+ public void writeApplicationOwner(String user) throws IOException {
+ DataOutputStream out = this.writer.prepareAppendKey(-1);
+ APPLICATION_OWNER_KEY.write(out);
+ out.close();
+ out = this.writer.prepareAppendValue(-1);
+ out.writeUTF(user);
+ out.close();
+ }
+
+ public void writeApplicationACLs(Map<ApplicationAccessType, String> appAcls)
+ throws IOException {
+ DataOutputStream out = this.writer.prepareAppendKey(-1);
+ APPLICATION_ACL_KEY.write(out);
+ out.close();
+ out = this.writer.prepareAppendValue(-1);
+ for (Entry<ApplicationAccessType, String> entry : appAcls.entrySet()) {
+ out.writeUTF(entry.getKey().toString());
+ out.writeUTF(entry.getValue());
+ }
+ out.close();
}
public void append(LogKey logKey, LogValue logValue) throws IOException {
@@ -184,12 +247,13 @@ public class AggregatedLogFormat {
private final FSDataInputStream fsDataIStream;
private final TFile.Reader.Scanner scanner;
+ private final TFile.Reader reader;
public LogReader(Configuration conf, Path remoteAppLogFile)
throws IOException {
FileContext fileContext = FileContext.getFileContext(conf);
this.fsDataIStream = fileContext.open(remoteAppLogFile);
- TFile.Reader reader =
+ reader =
new TFile.Reader(this.fsDataIStream, fileContext.getFileStatus(
remoteAppLogFile).getLen(), conf);
this.scanner = reader.createScanner();
@@ -198,6 +262,69 @@ public class AggregatedLogFormat {
private boolean atBeginning = true;
/**
+ * Returns the owner of the application.
+ *
+ * @return the application owner.
+ * @throws IOException
+ */
+ public String getApplicationOwner() throws IOException {
+ TFile.Reader.Scanner ownerScanner = reader.createScanner();
+ LogKey key = new LogKey();
+ while (!ownerScanner.atEnd()) {
+ TFile.Reader.Scanner.Entry entry = ownerScanner.entry();
+ key.readFields(entry.getKeyStream());
+ if (key.toString().equals(APPLICATION_OWNER_KEY.toString())) {
+ DataInputStream valueStream = entry.getValueStream();
+ return valueStream.readUTF();
+ }
+ ownerScanner.advance();
+ }
+ return null;
+ }
+
+ /**
+ * Returns ACLs for the application. An empty map is returned if no ACLs are
+ * found.
+ *
+ * @return a map of the Application ACLs.
+ * @throws IOException
+ */
+ public Map<ApplicationAccessType, String> getApplicationAcls()
+ throws IOException {
+ // TODO Seek directly to the key once a comparator is specified.
+ TFile.Reader.Scanner aclScanner = reader.createScanner();
+ LogKey key = new LogKey();
+ Map<ApplicationAccessType, String> acls =
+ new HashMap<ApplicationAccessType, String>();
+ while (!aclScanner.atEnd()) {
+ TFile.Reader.Scanner.Entry entry = aclScanner.entry();
+ key.readFields(entry.getKeyStream());
+ if (key.toString().equals(APPLICATION_ACL_KEY.toString())) {
+ DataInputStream valueStream = entry.getValueStream();
+ while (true) {
+ String appAccessOp = null;
+ String aclString = null;
+ try {
+ appAccessOp = valueStream.readUTF();
+ } catch (EOFException e) {
+ // Valid end of stream.
+ break;
+ }
+ try {
+ aclString = valueStream.readUTF();
+ } catch (EOFException e) {
+ throw new YarnException("Error reading ACLs", e);
+ }
+ acls.put(ApplicationAccessType.valueOf(appAccessOp), aclString);
+ }
+
+ }
+ aclScanner.advance();
+ }
+ return acls;
+ }
+
+ /**
* Read the next key and return the value-stream.
*
* @param key
@@ -215,10 +342,99 @@ public class AggregatedLogFormat {
}
TFile.Reader.Scanner.Entry entry = this.scanner.entry();
key.readFields(entry.getKeyStream());
+ // Skip META keys
+ if (RESERVED_KEYS.containsKey(key.toString())) {
+ return next(key);
+ }
DataInputStream valueStream = entry.getValueStream();
return valueStream;
}
+
+ //TODO Change Log format and interfaces to be containerId specific.
+ // Avoid returning completeValueStreams.
+// public List<String> getTypesForContainer(DataInputStream valueStream){}
+//
+// /**
+// * @param valueStream
+// * The Log stream for the container.
+// * @param fileType
+// * the log type required.
+// * @return An InputStreamReader for the required log type or null if the
+// * type is not found.
+// * @throws IOException
+// */
+// public InputStreamReader getLogStreamForType(DataInputStream valueStream,
+// String fileType) throws IOException {
+// valueStream.reset();
+// try {
+// while (true) {
+// String ft = valueStream.readUTF();
+// String fileLengthStr = valueStream.readUTF();
+// long fileLength = Long.parseLong(fileLengthStr);
+// if (ft.equals(fileType)) {
+// BoundedInputStream bis =
+// new BoundedInputStream(valueStream, fileLength);
+// return new InputStreamReader(bis);
+// } else {
+// long totalSkipped = 0;
+// long currSkipped = 0;
+// while (currSkipped != -1 && totalSkipped < fileLength) {
+// currSkipped = valueStream.skip(fileLength - totalSkipped);
+// totalSkipped += currSkipped;
+// }
+// // TODO Verify skip behaviour.
+// if (currSkipped == -1) {
+// return null;
+// }
+// }
+// }
+// } catch (EOFException e) {
+// return null;
+// }
+// }
+
+ /**
+ * Writes all logs for a single container to the provided writer.
+ * @param valueStream
+ * @param writer
+ * @throws IOException
+ */
+ public static void readAcontainerLogs(DataInputStream valueStream,
+ Writer writer) throws IOException {
+ int bufferSize = 65536;
+ char[] cbuf = new char[bufferSize];
+ String fileType;
+ String fileLengthStr;
+ long fileLength;
+
+ while (true) {
+ try {
+ fileType = valueStream.readUTF();
+ } catch (EOFException e) {
+ // EndOfFile
+ return;
+ }
+ fileLengthStr = valueStream.readUTF();
+ fileLength = Long.parseLong(fileLengthStr);
+ writer.write("\n\nLogType:");
+ writer.write(fileType);
+ writer.write("\nLogLength:");
+ writer.write(fileLengthStr);
+ writer.write("\nLog Contents:\n");
+ // ByteLevel
+ BoundedInputStream bis =
+ new BoundedInputStream(valueStream, fileLength);
+ InputStreamReader reader = new InputStreamReader(bis);
+ int currentRead = 0;
+ int totalRead = 0;
+ while ((currentRead = reader.read(cbuf, 0, bufferSize)) != -1) {
+ writer.write(cbuf);
+ totalRead += currentRead;
+ }
+ }
+ }
+
/**
* Keep calling this till you get a {@link EOFException} for getting logs of
* all types for a single container.
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java?rev=1190174&r1=1190173&r2=1190174&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java Fri Oct 28 06:45:04 2011
@@ -18,8 +18,9 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
-import java.io.File;
import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -27,11 +28,16 @@ import java.util.concurrent.atomic.Atomi
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat.LogWriter;
@@ -42,7 +48,10 @@ public class AppLogAggregatorImpl implem
private static final Log LOG = LogFactory
.getLog(AppLogAggregatorImpl.class);
private static final int THREAD_SLEEP_TIME = 1000;
+ private static final String TMP_FILE_SUFFIX = ".tmp";
+ private final Dispatcher dispatcher;
+ private final ApplicationId appId;
private final String applicationId;
private boolean logAggregationDisabled = false;
private final Configuration conf;
@@ -50,26 +59,34 @@ public class AppLogAggregatorImpl implem
private final UserGroupInformation userUgi;
private final String[] rootLogDirs;
private final Path remoteNodeLogFileForApp;
+ private final Path remoteNodeTmpLogFileForApp;
private final ContainerLogsRetentionPolicy retentionPolicy;
private final BlockingQueue<ContainerId> pendingContainers;
private final AtomicBoolean appFinishing = new AtomicBoolean();
private final AtomicBoolean appAggregationFinished = new AtomicBoolean();
+ private final Map<ApplicationAccessType, String> appAcls;
private LogWriter writer = null;
- public AppLogAggregatorImpl(DeletionService deletionService,
- Configuration conf, ApplicationId appId, UserGroupInformation userUgi,
- String[] localRootLogDirs, Path remoteNodeLogFileForApp,
- ContainerLogsRetentionPolicy retentionPolicy) {
+ public AppLogAggregatorImpl(Dispatcher dispatcher,
+ DeletionService deletionService, Configuration conf, ApplicationId appId,
+ UserGroupInformation userUgi, String[] localRootLogDirs,
+ Path remoteNodeLogFileForApp,
+ ContainerLogsRetentionPolicy retentionPolicy,
+ Map<ApplicationAccessType, String> appAcls) {
+ this.dispatcher = dispatcher;
this.conf = conf;
this.delService = deletionService;
+ this.appId = appId;
this.applicationId = ConverterUtils.toString(appId);
this.userUgi = userUgi;
this.rootLogDirs = localRootLogDirs;
this.remoteNodeLogFileForApp = remoteNodeLogFileForApp;
+ this.remoteNodeTmpLogFileForApp = getRemoteNodeTmpLogFileForApp();
this.retentionPolicy = retentionPolicy;
this.pendingContainers = new LinkedBlockingQueue<ContainerId>();
+ this.appAcls = appAcls;
}
private void uploadLogsForContainer(ContainerId containerId) {
@@ -80,11 +97,15 @@ public class AppLogAggregatorImpl implem
// Lazy creation of the writer
if (this.writer == null) {
- LOG.info("Starting aggregate log-file for app " + this.applicationId);
+ LOG.info("Starting aggregate log-file for app " + this.applicationId
+ + " at " + this.remoteNodeTmpLogFileForApp);
try {
this.writer =
- new LogWriter(this.conf, this.remoteNodeLogFileForApp,
+ new LogWriter(this.conf, this.remoteNodeTmpLogFileForApp,
this.userUgi);
+ //Write ACLs once when and if the writer is created.
+ this.writer.writeApplicationACLs(appAcls);
+ this.writer.writeApplicationOwner(this.userUgi.getShortUserName());
} catch (IOException e) {
LOG.error("Cannot create writer for app " + this.applicationId
+ ". Disabling log-aggregation for this app.", e);
@@ -105,8 +126,8 @@ public class AppLogAggregatorImpl implem
}
@Override
- public void run() {
-
+ @SuppressWarnings("unchecked")
+ public void run() {
ContainerId containerId;
while (!this.appFinishing.get()) {
@@ -141,10 +162,33 @@ public class AppLogAggregatorImpl implem
this.writer.closeWriter();
LOG.info("Finished aggregate log-file for app " + this.applicationId);
}
-
+ try {
+ userUgi.doAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws Exception {
+ FileSystem remoteFS = FileSystem.get(conf);
+ remoteFS.rename(remoteNodeTmpLogFileForApp, remoteNodeLogFileForApp);
+ return null;
+ }
+ });
+ } catch (Exception e) {
+ LOG.error("Failed to move temporary log file to final location: ["
+ + remoteNodeTmpLogFileForApp + "] to [" + remoteNodeLogFileForApp
+ + "]", e);
+ }
+
+ this.dispatcher.getEventHandler().handle(
+ new ApplicationEvent(this.appId,
+ ApplicationEventType.APPLICATION_LOG_AGGREGATION_FINISHED));
+
this.appAggregationFinished.set(true);
}
+ private Path getRemoteNodeTmpLogFileForApp() {
+ return new Path(remoteNodeLogFileForApp.getParent(),
+ (remoteNodeLogFileForApp.getName() + TMP_FILE_SUFFIX));
+ }
+
private boolean shouldUploadLogs(ContainerId containerId,
boolean wasContainerSuccessful) {
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java?rev=1190174&r1=1190173&r2=1190174&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java Fri Oct 28 06:45:04 2011
@@ -18,9 +18,9 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
+import java.io.IOException;
import java.security.PrivilegedExceptionAction;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
@@ -31,20 +31,26 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppFinishedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppStartedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorContainerFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorEvent;
import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.hadoop.yarn.util.ConverterUtils;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -54,20 +60,43 @@ public class LogAggregationService exten
private static final Log LOG = LogFactory
.getLog(LogAggregationService.class);
+ /*
+ * Expected deployment TLD will be 1777, owner=<NMOwner>, group=<NMGroup -
+ * Group to which NMOwner belongs> App dirs will be created as 750,
+ * owner=<AppOwner>, group=<NMGroup>: so that the owner and <NMOwner> can
+ * access / modify the files.
+ * <NMGroup> should obviously be a limited access group.
+ */
+ /**
+ * Permissions for the top level directory under which app directories will be
+ * created.
+ */
+ private static final FsPermission TLDIR_PERMISSIONS = FsPermission
+ .createImmutable((short) 01777);
+ /**
+ * Permissions for the Application directory.
+ */
+ private static final FsPermission APP_DIR_PERMISSIONS = FsPermission
+ .createImmutable((short) 0750);
+
private final Context context;
private final DeletionService deletionService;
+ private final Dispatcher dispatcher;
private String[] localRootLogDirs;
Path remoteRootLogDir;
- private String nodeFile;
+ String remoteRootLogDirSuffix;
+ private NodeId nodeId;
+ private boolean isLogAggregationEnabled = false;
private final ConcurrentMap<ApplicationId, AppLogAggregator> appLogAggregators;
private final ExecutorService threadPool;
- public LogAggregationService(Context context,
+ public LogAggregationService(Dispatcher dispatcher, Context context,
DeletionService deletionService) {
super(LogAggregationService.class.getName());
+ this.dispatcher = dispatcher;
this.context = context;
this.deletionService = deletionService;
this.appLogAggregators =
@@ -80,10 +109,17 @@ public class LogAggregationService exten
public synchronized void init(Configuration conf) {
this.localRootLogDirs =
- conf.getStrings(YarnConfiguration.NM_LOG_DIRS, YarnConfiguration.DEFAULT_NM_LOG_DIRS);
+ conf.getStrings(YarnConfiguration.NM_LOG_DIRS,
+ YarnConfiguration.DEFAULT_NM_LOG_DIRS);
this.remoteRootLogDir =
new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
+ this.remoteRootLogDirSuffix =
+ conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
+ YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
+ this.isLogAggregationEnabled =
+ conf.getBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, false);
+
super.init(conf);
}
@@ -91,37 +127,231 @@ public class LogAggregationService exten
public synchronized void start() {
// NodeId is only available during start, the following cannot be moved
// anywhere else.
- this.nodeFile = this.context.getNodeId().toString();
+ this.nodeId = this.context.getNodeId();
+ verifyAndCreateRemoteLogDir(getConfig());
super.start();
}
+
+ @Override
+ public synchronized void stop() {
+ LOG.info(this.getName() + " waiting for pending aggregation during exit");
+ for (AppLogAggregator appLogAggregator : this.appLogAggregators.values()) {
+ appLogAggregator.join();
+ }
+ super.stop();
+ }
- Path getRemoteNodeLogFileForApp(ApplicationId appId) {
- return getRemoteNodeLogFileForApp(this.remoteRootLogDir, appId,
- this.nodeFile);
+ /**
+ * Constructs the full filename for an application's log file per node.
+ * @param remoteRootLogDir
+ * @param appId
+ * @param user
+ * @param nodeId
+ * @param suffix
+ * @return the remote log file.
+ */
+ public static Path getRemoteNodeLogFileForApp(Path remoteRootLogDir,
+ ApplicationId appId, String user, NodeId nodeId, String suffix) {
+ return new Path(getRemoteAppLogDir(remoteRootLogDir, appId, user, suffix),
+ getNodeString(nodeId));
+ }
+
+ /**
+ * Gets the remote app log dir.
+ * @param remoteRootLogDir
+ * @param appId
+ * @param user
+ * @param suffix
+ * @return the remote application specific log dir.
+ */
+ public static Path getRemoteAppLogDir(Path remoteRootLogDir,
+ ApplicationId appId, String user, String suffix) {
+ return new Path(getRemoteLogSuffixedDir(remoteRootLogDir, user, suffix),
+ appId.toString());
+ }
+
+ /**
+ * Gets the remote suffixed log dir for the user.
+ * @param remoteRootLogDir
+ * @param user
+ * @param suffix
+ * @return the remote suffixed log dir.
+ */
+ private static Path getRemoteLogSuffixedDir(Path remoteRootLogDir,
+ String user, String suffix) {
+ if (suffix == null || suffix.isEmpty()) {
+ return getRemoteLogUserDir(remoteRootLogDir, user);
+ }
+ // TODO Maybe support suffix to be more than a single file.
+ return new Path(getRemoteLogUserDir(remoteRootLogDir, user), suffix);
}
- static Path getRemoteNodeLogFileForApp(Path remoteRootLogDir,
- ApplicationId appId, String nodeFile) {
- return new Path(getRemoteAppLogDir(remoteRootLogDir, appId),
- nodeFile);
+ // TODO Add a utility method to list available log files. Ignore the
+ // temporary ones.
+
+ /**
+ * Gets the remote log user dir.
+ * @param remoteRootLogDir
+ * @param user
+ * @return the remote per user log dir.
+ */
+ private static Path getRemoteLogUserDir(Path remoteRootLogDir, String user) {
+ return new Path(remoteRootLogDir, user);
+ }
+
+ /**
+ * Returns the suffix component of the log dir.
+ * @param conf
+ * @return the suffix which will be appended to the user log dir.
+ */
+ public static String getRemoteNodeLogDirSuffix(Configuration conf) {
+ return conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
+ YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
+ }
+
+
+ /**
+ * Converts a nodeId to a form used in the app log file name.
+ * @param nodeId
+ * @return the node string to be used to construct the file name.
+ */
+ private static String getNodeString(NodeId nodeId) {
+ return nodeId.toString().replace(":", "_");
+ }
+
+
+
+
+
+ private void verifyAndCreateRemoteLogDir(Configuration conf) {
+ // Checking the existance of the TLD
+ FileSystem remoteFS = null;
+ try {
+ remoteFS = FileSystem.get(conf);
+ } catch (IOException e) {
+ throw new YarnException("Unable to get Remote FileSystem isntance", e);
+ }
+ boolean remoteExists = false;
+ try {
+ remoteExists = remoteFS.exists(this.remoteRootLogDir);
+ } catch (IOException e) {
+ throw new YarnException("Failed to check for existance of remoteLogDir ["
+ + this.remoteRootLogDir + "]");
+ }
+ if (remoteExists) {
+ try {
+ FsPermission perms =
+ remoteFS.getFileStatus(this.remoteRootLogDir).getPermission();
+ if (!perms.equals(TLDIR_PERMISSIONS)) {
+ LOG.warn("Remote Root Log Dir [" + this.remoteRootLogDir
+ + "] already exist, but with incorrect permissions. "
+ + "Expected: [" + TLDIR_PERMISSIONS + "], Found: [" + perms
+ + "]." + " The cluster may have problems with multiple users.");
+ }
+ } catch (IOException e) {
+ throw new YarnException(
+ "Failed while attempting to check permissions for dir ["
+ + this.remoteRootLogDir + "]");
+ }
+ } else {
+ LOG.warn("Remote Root Log Dir [" + this.remoteRootLogDir
+ + "] does not exist. Attempting to create it.");
+ try {
+ Path qualified =
+ this.remoteRootLogDir.makeQualified(remoteFS.getUri(),
+ remoteFS.getWorkingDirectory());
+ remoteFS.mkdirs(qualified, new FsPermission(TLDIR_PERMISSIONS));
+ remoteFS.setPermission(qualified, new FsPermission(TLDIR_PERMISSIONS));
+ } catch (IOException e) {
+ throw new YarnException("Failed to create remoteLogDir ["
+ + this.remoteRootLogDir + "]", e);
+ }
+ }
+
}
- static Path getRemoteAppLogDir(Path remoteRootLogDir,
- ApplicationId appId) {
- return new Path(remoteRootLogDir, ConverterUtils.toString(appId));
+ Path getRemoteNodeLogFileForApp(ApplicationId appId, String user) {
+ return LogAggregationService.getRemoteNodeLogFileForApp(
+ this.remoteRootLogDir, appId, user, this.nodeId,
+ this.remoteRootLogDirSuffix);
}
- @Override
- public synchronized void stop() {
- LOG.info(this.getName() + " waiting for pending aggregation during exit");
- for (AppLogAggregator appLogAggregator : this.appLogAggregators.values()) {
- appLogAggregator.join();
+ private void createDir(FileSystem fs, Path path, FsPermission fsPerm)
+ throws IOException {
+ fs.mkdirs(path, new FsPermission(fsPerm));
+ fs.setPermission(path, new FsPermission(fsPerm));
+ }
+
+ private void createAppDir(final String user, final ApplicationId appId,
+ UserGroupInformation userUgi) {
+ try {
+ userUgi.doAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws Exception {
+ // TODO: Reuse FS for user?
+ FileSystem remoteFS = null;
+ Path userDir = null;
+ Path suffixDir = null;
+ Path appDir = null;
+ try {
+ remoteFS = FileSystem.get(getConfig());
+ } catch (IOException e) {
+ LOG.error("Failed to get remote FileSystem while processing app "
+ + appId, e);
+ throw e;
+ }
+ try {
+ userDir =
+ getRemoteLogUserDir(
+ LogAggregationService.this.remoteRootLogDir, user);
+ userDir =
+ userDir.makeQualified(remoteFS.getUri(),
+ remoteFS.getWorkingDirectory());
+ createDir(remoteFS, userDir, APP_DIR_PERMISSIONS);
+ } catch (IOException e) {
+ LOG.error("Failed to create user dir [" + userDir
+ + "] while processing app " + appId);
+ throw e;
+ }
+ try {
+ suffixDir =
+ getRemoteLogSuffixedDir(
+ LogAggregationService.this.remoteRootLogDir, user,
+ LogAggregationService.this.remoteRootLogDirSuffix);
+ suffixDir =
+ suffixDir.makeQualified(remoteFS.getUri(),
+ remoteFS.getWorkingDirectory());
+ createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS);
+ } catch (IOException e) {
+ LOG.error("Failed to create suffixed user dir [" + suffixDir
+ + "] while processing app " + appId);
+ throw e;
+ }
+ try {
+ appDir =
+ getRemoteAppLogDir(LogAggregationService.this.remoteRootLogDir,
+ appId, user,
+ LogAggregationService.this.remoteRootLogDirSuffix);
+ appDir =
+ appDir.makeQualified(remoteFS.getUri(),
+ remoteFS.getWorkingDirectory());
+ createDir(remoteFS, appDir, APP_DIR_PERMISSIONS);
+ } catch (IOException e) {
+ LOG.error("Failed to create application log dir [" + appDir
+ + "] while processing app " + appId);
+ throw e;
+ }
+ return null;
+ }
+ });
+ } catch (Exception e) {
+ throw new YarnException(e);
}
- super.stop();
}
private void initApp(final ApplicationId appId, String user,
- Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy) {
+ Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy,
+ Map<ApplicationAccessType, String> appAcls) {
// Get user's FileSystem credentials
UserGroupInformation userUgi =
@@ -133,41 +363,27 @@ public class LogAggregationService exten
}
}
+ // Create the app dir
+ createAppDir(user, appId, userUgi);
+
// New application
AppLogAggregator appLogAggregator =
- new AppLogAggregatorImpl(this.deletionService, getConfig(), appId,
- userUgi, this.localRootLogDirs,
- getRemoteNodeLogFileForApp(appId), logRetentionPolicy);
+ new AppLogAggregatorImpl(this.dispatcher, this.deletionService, getConfig(), appId,
+ userUgi, this.localRootLogDirs,
+ getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy, appAcls);
if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {
throw new YarnException("Duplicate initApp for " + appId);
}
- // Create the app dir
- try {
- userUgi.doAs(new PrivilegedExceptionAction<Object>() {
- @Override
- public Object run() throws Exception {
- // TODO: Reuse FS for user?
- FileSystem remoteFS = FileSystem.get(getConfig());
- remoteFS.mkdirs(getRemoteAppLogDir(
- LogAggregationService.this.remoteRootLogDir, appId)
- .makeQualified(remoteFS.getUri(),
- remoteFS.getWorkingDirectory()));
- return null;
- }
- });
- } catch (Exception e) {
- throw new YarnException(e);
- }
- // Get the user configuration for the list of containers that need log
+ // TODO Get the user configuration for the list of containers that need log
// aggregation.
// Schedule the aggregator.
this.threadPool.execute(appLogAggregator);
}
- private void stopContainer(ContainerId containerId, String exitCode) {
+ private void stopContainer(ContainerId containerId, int exitCode) {
// A container is complete. Put this containers' logs up for aggregation if
// this containers' logs are needed.
@@ -179,7 +395,7 @@ public class LogAggregationService exten
}
this.appLogAggregators.get(
containerId.getApplicationAttemptId().getApplicationId())
- .startContainerLogAggregation(containerId, exitCode.equals("0"));
+ .startContainerLogAggregation(containerId, exitCode == 0);
}
private void stopApp(ApplicationId appId) {
@@ -196,27 +412,30 @@ public class LogAggregationService exten
@Override
public void handle(LogAggregatorEvent event) {
-// switch (event.getType()) {
-// case APPLICATION_STARTED:
-// LogAggregatorAppStartedEvent appStartEvent =
-// (LogAggregatorAppStartedEvent) event;
-// initApp(appStartEvent.getApplicationId(), appStartEvent.getUser(),
-// appStartEvent.getCredentials(),
-// appStartEvent.getLogRetentionPolicy());
-// break;
-// case CONTAINER_FINISHED:
-// LogAggregatorContainerFinishedEvent containerFinishEvent =
-// (LogAggregatorContainerFinishedEvent) event;
-// stopContainer(containerFinishEvent.getContainerId(),
-// containerFinishEvent.getExitCode());
-// break;
-// case APPLICATION_FINISHED:
-// LogAggregatorAppFinishedEvent appFinishedEvent =
-// (LogAggregatorAppFinishedEvent) event;
-// stopApp(appFinishedEvent.getApplicationId());
-// break;
-// default:
-// ; // Ignore
-// }
+ if (this.isLogAggregationEnabled) {
+ switch (event.getType()) {
+ case APPLICATION_STARTED:
+ LogAggregatorAppStartedEvent appStartEvent =
+ (LogAggregatorAppStartedEvent) event;
+ initApp(appStartEvent.getApplicationId(), appStartEvent.getUser(),
+ appStartEvent.getCredentials(),
+ appStartEvent.getLogRetentionPolicy(),
+ appStartEvent.getApplicationAcls());
+ break;
+ case CONTAINER_FINISHED:
+ LogAggregatorContainerFinishedEvent containerFinishEvent =
+ (LogAggregatorContainerFinishedEvent) event;
+ stopContainer(containerFinishEvent.getContainerId(),
+ containerFinishEvent.getExitCode());
+ break;
+ case APPLICATION_FINISHED:
+ LogAggregatorAppFinishedEvent appFinishedEvent =
+ (LogAggregatorAppFinishedEvent) event;
+ stopApp(appFinishedEvent.getApplicationId());
+ break;
+ default:
+ ; // Ignore
+ }
+ }
}
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogDumper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogDumper.java?rev=1190174&r1=1190173&r2=1190174&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogDumper.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogDumper.java Fri Oct 28 06:45:04 2011
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -110,7 +111,12 @@ public class LogDumper extends Configure
AggregatedLogFormat.LogReader reader =
new AggregatedLogFormat.LogReader(getConf(),
LogAggregationService.getRemoteNodeLogFileForApp(
- remoteRootLogDir, appId, nodeAddress));
+ remoteRootLogDir,
+ appId,
+ UserGroupInformation.getCurrentUser().getShortUserName(),
+ ConverterUtils.toNodeId(nodeAddress),
+ getConf().get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
+ YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX)));
return dumpAContainerLogs(containerIdStr, reader, out);
}
@@ -152,16 +158,21 @@ public class LogDumper extends Configure
Path remoteRootLogDir =
new Path(getConf().get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
+ String user = UserGroupInformation.getCurrentUser().getShortUserName();
+ String logDirSuffix =
+ getConf().get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+ YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
+ //TODO Change this to get a list of files from the LAS.
Path remoteAppLogDir =
- LogAggregationService.getRemoteAppLogDir(remoteRootLogDir, appId);
+ LogAggregationService.getRemoteAppLogDir(remoteRootLogDir, appId, user,
+ logDirSuffix);
RemoteIterator<FileStatus> nodeFiles =
FileContext.getFileContext().listStatus(remoteAppLogDir);
while (nodeFiles.hasNext()) {
FileStatus thisNodeFile = nodeFiles.next();
AggregatedLogFormat.LogReader reader =
new AggregatedLogFormat.LogReader(getConf(),
- LogAggregationService.getRemoteNodeLogFileForApp(
- remoteRootLogDir, appId, thisNodeFile.getPath().getName()));
+ new Path(remoteAppLogDir, thisNodeFile.getPath().getName()));
try {
DataInputStream valueStream;
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorAppStartedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorAppStartedEvent.java?rev=1190174&r1=1190173&r2=1190174&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorAppStartedEvent.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorAppStartedEvent.java Fri Oct 28 06:45:04 2011
@@ -1,25 +1,28 @@
/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event;
+import java.util.Map;
+
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.ContainerLogsRetentionPolicy;
public class LogAggregatorAppStartedEvent extends LogAggregatorEvent {
@@ -28,14 +31,17 @@ public class LogAggregatorAppStartedEven
private final ContainerLogsRetentionPolicy retentionPolicy;
private final String user;
private final Credentials credentials;
+ private final Map<ApplicationAccessType, String> appAcls;
public LogAggregatorAppStartedEvent(ApplicationId appId, String user,
- Credentials credentials, ContainerLogsRetentionPolicy retentionPolicy) {
+ Credentials credentials, ContainerLogsRetentionPolicy retentionPolicy,
+ Map<ApplicationAccessType, String> appAcls) {
super(LogAggregatorEventType.APPLICATION_STARTED);
this.applicationId = appId;
this.user = user;
this.credentials = credentials;
this.retentionPolicy = retentionPolicy;
+ this.appAcls = appAcls;
}
public ApplicationId getApplicationId() {
@@ -54,4 +60,8 @@ public class LogAggregatorAppStartedEven
return this.user;
}
+ public Map<ApplicationAccessType, String> getApplicationAcls() {
+ return this.appAcls;
+ }
+
}
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AggregatedLogsBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AggregatedLogsBlock.java?rev=1190174&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AggregatedLogsBlock.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AggregatedLogsBlock.java Fri Oct 28 06:45:04 2011
@@ -0,0 +1,175 @@
+package org.apache.hadoop.yarn.server.nodemanager.webapp;
+
+import static org.apache.hadoop.yarn.server.nodemanager.webapp.NMWebParams.CONTAINER_ID;
+import static org.apache.hadoop.yarn.server.nodemanager.webapp.NMWebParams.NM_NODENAME;
+import static org.apache.hadoop.yarn.server.nodemanager.webapp.NMWebParams.ENTITY_STRING;
+import static org.apache.hadoop.yarn.server.nodemanager.webapp.NMWebParams.APP_OWNER;
+
+import java.io.DataInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat.LogKey;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
+
+import com.google.inject.Inject;
+
+public class AggregatedLogsBlock extends HtmlBlock {
+
+ private final Configuration conf;
+
+ @Inject
+ AggregatedLogsBlock(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ protected void render(Block html) {
+ ContainerId containerId = verifyAndGetContainerId(html);
+ NodeId nodeId = verifyAndGetNodeId(html);
+ String appOwner = verifyAndGetAppOwner(html);
+ if (containerId == null || nodeId == null || appOwner == null
+ || appOwner.isEmpty()) {
+ return;
+ }
+
+ ApplicationId applicationId =
+ containerId.getApplicationAttemptId().getApplicationId();
+ String logEntity = $(ENTITY_STRING);
+ if (logEntity == null || logEntity.isEmpty()) {
+ logEntity = containerId.toString();
+ }
+
+ Path remoteRootLogDir =
+ new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+ YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
+ AggregatedLogFormat.LogReader reader = null;
+ try {
+ reader =
+ new AggregatedLogFormat.LogReader(conf,
+ LogAggregationService.getRemoteNodeLogFileForApp(
+ remoteRootLogDir, applicationId, appOwner, nodeId,
+ LogAggregationService.getRemoteNodeLogDirSuffix(conf)));
+ } catch (FileNotFoundException e) {
+ // ACLs not available till the log file is opened.
+ html.h1()
+ ._("Logs not available for "
+ + logEntity
+ + ". Aggregation may not be complete, "
+ + "Check back later or try the nodemanager on "
+ + nodeId)._();
+ return;
+ } catch (IOException e) {
+ html.h1()._("Error getting logs for " + logEntity)._();
+ LOG.error("Error getting logs for " + logEntity, e);
+ return;
+ }
+
+ String owner = null;
+ Map<ApplicationAccessType, String> appAcls = null;
+ try {
+ owner = reader.getApplicationOwner();
+ appAcls = reader.getApplicationAcls();
+ } catch (IOException e) {
+ html.h1()._("Error getting logs for " + logEntity)._();
+ LOG.error("Error getting logs for " + logEntity, e);
+ return;
+ }
+ ApplicationACLsManager aclsManager = new ApplicationACLsManager(conf);
+ aclsManager.addApplication(applicationId, appAcls);
+
+ String remoteUser = request().getRemoteUser();
+ UserGroupInformation callerUGI = null;
+ if (remoteUser != null) {
+ callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
+ }
+ if (callerUGI != null
+ && !aclsManager.checkAccess(callerUGI, ApplicationAccessType.VIEW_APP,
+ owner, applicationId)) {
+ html.h1()
+ ._("User [" + remoteUser
+ + "] is not authorized to view the logs for " + logEntity)._();
+ return;
+ }
+
+ DataInputStream valueStream;
+ LogKey key = new LogKey();
+ try {
+ valueStream = reader.next(key);
+ while (valueStream != null
+ && !key.toString().equals(containerId.toString())) {
+ valueStream = reader.next(key);
+ }
+ if (valueStream == null) {
+ html.h1()._(
+ "Logs not available for " + logEntity
+ + ". Could be caused by the rentention policy")._();
+ return;
+ }
+ writer().write("<pre>");
+ AggregatedLogFormat.LogReader.readAcontainerLogs(valueStream, writer());
+ writer().write("</pre>");
+ return;
+ } catch (IOException e) {
+ html.h1()._("Error getting logs for " + logEntity)._();
+ LOG.error("Error getting logs for " + logEntity, e);
+ return;
+ }
+ }
+
+ private ContainerId verifyAndGetContainerId(Block html) {
+ String containerIdStr = $(CONTAINER_ID);
+ if (containerIdStr == null || containerIdStr.isEmpty()) {
+ html.h1()._("Cannot get container logs without a ContainerId")._();
+ return null;
+ }
+ ContainerId containerId = null;
+ try {
+ containerId = ConverterUtils.toContainerId(containerIdStr);
+ } catch (IllegalArgumentException e) {
+ html.h1()
+ ._("Cannot get container logs for invalid containerId: "
+ + containerIdStr)._();
+ return null;
+ }
+ return containerId;
+ }
+
+ private NodeId verifyAndGetNodeId(Block html) {
+ String nodeIdStr = $(NM_NODENAME);
+ if (nodeIdStr == null || nodeIdStr.isEmpty()) {
+ html.h1()._("Cannot get container logs without a NodeId")._();
+ return null;
+ }
+ NodeId nodeId = null;
+ try {
+ nodeId = ConverterUtils.toNodeId(nodeIdStr);
+ } catch (IllegalArgumentException e) {
+ html.h1()._("Cannot get container logs. Invalid nodeId: " + nodeIdStr)
+ ._();
+ return null;
+ }
+ return nodeId;
+ }
+
+ private String verifyAndGetAppOwner(Block html) {
+ String appOwner = $(APP_OWNER);
+ if (appOwner == null || appOwner.isEmpty()) {
+ html.h1()._("Cannot get container logs without an app owner")._();
+ }
+ return appOwner;
+ }
+}
\ No newline at end of file
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AggregatedLogsNavBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AggregatedLogsNavBlock.java?rev=1190174&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AggregatedLogsNavBlock.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AggregatedLogsNavBlock.java Fri Oct 28 06:45:04 2011
@@ -0,0 +1,33 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.webapp;
+
+import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
+
+public class AggregatedLogsNavBlock extends HtmlBlock implements NMWebParams {
+
+ @Override
+ protected void render(Block html) {
+ html
+ .div("#nav")
+ .h3()._("Logs")._() //
+ ._()
+ .div("#themeswitcher")._();
+ }
+}
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AggregatedLogsPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AggregatedLogsPage.java?rev=1190174&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AggregatedLogsPage.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AggregatedLogsPage.java Fri Oct 28 06:45:04 2011
@@ -0,0 +1,44 @@
+package org.apache.hadoop.yarn.server.nodemanager.webapp;
+
+import static org.apache.hadoop.yarn.server.nodemanager.webapp.NMWebParams.CONTAINER_ID;
+import static org.apache.hadoop.yarn.server.nodemanager.webapp.NMWebParams.ENTITY_STRING;
+import static org.apache.hadoop.yarn.util.StringHelper.join;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.ACCORDION;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.ACCORDION_ID;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.THEMESWITCHER_ID;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.initID;
+
+
+import org.apache.hadoop.yarn.webapp.SubView;
+
+
+public class AggregatedLogsPage extends NMView {
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.yarn.server.nodemanager.webapp.NMView#preHead(org.apache.hadoop.yarn.webapp.hamlet.Hamlet.HTML)
+ */
+ @Override
+ protected void preHead(Page.HTML<_> html) {
+ String logEntity = $(ENTITY_STRING);
+ if (logEntity == null || logEntity.isEmpty()) {
+ logEntity = $(CONTAINER_ID);
+ }
+ if (logEntity == null || logEntity.isEmpty()) {
+ logEntity = "UNKNOWN";
+ }
+ set(TITLE, join("Logs for ", logEntity));
+ set(ACCORDION_ID, "nav");
+ set(initID(ACCORDION, "nav"), "{autoHeight:false, active:0}");
+ set(THEMESWITCHER_ID, "themeswitcher");
+ }
+
+ @Override
+ protected Class<? extends SubView> content() {
+ return AggregatedLogsBlock.class;
+ }
+
+ @Override
+ protected Class<? extends SubView> nav() {
+ return AggregatedLogsNavBlock.class;
+ }
+}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AllContainersPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AllContainersPage.java?rev=1190174&r1=1190173&r2=1190174&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AllContainersPage.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AllContainersPage.java Fri Oct 28 06:45:04 2011
@@ -91,7 +91,8 @@ public class AllContainersPage extends N
._()
.td()._(container.getContainerState())._()
.td()
- .a(url("containerlogs", containerIdStr), "logs")._()
+ .a(url("containerlogs", containerIdStr, container.getUser()),
+ "logs")._()
._();
}
tableBody._()._()._();
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java?rev=1190174&r1=1190173&r2=1190174&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java Fri Oct 28 06:45:04 2011
@@ -18,9 +18,17 @@
package org.apache.hadoop.yarn.server.nodemanager.webapp;
+import static org.apache.hadoop.yarn.server.nodemanager.webapp.NMWebParams.CONTAINER_ID;
+import static org.apache.hadoop.yarn.util.StringHelper.join;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.ACCORDION;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.ACCORDION_ID;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.THEMESWITCHER_ID;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.initID;
+
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
+import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
@@ -30,32 +38,53 @@ import org.apache.hadoop.fs.LocalDirAllo
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.webapp.SubView;
-import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
-import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.DIV;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
import com.google.inject.Inject;
public class ContainerLogsPage extends NMView {
+
+ public static final String REDIRECT_URL = "redirect.url";
+
+ @Override protected void preHead(Page.HTML<_> html) {
+ String redirectUrl = $(REDIRECT_URL);
+ if (redirectUrl == null || redirectUrl.isEmpty()) {
+ set(TITLE, join("Logs for ", $(CONTAINER_ID)));
+ html.meta_http("refresh", "10");
+ } else {
+ if (redirectUrl.equals("false")) {
+ set(TITLE, join("Failed redirect for ", $(CONTAINER_ID)));
+ //Error getting redirect url. Fall through.
+ } else {
+ set(TITLE, join("Redirecting to log server for ", $(CONTAINER_ID)));
+ html.meta_http("refresh", "1; url=" + redirectUrl);
+ }
+ }
+
+ set(ACCORDION_ID, "nav");
+ set(initID(ACCORDION, "nav"), "{autoHeight:false, active:0}");
+ set(THEMESWITCHER_ID, "themeswitcher");
+ }
+
@Override
protected Class<? extends SubView> content() {
return ContainersLogsBlock.class;
}
public static class ContainersLogsBlock extends HtmlBlock implements
- NMWebParams {
-
+ NMWebParams {
private final Configuration conf;
private final LocalDirAllocator logsSelector;
private final Context nmContext;
@@ -72,13 +101,19 @@ public class ContainerLogsPage extends N
@Override
protected void render(Block html) {
- DIV<Hamlet> div = html.div("#content");
+ String redirectUrl = $(REDIRECT_URL);
+ if (redirectUrl !=null && redirectUrl.equals("false")) {
+ html.h1("Failed while trying to construct the redirect url to the log" +
+ " server. Log Server url may not be configured");
+ //Intentional fallthrough.
+ }
+
ContainerId containerId;
try {
containerId = ConverterUtils.toContainerId($(CONTAINER_ID));
} catch (IllegalArgumentException e) {
- div.h1("Invalid containerId " + $(CONTAINER_ID))._();
+ html.h1("Invalid containerId " + $(CONTAINER_ID));
return;
}
@@ -88,19 +123,28 @@ public class ContainerLogsPage extends N
applicationId);
Container container = this.nmContext.getContainers().get(containerId);
- if (application == null || container == null) {
- div.h1(
- "Unknown container. Container is either not yet running or "
+ if (application == null) {
+ html.h1(
+ "Unknown container. Container either has not started or "
+ "has already completed or "
- + "doesn't belong to this node at all.")._();
+ + "doesn't belong to this node at all.");
+ return;
+ }
+ if (container == null) {
+ // Container may have alerady completed, but logs not aggregated yet.
+ printLogs(html, containerId, applicationId, application);
return;
}
if (EnumSet.of(ContainerState.NEW, ContainerState.LOCALIZING,
- ContainerState.LOCALIZING).contains(container.getContainerState())) {
- div.h1("Container is not yet running. Current state is "
- + container.getContainerState())
- ._();
+ ContainerState.LOCALIZED).contains(container.getContainerState())) {
+ html.h1("Container is not yet running. Current state is "
+ + container.getContainerState());
+ return;
+ }
+
+ if (container.getContainerState() == ContainerState.LOCALIZATION_FAILED) {
+ html.h1("Container wasn't started. Localization failed.");
return;
}
@@ -108,103 +152,144 @@ public class ContainerLogsPage extends N
ContainerState.EXITED_WITH_FAILURE,
ContainerState.EXITED_WITH_SUCCESS).contains(
container.getContainerState())) {
+ printLogs(html, containerId, applicationId, application);
+ return;
+ }
+ if (EnumSet.of(ContainerState.KILLING,
+ ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
+ ContainerState.CONTAINER_RESOURCES_CLEANINGUP).contains(
+ container.getContainerState())) {
+ //Container may have generated some logs before being killed.
+ printLogs(html, containerId, applicationId, application);
+ return;
+ }
+ if (container.getContainerState().equals(ContainerState.DONE)) {
+ // Prev state unknown. Logs may be available.
+ printLogs(html, containerId, applicationId, application);
+ return;
+ } else {
+ html.h1("Container is no longer running...");
+ return;
+ }
+ }
- // Check for the authorization.
- String remoteUser = request().getRemoteUser();
- UserGroupInformation callerUGI = null;
- if (remoteUser != null) {
- callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
- }
- if (callerUGI != null && !this.aclsManager.checkAccess(callerUGI,
- ApplicationAccessType.VIEW_APP, application.getUser(),
- applicationId)) {
- div.h1(
- "You (User " + remoteUser
- + ") are not authorized to view the logs for application "
- + applicationId)._();
+ private void printLogs(Block html, ContainerId containerId,
+ ApplicationId applicationId, Application application) {
+ // Check for the authorization.
+ String remoteUser = request().getRemoteUser();
+ UserGroupInformation callerUGI = null;
+
+ if (remoteUser != null) {
+ callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
+ }
+ if (callerUGI != null
+ && !this.aclsManager.checkAccess(callerUGI,
+ ApplicationAccessType.VIEW_APP, application.getUser(),
+ applicationId)) {
+ html.h1(
+ "User [" + remoteUser
+ + "] is not authorized to view the logs for application "
+ + applicationId);
+ return;
+ }
+
+ if (!$(CONTAINER_LOG_TYPE).isEmpty()) {
+ File logFile = null;
+ try {
+ logFile =
+ new File(this.logsSelector
+ .getLocalPathToRead(
+ ContainerLaunch.getRelativeContainerLogDir(
+ applicationId.toString(), containerId.toString())
+ + Path.SEPARATOR + $(CONTAINER_LOG_TYPE), this.conf)
+ .toUri().getPath());
+ } catch (Exception e) {
+ html.h1("Cannot find this log on the local disk.");
return;
}
-
- if (!$(CONTAINER_LOG_TYPE).isEmpty()) {
- File logFile = null;
- try {
- logFile =
- new File(this.logsSelector
- .getLocalPathToRead(
- ConverterUtils.toString(
- applicationId)
- + Path.SEPARATOR + $(CONTAINER_ID)
- + Path.SEPARATOR
- + $(CONTAINER_LOG_TYPE), this.conf).toUri()
- .getPath());
- } catch (Exception e) {
- div.h1("Cannot find this log on the local disk.")._();
- }
- div.h1(logFile == null ? "Unknown LogFile" : logFile.getName());
- long start =
- $("start").isEmpty() ? -4 * 1024 : Long.parseLong($("start"));
- start = start < 0 ? logFile.length() + start : start;
- start = start < 0 ? 0 : start;
- long end =
- $("end").isEmpty() ? logFile.length() : Long
- .parseLong($("end"));
- end = end < 0 ? logFile.length() + end : end;
- end = end < 0 ? logFile.length() : end;
- if (start > end) {
- writer().write("Invalid start and end values!");
- } else {
+ long start =
+ $("start").isEmpty() ? -4 * 1024 : Long.parseLong($("start"));
+ start = start < 0 ? logFile.length() + start : start;
+ start = start < 0 ? 0 : start;
+ long end =
+ $("end").isEmpty() ? logFile.length() : Long.parseLong($("end"));
+ end = end < 0 ? logFile.length() + end : end;
+ end = end < 0 ? logFile.length() : end;
+ if (start > end) {
+ html.h1("Invalid start and end values. Start: [" + start + "]"
+ + ", end[" + end + "]");
+ return;
+ } else {
+ InputStreamReader reader = null;
try {
long toRead = end - start;
if (toRead < logFile.length()) {
- div._("Showing " + toRead + " bytes. Click ")
- .a(url("containerlogs", $(CONTAINER_ID),
- logFile.getName()), "here")
- ._(" for full log").br()._();
+ html.p()._("Showing " + toRead + " bytes. Click ")
+ .a(url("containerlogs", $(CONTAINER_ID), $(APP_OWNER),
+ logFile.getName(), "?start=0"), "here").
+ _(" for full log")._();
}
// TODO: Use secure IO Utils to avoid symlink attacks.
- //TODO Fix findBugs close warning along with IOUtils change
- FileReader reader = new FileReader(logFile);
- char[] cbuf = new char[65536];
- reader.skip(start);
+ // TODO Fix findBugs close warning along with IOUtils change
+ reader = new FileReader(logFile);
+ int bufferSize = 65536;
+ char[] cbuf = new char[bufferSize];
+
+ long skipped = 0;
+ long totalSkipped = 0;
+ while (totalSkipped < start) {
+ skipped = reader.skip(start - totalSkipped);
+ totalSkipped += skipped;
+ }
+
int len = 0;
- int totalRead = 0;
+ int currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
writer().write("<pre>");
- while ((len = reader.read(cbuf, 0, (int) toRead)) > 0
- && totalRead < (end - start)) {
+
+ while ((len = reader.read(cbuf, 0, currentToRead)) > 0
+ && toRead > 0) {
writer().write(cbuf, 0, len); // TODO: HTMl Quoting?
- totalRead += len;
- toRead = toRead - totalRead;
+ toRead = toRead - len;
+ currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
}
+
reader.close();
writer().write("</pre>");
+
} catch (IOException e) {
- writer().write(
- "Exception reading log-file "
- + StringUtils.stringifyException(e));
- }
- }
- div._();
- } else {
- // Just print out the log-types
- List<File> containerLogsDirs =
- getContainerLogDirs(this.conf, containerId);
- for (File containerLogsDir : containerLogsDirs) {
- for (File logFile : containerLogsDir.listFiles()) {
- div
- .p()
- .a(
- url("containerlogs", $(CONTAINER_ID),
- logFile.getName(), "?start=-4076"),
- logFile.getName() + " : Total file length is "
- + logFile.length() + " bytes.")
- ._();
+ html.h1("Exception reading log-file. Log file was likely aggregated. "
+ + StringUtils.stringifyException(e));
+ } finally {
+ if (reader != null) {
+ try {
+ reader.close();
+ } catch (IOException e) {
+ // Ignore
+ }
}
}
- div._();
}
} else {
- div.h1("Container is no longer running..")._();
+ // Just print out the log-types
+ List<File> containerLogsDirs =
+ getContainerLogDirs(this.conf, containerId);
+ boolean foundLogFile = false;
+ for (File containerLogsDir : containerLogsDirs) {
+ for (File logFile : containerLogsDir.listFiles()) {
+ foundLogFile = true;
+ html.p()
+ .a(url("containerlogs", $(CONTAINER_ID), $(APP_OWNER),
+ logFile.getName(), "?start=-4096"),
+ logFile.getName() + " : Total file length is "
+ + logFile.length() + " bytes.")._();
+ }
+ }
+ if (!foundLogFile) {
+ html.h1("No logs available for container " + containerId.toString());
+ return;
+ }
}
+ return;
}
static List<File>
@@ -222,6 +307,5 @@ public class ContainerLogsPage extends N
}
return containerLogDirs;
}
-
}
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerPage.java?rev=1190174&r1=1190173&r2=1190174&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerPage.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerPage.java Fri Oct 28 06:45:04 2011
@@ -89,7 +89,8 @@ public class ContainerPage extends NMVie
._("User", container.getUser())
._("TotalMemoryNeeded",
container.getLaunchContext().getResource().getMemory())
- ._("logs", ujoin("containerlogs", $(CONTAINER_ID)), "Link to logs");
+ ._("logs", ujoin("containerlogs", $(CONTAINER_ID), container.getUser()),
+ "Link to logs");
html._(InfoBlock.class);
}
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMController.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMController.java?rev=1190174&r1=1190173&r2=1190174&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMController.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMController.java Fri Oct 28 06:45:04 2011
@@ -21,15 +21,27 @@ package org.apache.hadoop.yarn.server.no
import static org.apache.hadoop.yarn.util.StringHelper.join;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.webapp.Controller;
import com.google.inject.Inject;
public class NMController extends Controller implements NMWebParams {
+ private Context nmContext;
+ private Configuration nmConf;
+
@Inject
- public NMController(Configuration nmConf, RequestContext requestContext) {
+ public NMController(Configuration nmConf, RequestContext requestContext,
+ Context nmContext) {
super(requestContext);
+ this.nmContext = nmContext;
+ this.nmConf = nmConf;
}
@Override
@@ -63,6 +75,29 @@ public class NMController extends Contro
}
public void logs() {
+ String containerIdStr = $(CONTAINER_ID);
+ ContainerId containerId = null;
+ try {
+ containerId = ConverterUtils.toContainerId(containerIdStr);
+ } catch (IllegalArgumentException e) {
+ render(ContainerLogsPage.class);
+ return;
+ }
+ ApplicationId appId =
+ containerId.getApplicationAttemptId().getApplicationId();
+ Application app = nmContext.getApplications().get(appId);
+ if (app == null) {
+ String logServerUrl = nmConf.get(YarnConfiguration.YARN_LOG_SERVER_URL);
+ String redirectUrl = null;
+ if (logServerUrl == null || logServerUrl.isEmpty()) {
+ redirectUrl = "false";
+ } else {
+ redirectUrl =
+ url(logServerUrl, nmContext.getNodeId().toString(), containerIdStr,
+ containerIdStr, $(APP_OWNER));
+ }
+ set(ContainerLogsPage.REDIRECT_URL, redirectUrl);
+ }
render(ContainerLogsPage.class);
}
}