You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jh...@apache.org on 2017/02/07 04:25:56 UTC
[12/23] hadoop git commit: YARN-6100. Improve YARN webservice to
output aggregated container logs. Contributed by Xuan Gong.
YARN-6100. Improve YARN webservice to output aggregated container logs. Contributed by Xuan Gong.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/327c9980
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/327c9980
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/327c9980
Branch: refs/heads/YARN-5734
Commit: 327c9980aafce52cc02d2b8885fc4e9f628ab23c
Parents: 2a942ee
Author: Junping Du <ju...@apache.org>
Authored: Thu Feb 2 00:41:18 2017 -0800
Committer: Junping Du <ju...@apache.org>
Committed: Thu Feb 2 00:41:18 2017 -0800
----------------------------------------------------------------------
.../apache/hadoop/yarn/client/cli/LogsCLI.java | 17 --
.../yarn/logaggregation/LogToolUtils.java | 158 ++++++++++++++
.../webapp/AHSWebServices.java | 210 ++++---------------
.../webapp/TestAHSWebServices.java | 29 ++-
.../nodemanager/webapp/NMWebServices.java | 93 +++++---
.../nodemanager/webapp/TestNMWebServices.java | 59 ++++--
6 files changed, 332 insertions(+), 234 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/327c9980/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java
index 1de4cd1..3cb1c7d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java
@@ -64,7 +64,6 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers;
import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo;
-import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
@@ -509,17 +508,9 @@ public class LogsCLI extends Configured implements Tool {
newOptions.setLogTypes(matchedFiles);
Client webServiceClient = Client.create();
- String containerString = String.format(
- LogCLIHelpers.CONTAINER_ON_NODE_PATTERN, containerIdStr, nodeId);
- out.println(containerString);
- out.println(StringUtils.repeat("=", containerString.length()));
boolean foundAnyLogs = false;
byte[] buffer = new byte[65536];
for (String logFile : newOptions.getLogTypes()) {
- out.println("LogType:" + logFile);
- out.println("Log Upload Time:"
- + Times.format(System.currentTimeMillis()));
- out.println("Log Contents:");
InputStream is = null;
try {
ClientResponse response = getResponeFromNMWebService(conf,
@@ -541,14 +532,6 @@ public class LogsCLI extends Configured implements Tool {
response.getEntity(String.class));
out.println(msg);
}
- StringBuilder sb = new StringBuilder();
- sb.append("End of LogType:" + logFile + ".");
- if (request.getContainerState() == ContainerState.RUNNING) {
- sb.append(" This log file belongs"
- + " to a running container (" + containerIdStr + ") and so may"
- + " not be complete.");
- }
- out.println(sb.toString());
out.flush();
foundAnyLogs = true;
} catch (ClientHandlerException | UniformInterfaceException ex) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/327c9980/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java
index e117736..d83a8ae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java
@@ -20,11 +20,17 @@ package org.apache.hadoop.yarn.logaggregation;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.math3.util.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.HarFs;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
@@ -40,6 +46,9 @@ public final class LogToolUtils {
private LogToolUtils() {}
+ public static final String CONTAINER_ON_NODE_PATTERN =
+ "Container: %s on %s";
+
/**
* Return a list of {@link ContainerLogMeta} for a container
* from Remote FileSystem.
@@ -114,4 +123,153 @@ public final class LogToolUtils {
}
return containersLogMeta;
}
+
+ /**
+ * Output container log.
+ * @param containerId the containerId
+ * @param nodeId the nodeId
+ * @param fileName the log file name
+ * @param fileLength the log file length
+ * @param outputSize the output size
+ * @param lastModifiedTime the log file last modified time
+ * @param fis the log file input stream
+ * @param os the output stream
+ * @param buf the buffer
+ * @param logType the log type.
+ * @throws IOException if we can not access the log file.
+ */
+ public static void outputContainerLog(String containerId, String nodeId,
+ String fileName, long fileLength, long outputSize,
+ String lastModifiedTime, InputStream fis, OutputStream os,
+ byte[] buf, ContainerLogType logType) throws IOException {
+ long toSkip = 0;
+ long totalBytesToRead = fileLength;
+ long skipAfterRead = 0;
+ if (outputSize < 0) {
+ long absBytes = Math.abs(outputSize);
+ if (absBytes < fileLength) {
+ toSkip = fileLength - absBytes;
+ totalBytesToRead = absBytes;
+ }
+ org.apache.hadoop.io.IOUtils.skipFully(fis, toSkip);
+ } else {
+ if (outputSize < fileLength) {
+ totalBytesToRead = outputSize;
+ skipAfterRead = fileLength - outputSize;
+ }
+ }
+
+ long curRead = 0;
+ long pendingRead = totalBytesToRead - curRead;
+ int toRead = pendingRead > buf.length ? buf.length
+ : (int) pendingRead;
+ int len = fis.read(buf, 0, toRead);
+ boolean keepGoing = (len != -1 && curRead < totalBytesToRead);
+ if (keepGoing) {
+ StringBuilder sb = new StringBuilder();
+ String containerStr = String.format(
+ LogToolUtils.CONTAINER_ON_NODE_PATTERN,
+ containerId, nodeId);
+ sb.append(containerStr + "\n");
+ sb.append("LogType: " + logType + "\n");
+ sb.append(StringUtils.repeat("=", containerStr.length()) + "\n");
+ sb.append("FileName:" + fileName + "\n");
+ sb.append("LogLastModifiedTime:" + lastModifiedTime + "\n");
+ sb.append("LogLength:" + Long.toString(fileLength) + "\n");
+ sb.append("LogContents:\n");
+ byte[] b = sb.toString().getBytes(
+ Charset.forName("UTF-8"));
+ os.write(b, 0, b.length);
+ }
+ while (keepGoing) {
+ os.write(buf, 0, len);
+ curRead += len;
+
+ pendingRead = totalBytesToRead - curRead;
+ toRead = pendingRead > buf.length ? buf.length
+ : (int) pendingRead;
+ len = fis.read(buf, 0, toRead);
+ keepGoing = (len != -1 && curRead < totalBytesToRead);
+ }
+ org.apache.hadoop.io.IOUtils.skipFully(fis, skipAfterRead);
+ os.flush();
+ }
+
+ public static boolean outputAggregatedContainerLog(Configuration conf,
+ ApplicationId appId, String appOwner,
+ String containerId, String nodeId,
+ String logFileName, long outputSize, OutputStream os,
+ byte[] buf) throws IOException {
+ boolean findLogs = false;
+ RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils
+ .getRemoteNodeFileDir(conf, appId, appOwner);
+ while (nodeFiles != null && nodeFiles.hasNext()) {
+ final FileStatus thisNodeFile = nodeFiles.next();
+ String nodeName = thisNodeFile.getPath().getName();
+ if (nodeName.equals(appId + ".har")) {
+ Path p = new Path("har:///"
+ + thisNodeFile.getPath().toUri().getRawPath());
+ nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p);
+ continue;
+ }
+ if ((nodeId == null || nodeName.contains(LogAggregationUtils
+ .getNodeString(nodeId))) && !nodeName.endsWith(
+ LogAggregationUtils.TMP_FILE_SUFFIX)) {
+ AggregatedLogFormat.LogReader reader = null;
+ try {
+ reader = new AggregatedLogFormat.LogReader(conf,
+ thisNodeFile.getPath());
+ DataInputStream valueStream;
+ LogKey key = new LogKey();
+ valueStream = reader.next(key);
+ while (valueStream != null && !key.toString()
+ .equals(containerId)) {
+ // Next container
+ key = new LogKey();
+ valueStream = reader.next(key);
+ }
+ if (valueStream == null) {
+ continue;
+ }
+ while (true) {
+ try {
+ String fileType = valueStream.readUTF();
+ String fileLengthStr = valueStream.readUTF();
+ long fileLength = Long.parseLong(fileLengthStr);
+ if (fileType.equalsIgnoreCase(logFileName)) {
+ LogToolUtils.outputContainerLog(containerId,
+ nodeId, fileType, fileLength, outputSize,
+ Times.format(thisNodeFile.getModificationTime()),
+ valueStream, os, buf, ContainerLogType.AGGREGATED);
+ StringBuilder sb = new StringBuilder();
+ String endOfFile = "End of LogFile:" + fileType;
+ sb.append("\n" + endOfFile + "\n");
+ sb.append(StringUtils.repeat("*", endOfFile.length() + 50)
+ + "\n\n");
+ byte[] b = sb.toString().getBytes(Charset.forName("UTF-8"));
+ os.write(b, 0, b.length);
+ findLogs = true;
+ } else {
+ long totalSkipped = 0;
+ long currSkipped = 0;
+ while (currSkipped != -1 && totalSkipped < fileLength) {
+ currSkipped = valueStream.skip(
+ fileLength - totalSkipped);
+ totalSkipped += currSkipped;
+ }
+ }
+ } catch (EOFException eof) {
+ break;
+ }
+ }
+ } finally {
+ if (reader != null) {
+ reader.close();
+ }
+ }
+ }
+ }
+ os.flush();
+ return findLogs;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/327c9980/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java
index 9bac474..a10bfac 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java
@@ -18,8 +18,6 @@
package org.apache.hadoop.yarn.server.applicationhistoryservice.webapp;
-import java.io.DataInputStream;
-import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
@@ -43,12 +41,10 @@ import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.Response.Status;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.http.JettyUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -56,13 +52,9 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.ApplicationBaseProtocol;
import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
import org.apache.hadoop.yarn.logaggregation.ContainerLogType;
-import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.logaggregation.LogToolUtils;
-import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.server.webapp.WebServices;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptsInfo;
@@ -71,11 +63,11 @@ import org.apache.hadoop.yarn.server.webapp.dao.AppsInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
-import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.apache.hadoop.yarn.webapp.BadRequestException;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@@ -360,27 +352,27 @@ public class AHSWebServices extends WebServices {
} catch (Exception ex) {
// directly find logs from HDFS.
return sendStreamOutputResponse(appId, null, null, containerIdStr,
- filename, format, length);
+ filename, format, length, false);
}
String appOwner = appInfo.getUser();
+ if (isFinishedState(appInfo.getAppState())) {
+ // directly find logs from HDFS.
+ return sendStreamOutputResponse(appId, appOwner, null, containerIdStr,
+ filename, format, length, false);
+ }
- ContainerInfo containerInfo;
- try {
- containerInfo = super.getContainer(
- req, res, appId.toString(),
- containerId.getApplicationAttemptId().toString(),
- containerId.toString());
- } catch (Exception ex) {
- if (isFinishedState(appInfo.getAppState())) {
- // directly find logs from HDFS.
+ if (isRunningState(appInfo.getAppState())) {
+ ContainerInfo containerInfo;
+ try {
+ containerInfo = super.getContainer(
+ req, res, appId.toString(),
+ containerId.getApplicationAttemptId().toString(),
+ containerId.toString());
+ } catch (Exception ex) {
+ // output the aggregated logs
return sendStreamOutputResponse(appId, appOwner, null, containerIdStr,
- filename, format, length);
+ filename, format, length, true);
}
- return createBadResponse(Status.INTERNAL_SERVER_ERROR,
- "Can not get ContainerInfo for the container: " + containerId);
- }
- String nodeId = containerInfo.getNodeId();
- if (isRunningState(appInfo.getAppState())) {
String nodeHttpAddress = containerInfo.getNodeHttpAddress();
String uri = "/" + containerId.toString() + "/logs/" + filename;
String resURI = JOINER.join(nodeHttpAddress, NM_DOWNLOAD_URI_STR, uri);
@@ -392,9 +384,6 @@ public class AHSWebServices extends WebServices {
HttpServletResponse.SC_TEMPORARY_REDIRECT);
response.header("Location", resURI);
return response.build();
- } else if (isFinishedState(appInfo.getAppState())) {
- return sendStreamOutputResponse(appId, appOwner, nodeId,
- containerIdStr, filename, format, length);
} else {
return createBadResponse(Status.NOT_FOUND,
"The application is not at Running or Finished State.");
@@ -419,7 +408,8 @@ public class AHSWebServices extends WebServices {
private Response sendStreamOutputResponse(ApplicationId appId,
String appOwner, String nodeId, String containerIdStr,
- String fileName, String format, long bytes) {
+ String fileName, String format, long bytes,
+ boolean printEmptyLocalContainerLog) {
String contentType = WebAppUtils.getDefaultLogContentType();
if (format != null && !format.isEmpty()) {
contentType = WebAppUtils.getSupportedLogContentType(format);
@@ -433,15 +423,11 @@ public class AHSWebServices extends WebServices {
StreamingOutput stream = null;
try {
stream = getStreamingOutput(appId, appOwner, nodeId,
- containerIdStr, fileName, bytes);
+ containerIdStr, fileName, bytes, printEmptyLocalContainerLog);
} catch (Exception ex) {
return createBadResponse(Status.INTERNAL_SERVER_ERROR,
ex.getMessage());
}
- if (stream == null) {
- return createBadResponse(Status.INTERNAL_SERVER_ERROR,
- "Can not get log for container: " + containerIdStr);
- }
ResponseBuilder response = Response.ok(stream);
response.header("Content-Type", contentType);
// Sending the X-Content-Type-Options response header with the value
@@ -451,146 +437,30 @@ public class AHSWebServices extends WebServices {
return response.build();
}
- private StreamingOutput getStreamingOutput(ApplicationId appId,
- String appOwner, final String nodeId, final String containerIdStr,
- final String logFile, final long bytes) throws IOException{
- String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
- org.apache.hadoop.fs.Path remoteRootLogDir = new org.apache.hadoop.fs.Path(
- conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
- YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
- org.apache.hadoop.fs.Path qualifiedRemoteRootLogDir =
- FileContext.getFileContext(conf).makeQualified(remoteRootLogDir);
- FileContext fc = FileContext.getFileContext(
- qualifiedRemoteRootLogDir.toUri(), conf);
- org.apache.hadoop.fs.Path remoteAppDir = null;
- if (appOwner == null) {
- org.apache.hadoop.fs.Path toMatch = LogAggregationUtils
- .getRemoteAppLogDir(remoteRootLogDir, appId, "*", suffix);
- FileStatus[] matching = fc.util().globStatus(toMatch);
- if (matching == null || matching.length != 1) {
- return null;
- }
- remoteAppDir = matching[0].getPath();
- } else {
- remoteAppDir = LogAggregationUtils
- .getRemoteAppLogDir(remoteRootLogDir, appId, appOwner, suffix);
- }
- final RemoteIterator<FileStatus> nodeFiles;
- nodeFiles = fc.listStatus(remoteAppDir);
- if (!nodeFiles.hasNext()) {
- return null;
- }
-
+ private StreamingOutput getStreamingOutput(final ApplicationId appId,
+ final String appOwner, final String nodeId, final String containerIdStr,
+ final String logFile, final long bytes,
+ final boolean printEmptyLocalContainerLog) throws IOException{
StreamingOutput stream = new StreamingOutput() {
@Override
public void write(OutputStream os) throws IOException,
WebApplicationException {
byte[] buf = new byte[65535];
- boolean findLogs = false;
- while (nodeFiles.hasNext()) {
- final FileStatus thisNodeFile = nodeFiles.next();
- String nodeName = thisNodeFile.getPath().getName();
- if ((nodeId == null || nodeName.contains(LogAggregationUtils
- .getNodeString(nodeId))) && !nodeName.endsWith(
- LogAggregationUtils.TMP_FILE_SUFFIX)) {
- AggregatedLogFormat.LogReader reader = null;
- try {
- reader = new AggregatedLogFormat.LogReader(conf,
- thisNodeFile.getPath());
- DataInputStream valueStream;
- LogKey key = new LogKey();
- valueStream = reader.next(key);
- while (valueStream != null && !key.toString()
- .equals(containerIdStr)) {
- // Next container
- key = new LogKey();
- valueStream = reader.next(key);
- }
- if (valueStream == null) {
- continue;
- }
- while (true) {
- try {
- String fileType = valueStream.readUTF();
- String fileLengthStr = valueStream.readUTF();
- long fileLength = Long.parseLong(fileLengthStr);
- if (fileType.equalsIgnoreCase(logFile)) {
- StringBuilder sb = new StringBuilder();
- sb.append("LogType:");
- sb.append(fileType + "\n");
- sb.append("Log Upload Time:");
- sb.append(Times.format(System.currentTimeMillis()) + "\n");
- sb.append("LogLength:");
- sb.append(fileLengthStr + "\n");
- sb.append("Log Contents:\n");
- byte[] b = sb.toString().getBytes(
- Charset.forName("UTF-8"));
- os.write(b, 0, b.length);
-
- long toSkip = 0;
- long totalBytesToRead = fileLength;
- long skipAfterRead = 0;
- if (bytes < 0) {
- long absBytes = Math.abs(bytes);
- if (absBytes < fileLength) {
- toSkip = fileLength - absBytes;
- totalBytesToRead = absBytes;
- }
- org.apache.hadoop.io.IOUtils.skipFully(
- valueStream, toSkip);
- } else {
- if (bytes < fileLength) {
- totalBytesToRead = bytes;
- skipAfterRead = fileLength - bytes;
- }
- }
-
- long curRead = 0;
- long pendingRead = totalBytesToRead - curRead;
- int toRead = pendingRead > buf.length ? buf.length
- : (int) pendingRead;
- int len = valueStream.read(buf, 0, toRead);
- while (len != -1 && curRead < totalBytesToRead) {
- os.write(buf, 0, len);
- curRead += len;
-
- pendingRead = totalBytesToRead - curRead;
- toRead = pendingRead > buf.length ? buf.length
- : (int) pendingRead;
- len = valueStream.read(buf, 0, toRead);
- }
- org.apache.hadoop.io.IOUtils.skipFully(
- valueStream, skipAfterRead);
- sb = new StringBuilder();
- sb.append("\nEnd of LogType:" + fileType + "\n");
- b = sb.toString().getBytes(Charset.forName("UTF-8"));
- os.write(b, 0, b.length);
- findLogs = true;
- } else {
- long totalSkipped = 0;
- long currSkipped = 0;
- while (currSkipped != -1 && totalSkipped < fileLength) {
- currSkipped = valueStream.skip(
- fileLength - totalSkipped);
- totalSkipped += currSkipped;
- }
- }
- } catch (EOFException eof) {
- break;
- }
- }
- } finally {
- if (reader != null) {
- reader.close();
- }
- }
- }
- }
- os.flush();
+ boolean findLogs = LogToolUtils.outputAggregatedContainerLog(conf,
+ appId, appOwner, containerIdStr, nodeId, logFile, bytes, os, buf);
if (!findLogs) {
throw new IOException("Can not find logs for container:"
+ containerIdStr);
+ } else {
+ if (printEmptyLocalContainerLog) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(containerIdStr + "\n");
+ sb.append("LogType: " + ContainerLogType.LOCAL + "\n");
+ sb.append("LogContents:\n");
+ sb.append(getNoRedirectWarning() + "\n");
+ os.write(sb.toString().getBytes(Charset.forName("UTF-8")));
+ }
}
}
};
@@ -640,4 +510,12 @@ public class AHSWebServices extends WebServices {
throw new WebApplicationException(ex);
}
}
+
+ @Private
+ @VisibleForTesting
+ public static String getNoRedirectWarning() {
+ return "We do not have NodeManager web address, so we can not "
+ + "re-direct the request to related NodeManager "
+ + "for local container logs.";
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/327c9980/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java
index f553bb0..3d1c901 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java
@@ -35,7 +35,7 @@ import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.MediaType;
-
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -584,7 +584,10 @@ public class TestAHSWebServices extends JerseyTestBase {
responseText = response.getEntity(String.class);
assertTrue(responseText.contains("Hello." + containerId1ForApp100));
int fullTextSize = responseText.getBytes().length;
- int tailTextSize = "\nEnd of LogType:syslog\n".getBytes().length;
+ String tailEndSeparator = StringUtils.repeat("*",
+ "End of LogFile:syslog".length() + 50) + "\n\n";
+ int tailTextSize = "\nEnd of LogFile:syslog\n".getBytes().length
+ + tailEndSeparator.getBytes().length;
String logMessage = "Hello." + containerId1ForApp100;
int fileContentSize = logMessage.getBytes().length;
@@ -685,6 +688,28 @@ public class TestAHSWebServices extends JerseyTestBase {
assertTrue(redirectURL.contains(containerId1.toString()));
assertTrue(redirectURL.contains("/logs/" + fileName));
assertTrue(redirectURL.contains("user.name=" + user));
+
+ // If we can not container information from ATS, we would try to
+ // get aggregated log from remote FileSystem.
+ ContainerId containerId1000 = ContainerId.newContainerId(
+ appAttemptId, 1000);
+ String content = "Hello." + containerId1000;
+ NodeId nodeId = NodeId.newInstance("test host", 100);
+ TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs,
+ rootLogDir, containerId1000, nodeId, fileName, user, content, true);
+ r = resource();
+ ClientResponse response = r.path("ws").path("v1")
+ .path("applicationhistory").path("containerlogs")
+ .path(containerId1000.toString()).path(fileName)
+ .queryParam("user.name", user)
+ .accept(MediaType.TEXT_PLAIN)
+ .get(ClientResponse.class);
+ String responseText = response.getEntity(String.class);
+ assertTrue(responseText.contains(content));
+ // Also test whether we output the empty local container log, and give
+ // the warning message.
+ assertTrue(responseText.contains("LogType: " + ContainerLogType.LOCAL));
+ assertTrue(responseText.contains(AHSWebServices.getNoRedirectWarning()));
}
@Test(timeout = 10000)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/327c9980/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java
index 1357d5a..07acd4b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java
@@ -21,6 +21,7 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStream;
+import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
@@ -41,6 +42,9 @@ import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.UriInfo;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.http.JettyUtils;
@@ -57,6 +61,7 @@ import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.AppsInfo;
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.ContainerInfo;
@@ -64,6 +69,7 @@ import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.NMContainerLogsInfo;
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.ContainersInfo;
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.NodeInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;
+import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.webapp.BadRequestException;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.apache.hadoop.yarn.webapp.WebApp;
@@ -74,6 +80,7 @@ import com.google.inject.Singleton;
@Singleton
@Path("/ws/v1/node")
public class NMWebServices {
+ private static final Log LOG = LogFactory.getLog(NMWebServices.class);
private Context nmContext;
private ResourceView rview;
private WebApp webapp;
@@ -330,17 +337,32 @@ public class NMWebServices {
@Produces({ MediaType.TEXT_PLAIN + "; " + JettyUtils.UTF_8 })
@Public
@Unstable
- public Response getLogs(@PathParam("containerid") String containerIdStr,
+ public Response getLogs(
+ @PathParam("containerid") final String containerIdStr,
@PathParam("filename") String filename,
@QueryParam("format") String format,
@QueryParam("size") String size) {
- ContainerId containerId;
+ ContainerId tempContainerId;
try {
- containerId = ContainerId.fromString(containerIdStr);
+ tempContainerId = ContainerId.fromString(containerIdStr);
} catch (IllegalArgumentException ex) {
return Response.status(Status.BAD_REQUEST).build();
}
-
+ final ContainerId containerId = tempContainerId;
+ boolean tempIsRunning = false;
+ // check what is the status for container
+ try {
+ Container container = nmContext.getContainers().get(containerId);
+ tempIsRunning = (container.getContainerState() == ContainerState.RUNNING);
+ } catch (Exception ex) {
+ // This NM does not have this container any more. We
+ // assume the container has already finished.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Can not find the container:" + containerId
+ + " in this node.");
+ }
+ }
+ final boolean isRunning = tempIsRunning;
File logFile = null;
try {
logFile = ContainerLogsUtils.getContainerLogFile(
@@ -351,6 +373,8 @@ public class NMWebServices {
return Response.serverError().entity(ex.getMessage()).build();
}
final long bytes = parseLongParam(size);
+ final String lastModifiedTime = Times.format(logFile.lastModified());
+ final String outputFileName = filename;
String contentType = WebAppUtils.getDefaultLogContentType();
if (format != null && !format.isEmpty()) {
contentType = WebAppUtils.getSupportedLogContentType(format);
@@ -374,39 +398,40 @@ public class NMWebServices {
try {
int bufferSize = 65536;
byte[] buf = new byte[bufferSize];
- long toSkip = 0;
- long totalBytesToRead = fileLength;
- long skipAfterRead = 0;
- if (bytes < 0) {
- long absBytes = Math.abs(bytes);
- if (absBytes < fileLength) {
- toSkip = fileLength - absBytes;
- totalBytesToRead = absBytes;
- }
- org.apache.hadoop.io.IOUtils.skipFully(fis, toSkip);
+ LogToolUtils.outputContainerLog(containerId.toString(),
+ nmContext.getNodeId().toString(), outputFileName, fileLength,
+ bytes, lastModifiedTime, fis, os, buf, ContainerLogType.LOCAL);
+ StringBuilder sb = new StringBuilder();
+ String endOfFile = "End of LogFile:" + outputFileName;
+ sb.append(endOfFile + ".");
+ if (isRunning) {
+ sb.append("This log file belongs to a running container ("
+ + containerIdStr + ") and so may not be complete." + "\n");
} else {
- if (bytes < fileLength) {
- totalBytesToRead = bytes;
- skipAfterRead = fileLength - bytes;
- }
+ sb.append("\n");
}
-
- long curRead = 0;
- long pendingRead = totalBytesToRead - curRead;
- int toRead = pendingRead > buf.length ? buf.length
- : (int) pendingRead;
- int len = fis.read(buf, 0, toRead);
- while (len != -1 && curRead < totalBytesToRead) {
- os.write(buf, 0, len);
- curRead += len;
-
- pendingRead = totalBytesToRead - curRead;
- toRead = pendingRead > buf.length ? buf.length
- : (int) pendingRead;
- len = fis.read(buf, 0, toRead);
+ sb.append(StringUtils.repeat("*", endOfFile.length() + 50)
+ + "\n\n");
+ os.write(sb.toString().getBytes(Charset.forName("UTF-8")));
+ // If we have aggregated logs for this container,
+ // output the aggregation logs as well.
+ ApplicationId appId = containerId.getApplicationAttemptId()
+ .getApplicationId();
+ Application app = nmContext.getApplications().get(appId);
+ String appOwner = app == null ? null : app.getUser();
+ try {
+ LogToolUtils.outputAggregatedContainerLog(nmContext.getConf(),
+ appId, appOwner, containerId.toString(),
+ nmContext.getNodeId().toString(), outputFileName, bytes,
+ os, buf);
+ } catch (Exception ex) {
+ // Something wrong when we try to access the aggregated log.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Can not access the aggregated log for "
+ + "the container:" + containerId);
+ LOG.debug(ex.getMessage());
+ }
}
- org.apache.hadoop.io.IOUtils.skipFully(fis, skipAfterRead);
- os.flush();
} finally {
IOUtils.closeQuietly(fis);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/327c9980/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
index a6d4153..7764ceb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
@@ -384,8 +384,9 @@ public class TestNMWebServices extends JerseyTestBase {
ClientResponse response = r.path(filename)
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
String responseText = response.getEntity(String.class);
- assertEquals(logMessage, responseText);
- int fullTextSize = responseText.getBytes().length;
+ String responseLogMessage = getLogContext(responseText);
+ assertEquals(logMessage, responseLogMessage);
+ int fullTextSize = responseLogMessage.getBytes().length;
// specify how many bytes we should get from logs
// specify a position number, it would get the first n bytes from
@@ -394,9 +395,10 @@ public class TestNMWebServices extends JerseyTestBase {
.queryParam("size", "5")
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
responseText = response.getEntity(String.class);
- assertEquals(5, responseText.getBytes().length);
- assertEquals(new String(logMessage.getBytes(), 0, 5), responseText);
- assertTrue(fullTextSize >= responseText.getBytes().length);
+ responseLogMessage = getLogContext(responseText);
+ assertEquals(5, responseLogMessage.getBytes().length);
+ assertEquals(new String(logMessage.getBytes(), 0, 5), responseLogMessage);
+ assertTrue(fullTextSize >= responseLogMessage.getBytes().length);
// specify the bytes which is larger than the actual file size,
// we would get the full logs
@@ -404,8 +406,9 @@ public class TestNMWebServices extends JerseyTestBase {
.queryParam("size", "10000")
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
responseText = response.getEntity(String.class);
- assertEquals(fullTextSize, responseText.getBytes().length);
- assertEquals(logMessage, responseText);
+ responseLogMessage = getLogContext(responseText);
+ assertEquals(fullTextSize, responseLogMessage.getBytes().length);
+ assertEquals(logMessage, responseLogMessage);
// specify a negative number, it would get the last n bytes from
// container log
@@ -413,25 +416,28 @@ public class TestNMWebServices extends JerseyTestBase {
.queryParam("size", "-5")
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
responseText = response.getEntity(String.class);
- assertEquals(5, responseText.getBytes().length);
+ responseLogMessage = getLogContext(responseText);
+ assertEquals(5, responseLogMessage.getBytes().length);
assertEquals(new String(logMessage.getBytes(),
- logMessage.getBytes().length - 5, 5), responseText);
- assertTrue(fullTextSize >= responseText.getBytes().length);
+ logMessage.getBytes().length - 5, 5), responseLogMessage);
+ assertTrue(fullTextSize >= responseLogMessage.getBytes().length);
response = r.path(filename)
.queryParam("size", "-10000")
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
responseText = response.getEntity(String.class);
+ responseLogMessage = getLogContext(responseText);
assertEquals("text/plain; charset=utf-8", response.getType().toString());
- assertEquals(fullTextSize, responseText.getBytes().length);
- assertEquals(logMessage, responseText);
+ assertEquals(fullTextSize, responseLogMessage.getBytes().length);
+ assertEquals(logMessage, responseLogMessage);
// ask and download it
response = r.path(filename)
.queryParam("format", "octet-stream")
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
responseText = response.getEntity(String.class);
- assertEquals(logMessage, responseText);
+ responseLogMessage = getLogContext(responseText);
+ assertEquals(logMessage, responseLogMessage);
assertEquals(200, response.getStatus());
assertEquals("application/octet-stream; charset=utf-8",
response.getType().toString());
@@ -475,10 +481,11 @@ public class TestNMWebServices extends JerseyTestBase {
TestNMWebServices.class.getSimpleName() + "temp-log-dir");
try {
String aggregatedLogFile = filename + "-aggregated";
+ String aggregatedLogMessage = "This is aggregated ;og.";
TestContainerLogsUtils.createContainerLogFileInRemoteFS(
nmContext.getConf(), FileSystem.get(nmContext.getConf()),
tempLogDir.getAbsolutePath(), containerId, nmContext.getNodeId(),
- aggregatedLogFile, "user", logMessage, true);
+ aggregatedLogFile, "user", aggregatedLogMessage, true);
r1 = resource();
response = r1.path("ws").path("v1").path("node")
.path("containers").path(containerIdStr)
@@ -501,6 +508,21 @@ public class TestNMWebServices extends JerseyTestBase {
assertEquals(meta.get(0).getFileName(), filename);
}
}
+
+ // Test whether we could get aggregated log as well
+ TestContainerLogsUtils.createContainerLogFileInRemoteFS(
+ nmContext.getConf(), FileSystem.get(nmContext.getConf()),
+ tempLogDir.getAbsolutePath(), containerId, nmContext.getNodeId(),
+ filename, "user", aggregatedLogMessage, true);
+ response = r.path(filename)
+ .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
+ responseText = response.getEntity(String.class);
+ assertTrue(responseText.contains("LogType: "
+ + ContainerLogType.AGGREGATED));
+ assertTrue(responseText.contains(aggregatedLogMessage));
+ assertTrue(responseText.contains("LogType: "
+ + ContainerLogType.LOCAL));
+ assertTrue(responseText.contains(logMessage));
} finally {
FileUtil.fullyDelete(tempLogDir);
}
@@ -511,7 +533,7 @@ public class TestNMWebServices extends JerseyTestBase {
r.path(filename).accept(MediaType.TEXT_PLAIN)
.get(ClientResponse.class);
responseText = response.getEntity(String.class);
- assertEquals(logMessage, responseText);
+ assertTrue(responseText.contains(logMessage));
}
public void verifyNodesXML(NodeList nodes) throws JSONException, Exception {
@@ -601,4 +623,11 @@ public class TestNMWebServices extends JerseyTestBase {
YarnVersionInfo.getVersion(), resourceManagerVersion);
}
+ private String getLogContext(String fullMessage) {
+ String prefix = "LogContents:\n";
+ String postfix = "End of LogFile:";
+ int prefixIndex = fullMessage.indexOf(prefix) + prefix.length();
+ int postfixIndex = fullMessage.indexOf(postfix);
+ return fullMessage.substring(prefixIndex, postfixIndex);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org