You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ju...@apache.org on 2017/02/02 08:41:21 UTC

hadoop git commit: YARN-6100. Improve YARN webservice to output aggregated container logs. Contributed by Xuan Gong.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 2a942eed2 -> 327c9980a


YARN-6100. Improve YARN webservice to output aggregated container logs. Contributed by Xuan Gong.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/327c9980
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/327c9980
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/327c9980

Branch: refs/heads/trunk
Commit: 327c9980aafce52cc02d2b8885fc4e9f628ab23c
Parents: 2a942ee
Author: Junping Du <ju...@apache.org>
Authored: Thu Feb 2 00:41:18 2017 -0800
Committer: Junping Du <ju...@apache.org>
Committed: Thu Feb 2 00:41:18 2017 -0800

----------------------------------------------------------------------
 .../apache/hadoop/yarn/client/cli/LogsCLI.java  |  17 --
 .../yarn/logaggregation/LogToolUtils.java       | 158 ++++++++++++++
 .../webapp/AHSWebServices.java                  | 210 ++++---------------
 .../webapp/TestAHSWebServices.java              |  29 ++-
 .../nodemanager/webapp/NMWebServices.java       |  93 +++++---
 .../nodemanager/webapp/TestNMWebServices.java   |  59 ++++--
 6 files changed, 332 insertions(+), 234 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/327c9980/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java
index 1de4cd1..3cb1c7d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java
@@ -64,7 +64,6 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
 import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers;
 import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo;
-import org.apache.hadoop.yarn.util.Times;
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
@@ -509,17 +508,9 @@ public class LogsCLI extends Configured implements Tool {
       newOptions.setLogTypes(matchedFiles);
 
       Client webServiceClient = Client.create();
-      String containerString = String.format(
-          LogCLIHelpers.CONTAINER_ON_NODE_PATTERN, containerIdStr, nodeId);
-      out.println(containerString);
-      out.println(StringUtils.repeat("=", containerString.length()));
       boolean foundAnyLogs = false;
       byte[] buffer = new byte[65536];
       for (String logFile : newOptions.getLogTypes()) {
-        out.println("LogType:" + logFile);
-        out.println("Log Upload Time:"
-            + Times.format(System.currentTimeMillis()));
-        out.println("Log Contents:");
         InputStream is = null;
         try {
           ClientResponse response = getResponeFromNMWebService(conf,
@@ -541,14 +532,6 @@ public class LogsCLI extends Configured implements Tool {
                 response.getEntity(String.class));
             out.println(msg);
           }
-          StringBuilder sb = new StringBuilder();
-          sb.append("End of LogType:" + logFile + ".");
-          if (request.getContainerState() == ContainerState.RUNNING) {
-            sb.append(" This log file belongs"
-                + " to a running container (" + containerIdStr + ") and so may"
-                + " not be complete.");
-          }
-          out.println(sb.toString());
           out.flush();
           foundAnyLogs = true;
         } catch (ClientHandlerException | UniformInterfaceException ex) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/327c9980/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java
index e117736..d83a8ae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java
@@ -20,11 +20,17 @@ package org.apache.hadoop.yarn.logaggregation;
 import java.io.DataInputStream;
 import java.io.EOFException;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.math3.util.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.HarFs;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
@@ -40,6 +46,9 @@ public final class LogToolUtils {
 
   private LogToolUtils() {}
 
+  public static final String CONTAINER_ON_NODE_PATTERN =
+      "Container: %s on %s";
+
   /**
    * Return a list of {@link ContainerLogMeta} for a container
    * from Remote FileSystem.
@@ -114,4 +123,153 @@ public final class LogToolUtils {
     }
     return containersLogMeta;
   }
+
+  /**
+   * Output container log.
+   * @param containerId the containerId
+   * @param nodeId the nodeId
+   * @param fileName the log file name
+   * @param fileLength the log file length
+   * @param outputSize the output size
+   * @param lastModifiedTime the log file last modified time
+   * @param fis the log file input stream
+   * @param os the output stream
+   * @param buf the buffer
+   * @param logType the log type.
+   * @throws IOException if we can not access the log file.
+   */
+  public static void outputContainerLog(String containerId, String nodeId,
+      String fileName, long fileLength, long outputSize,
+      String lastModifiedTime, InputStream fis, OutputStream os,
+      byte[] buf, ContainerLogType logType) throws IOException {
+    long toSkip = 0;
+    long totalBytesToRead = fileLength;
+    long skipAfterRead = 0;
+    if (outputSize < 0) {
+      long absBytes = Math.abs(outputSize);
+      if (absBytes < fileLength) {
+        toSkip = fileLength - absBytes;
+        totalBytesToRead = absBytes;
+      }
+      org.apache.hadoop.io.IOUtils.skipFully(fis, toSkip);
+    } else {
+      if (outputSize < fileLength) {
+        totalBytesToRead = outputSize;
+        skipAfterRead = fileLength - outputSize;
+      }
+    }
+
+    long curRead = 0;
+    long pendingRead = totalBytesToRead - curRead;
+    int toRead = pendingRead > buf.length ? buf.length
+        : (int) pendingRead;
+    int len = fis.read(buf, 0, toRead);
+    boolean keepGoing = (len != -1 && curRead < totalBytesToRead);
+    if (keepGoing) {
+      StringBuilder sb = new StringBuilder();
+      String containerStr = String.format(
+          LogToolUtils.CONTAINER_ON_NODE_PATTERN,
+          containerId, nodeId);
+      sb.append(containerStr + "\n");
+      sb.append("LogType: " + logType + "\n");
+      sb.append(StringUtils.repeat("=", containerStr.length()) + "\n");
+      sb.append("FileName:" + fileName + "\n");
+      sb.append("LogLastModifiedTime:" + lastModifiedTime + "\n");
+      sb.append("LogLength:" + Long.toString(fileLength) + "\n");
+      sb.append("LogContents:\n");
+      byte[] b = sb.toString().getBytes(
+          Charset.forName("UTF-8"));
+      os.write(b, 0, b.length);
+    }
+    while (keepGoing) {
+      os.write(buf, 0, len);
+      curRead += len;
+
+      pendingRead = totalBytesToRead - curRead;
+      toRead = pendingRead > buf.length ? buf.length
+          : (int) pendingRead;
+      len = fis.read(buf, 0, toRead);
+      keepGoing = (len != -1 && curRead < totalBytesToRead);
+    }
+    org.apache.hadoop.io.IOUtils.skipFully(fis, skipAfterRead);
+    os.flush();
+  }
+
+  public static boolean outputAggregatedContainerLog(Configuration conf,
+      ApplicationId appId, String appOwner,
+      String containerId, String nodeId,
+      String logFileName, long outputSize, OutputStream os,
+      byte[] buf) throws IOException {
+    boolean findLogs = false;
+    RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils
+        .getRemoteNodeFileDir(conf, appId, appOwner);
+    while (nodeFiles != null && nodeFiles.hasNext()) {
+      final FileStatus thisNodeFile = nodeFiles.next();
+      String nodeName = thisNodeFile.getPath().getName();
+      if (nodeName.equals(appId + ".har")) {
+        Path p = new Path("har:///"
+            + thisNodeFile.getPath().toUri().getRawPath());
+        nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p);
+        continue;
+      }
+      if ((nodeId == null || nodeName.contains(LogAggregationUtils
+          .getNodeString(nodeId))) && !nodeName.endsWith(
+              LogAggregationUtils.TMP_FILE_SUFFIX)) {
+        AggregatedLogFormat.LogReader reader = null;
+        try {
+          reader = new AggregatedLogFormat.LogReader(conf,
+              thisNodeFile.getPath());
+          DataInputStream valueStream;
+          LogKey key = new LogKey();
+          valueStream = reader.next(key);
+          while (valueStream != null && !key.toString()
+              .equals(containerId)) {
+            // Next container
+            key = new LogKey();
+            valueStream = reader.next(key);
+          }
+          if (valueStream == null) {
+            continue;
+          }
+          while (true) {
+            try {
+              String fileType = valueStream.readUTF();
+              String fileLengthStr = valueStream.readUTF();
+              long fileLength = Long.parseLong(fileLengthStr);
+              if (fileType.equalsIgnoreCase(logFileName)) {
+                LogToolUtils.outputContainerLog(containerId,
+                    nodeId, fileType, fileLength, outputSize,
+                    Times.format(thisNodeFile.getModificationTime()),
+                    valueStream, os, buf, ContainerLogType.AGGREGATED);
+                StringBuilder sb = new StringBuilder();
+                String endOfFile = "End of LogFile:" + fileType;
+                sb.append("\n" + endOfFile + "\n");
+                sb.append(StringUtils.repeat("*", endOfFile.length() + 50)
+                    + "\n\n");
+                byte[] b = sb.toString().getBytes(Charset.forName("UTF-8"));
+                os.write(b, 0, b.length);
+                findLogs = true;
+              } else {
+                long totalSkipped = 0;
+                long currSkipped = 0;
+                while (currSkipped != -1 && totalSkipped < fileLength) {
+                  currSkipped = valueStream.skip(
+                      fileLength - totalSkipped);
+                  totalSkipped += currSkipped;
+                }
+              }
+            } catch (EOFException eof) {
+              break;
+            }
+          }
+        } finally {
+          if (reader != null) {
+            reader.close();
+          }
+        }
+      }
+    }
+    os.flush();
+    return findLogs;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/327c9980/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java
index 9bac474..a10bfac 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.yarn.server.applicationhistoryservice.webapp;
 
-import java.io.DataInputStream;
-import java.io.EOFException;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.charset.Charset;
@@ -43,12 +41,10 @@ import javax.ws.rs.core.Response;
 import javax.ws.rs.core.StreamingOutput;
 import javax.ws.rs.core.Response.ResponseBuilder;
 import javax.ws.rs.core.Response.Status;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.http.JettyUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -56,13 +52,9 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.ApplicationBaseProtocol;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
 import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
 import org.apache.hadoop.yarn.logaggregation.ContainerLogType;
-import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
 import org.apache.hadoop.yarn.logaggregation.LogToolUtils;
-import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
 import org.apache.hadoop.yarn.server.webapp.WebServices;
 import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
 import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptsInfo;
@@ -71,11 +63,11 @@ import org.apache.hadoop.yarn.server.webapp.dao.AppsInfo;
 import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
 import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;
 import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
-import org.apache.hadoop.yarn.util.Times;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 import org.apache.hadoop.yarn.webapp.BadRequestException;
 import org.apache.hadoop.yarn.webapp.NotFoundException;
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
@@ -360,27 +352,27 @@ public class AHSWebServices extends WebServices {
     } catch (Exception ex) {
       // directly find logs from HDFS.
       return sendStreamOutputResponse(appId, null, null, containerIdStr,
-          filename, format, length);
+          filename, format, length, false);
     }
     String appOwner = appInfo.getUser();
+    if (isFinishedState(appInfo.getAppState())) {
+      // directly find logs from HDFS.
+      return sendStreamOutputResponse(appId, appOwner, null, containerIdStr,
+          filename, format, length, false);
+    }
 
-    ContainerInfo containerInfo;
-    try {
-      containerInfo = super.getContainer(
-          req, res, appId.toString(),
-          containerId.getApplicationAttemptId().toString(),
-          containerId.toString());
-    } catch (Exception ex) {
-      if (isFinishedState(appInfo.getAppState())) {
-        // directly find logs from HDFS.
+    if (isRunningState(appInfo.getAppState())) {
+      ContainerInfo containerInfo;
+      try {
+        containerInfo = super.getContainer(
+            req, res, appId.toString(),
+            containerId.getApplicationAttemptId().toString(),
+            containerId.toString());
+      } catch (Exception ex) {
+        // output the aggregated logs
         return sendStreamOutputResponse(appId, appOwner, null, containerIdStr,
-            filename, format, length);
+            filename, format, length, true);
       }
-      return createBadResponse(Status.INTERNAL_SERVER_ERROR,
-          "Can not get ContainerInfo for the container: " + containerId);
-    }
-    String nodeId = containerInfo.getNodeId();
-    if (isRunningState(appInfo.getAppState())) {
       String nodeHttpAddress = containerInfo.getNodeHttpAddress();
       String uri = "/" + containerId.toString() + "/logs/" + filename;
       String resURI = JOINER.join(nodeHttpAddress, NM_DOWNLOAD_URI_STR, uri);
@@ -392,9 +384,6 @@ public class AHSWebServices extends WebServices {
           HttpServletResponse.SC_TEMPORARY_REDIRECT);
       response.header("Location", resURI);
       return response.build();
-    } else if (isFinishedState(appInfo.getAppState())) {
-      return sendStreamOutputResponse(appId, appOwner, nodeId,
-          containerIdStr, filename, format, length);
     } else {
       return createBadResponse(Status.NOT_FOUND,
           "The application is not at Running or Finished State.");
@@ -419,7 +408,8 @@ public class AHSWebServices extends WebServices {
 
   private Response sendStreamOutputResponse(ApplicationId appId,
       String appOwner, String nodeId, String containerIdStr,
-      String fileName, String format, long bytes) {
+      String fileName, String format, long bytes,
+      boolean printEmptyLocalContainerLog) {
     String contentType = WebAppUtils.getDefaultLogContentType();
     if (format != null && !format.isEmpty()) {
       contentType = WebAppUtils.getSupportedLogContentType(format);
@@ -433,15 +423,11 @@ public class AHSWebServices extends WebServices {
     StreamingOutput stream = null;
     try {
       stream = getStreamingOutput(appId, appOwner, nodeId,
-          containerIdStr, fileName, bytes);
+          containerIdStr, fileName, bytes, printEmptyLocalContainerLog);
     } catch (Exception ex) {
       return createBadResponse(Status.INTERNAL_SERVER_ERROR,
           ex.getMessage());
     }
-    if (stream == null) {
-      return createBadResponse(Status.INTERNAL_SERVER_ERROR,
-          "Can not get log for container: " + containerIdStr);
-    }
     ResponseBuilder response = Response.ok(stream);
     response.header("Content-Type", contentType);
     // Sending the X-Content-Type-Options response header with the value
@@ -451,146 +437,30 @@ public class AHSWebServices extends WebServices {
     return response.build();
   }
 
-  private StreamingOutput getStreamingOutput(ApplicationId appId,
-      String appOwner, final String nodeId, final String containerIdStr,
-      final String logFile, final long bytes) throws IOException{
-    String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
-    org.apache.hadoop.fs.Path remoteRootLogDir = new org.apache.hadoop.fs.Path(
-        conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
-            YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
-    org.apache.hadoop.fs.Path qualifiedRemoteRootLogDir =
-        FileContext.getFileContext(conf).makeQualified(remoteRootLogDir);
-    FileContext fc = FileContext.getFileContext(
-        qualifiedRemoteRootLogDir.toUri(), conf);
-    org.apache.hadoop.fs.Path remoteAppDir = null;
-    if (appOwner == null) {
-      org.apache.hadoop.fs.Path toMatch = LogAggregationUtils
-          .getRemoteAppLogDir(remoteRootLogDir, appId, "*", suffix);
-      FileStatus[] matching  = fc.util().globStatus(toMatch);
-      if (matching == null || matching.length != 1) {
-        return null;
-      }
-      remoteAppDir = matching[0].getPath();
-    } else {
-      remoteAppDir = LogAggregationUtils
-          .getRemoteAppLogDir(remoteRootLogDir, appId, appOwner, suffix);
-    }
-    final RemoteIterator<FileStatus> nodeFiles;
-    nodeFiles = fc.listStatus(remoteAppDir);
-    if (!nodeFiles.hasNext()) {
-      return null;
-    }
-
+  private StreamingOutput getStreamingOutput(final ApplicationId appId,
+      final String appOwner, final String nodeId, final String containerIdStr,
+      final String logFile, final long bytes,
+      final boolean printEmptyLocalContainerLog) throws IOException{
     StreamingOutput stream = new StreamingOutput() {
 
       @Override
       public void write(OutputStream os) throws IOException,
           WebApplicationException {
         byte[] buf = new byte[65535];
-        boolean findLogs = false;
-        while (nodeFiles.hasNext()) {
-          final FileStatus thisNodeFile = nodeFiles.next();
-          String nodeName = thisNodeFile.getPath().getName();
-          if ((nodeId == null || nodeName.contains(LogAggregationUtils
-              .getNodeString(nodeId))) && !nodeName.endsWith(
-              LogAggregationUtils.TMP_FILE_SUFFIX)) {
-            AggregatedLogFormat.LogReader reader = null;
-            try {
-              reader = new AggregatedLogFormat.LogReader(conf,
-                  thisNodeFile.getPath());
-              DataInputStream valueStream;
-              LogKey key = new LogKey();
-              valueStream = reader.next(key);
-              while (valueStream != null && !key.toString()
-                  .equals(containerIdStr)) {
-                // Next container
-                key = new LogKey();
-                valueStream = reader.next(key);
-              }
-              if (valueStream == null) {
-                continue;
-              }
-              while (true) {
-                try {
-                  String fileType = valueStream.readUTF();
-                  String fileLengthStr = valueStream.readUTF();
-                  long fileLength = Long.parseLong(fileLengthStr);
-                  if (fileType.equalsIgnoreCase(logFile)) {
-                    StringBuilder sb = new StringBuilder();
-                    sb.append("LogType:");
-                    sb.append(fileType + "\n");
-                    sb.append("Log Upload Time:");
-                    sb.append(Times.format(System.currentTimeMillis()) + "\n");
-                    sb.append("LogLength:");
-                    sb.append(fileLengthStr + "\n");
-                    sb.append("Log Contents:\n");
-                    byte[] b = sb.toString().getBytes(
-                        Charset.forName("UTF-8"));
-                    os.write(b, 0, b.length);
-
-                    long toSkip = 0;
-                    long totalBytesToRead = fileLength;
-                    long skipAfterRead = 0;
-                    if (bytes < 0) {
-                      long absBytes = Math.abs(bytes);
-                      if (absBytes < fileLength) {
-                        toSkip = fileLength - absBytes;
-                        totalBytesToRead = absBytes;
-                      }
-                      org.apache.hadoop.io.IOUtils.skipFully(
-                          valueStream, toSkip);
-                    } else {
-                      if (bytes < fileLength) {
-                        totalBytesToRead = bytes;
-                        skipAfterRead = fileLength - bytes;
-                      }
-                    }
-
-                    long curRead = 0;
-                    long pendingRead = totalBytesToRead - curRead;
-                    int toRead = pendingRead > buf.length ? buf.length
-                        : (int) pendingRead;
-                    int len = valueStream.read(buf, 0, toRead);
-                    while (len != -1 && curRead < totalBytesToRead) {
-                      os.write(buf, 0, len);
-                      curRead += len;
-
-                      pendingRead = totalBytesToRead - curRead;
-                      toRead = pendingRead > buf.length ? buf.length
-                          : (int) pendingRead;
-                      len = valueStream.read(buf, 0, toRead);
-                    }
-                    org.apache.hadoop.io.IOUtils.skipFully(
-                        valueStream, skipAfterRead);
-                    sb = new StringBuilder();
-                    sb.append("\nEnd of LogType:" + fileType + "\n");
-                    b = sb.toString().getBytes(Charset.forName("UTF-8"));
-                    os.write(b, 0, b.length);
-                    findLogs = true;
-                  } else {
-                    long totalSkipped = 0;
-                    long currSkipped = 0;
-                    while (currSkipped != -1 && totalSkipped < fileLength) {
-                      currSkipped = valueStream.skip(
-                          fileLength - totalSkipped);
-                      totalSkipped += currSkipped;
-                    }
-                  }
-                } catch (EOFException eof) {
-                  break;
-                }
-              }
-            } finally {
-              if (reader != null) {
-                reader.close();
-              }
-            }
-          }
-        }
-        os.flush();
+        boolean findLogs = LogToolUtils.outputAggregatedContainerLog(conf,
+            appId, appOwner, containerIdStr, nodeId, logFile, bytes, os, buf);
         if (!findLogs) {
           throw new IOException("Can not find logs for container:"
               + containerIdStr);
+        } else {
+          if (printEmptyLocalContainerLog) {
+            StringBuilder sb = new StringBuilder();
+            sb.append(containerIdStr + "\n");
+            sb.append("LogType: " + ContainerLogType.LOCAL + "\n");
+            sb.append("LogContents:\n");
+            sb.append(getNoRedirectWarning() + "\n");
+            os.write(sb.toString().getBytes(Charset.forName("UTF-8")));
+          }
         }
       }
     };
@@ -640,4 +510,12 @@ public class AHSWebServices extends WebServices {
       throw new WebApplicationException(ex);
     }
   }
+
+  @Private
+  @VisibleForTesting
+  public static String getNoRedirectWarning() {
+    return "We do not have NodeManager web address, so we can not "
+        + "re-direct the request to related NodeManager "
+        + "for local container logs.";
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/327c9980/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java
index f553bb0..3d1c901 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java
@@ -35,7 +35,7 @@ import javax.servlet.FilterConfig;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.core.MediaType;
-
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -584,7 +584,10 @@ public class TestAHSWebServices extends JerseyTestBase {
     responseText = response.getEntity(String.class);
     assertTrue(responseText.contains("Hello." + containerId1ForApp100));
     int fullTextSize = responseText.getBytes().length;
-    int tailTextSize = "\nEnd of LogType:syslog\n".getBytes().length;
+    String tailEndSeparator = StringUtils.repeat("*",
+        "End of LogFile:syslog".length() + 50) + "\n\n";
+    int tailTextSize = "\nEnd of LogFile:syslog\n".getBytes().length
+        + tailEndSeparator.getBytes().length;
 
     String logMessage = "Hello." + containerId1ForApp100;
     int fileContentSize = logMessage.getBytes().length;
@@ -685,6 +688,28 @@ public class TestAHSWebServices extends JerseyTestBase {
     assertTrue(redirectURL.contains(containerId1.toString()));
     assertTrue(redirectURL.contains("/logs/" + fileName));
     assertTrue(redirectURL.contains("user.name=" + user));
+
+    // If we can not container information from ATS, we would try to
+    // get aggregated log from remote FileSystem.
+    ContainerId containerId1000 = ContainerId.newContainerId(
+        appAttemptId, 1000);
+    String content = "Hello." + containerId1000;
+    NodeId nodeId = NodeId.newInstance("test host", 100);
+    TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs,
+        rootLogDir, containerId1000, nodeId, fileName, user, content, true);
+    r = resource();
+    ClientResponse response = r.path("ws").path("v1")
+        .path("applicationhistory").path("containerlogs")
+        .path(containerId1000.toString()).path(fileName)
+        .queryParam("user.name", user)
+        .accept(MediaType.TEXT_PLAIN)
+        .get(ClientResponse.class);
+    String responseText = response.getEntity(String.class);
+    assertTrue(responseText.contains(content));
+    // Also test whether we output the empty local container log, and give
+    // the warning message.
+    assertTrue(responseText.contains("LogType: " + ContainerLogType.LOCAL));
+    assertTrue(responseText.contains(AHSWebServices.getNoRedirectWarning()));
   }
 
   @Test(timeout = 10000)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/327c9980/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java
index 1357d5a..07acd4b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map.Entry;
@@ -41,6 +42,9 @@ import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.StreamingOutput;
 import javax.ws.rs.core.UriInfo;
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.http.JettyUtils;
@@ -57,6 +61,7 @@ import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
 import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.AppInfo;
 import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.AppsInfo;
 import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.ContainerInfo;
@@ -64,6 +69,7 @@ import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.NMContainerLogsInfo;
 import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.ContainersInfo;
 import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.NodeInfo;
 import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;
+import org.apache.hadoop.yarn.util.Times;
 import org.apache.hadoop.yarn.webapp.BadRequestException;
 import org.apache.hadoop.yarn.webapp.NotFoundException;
 import org.apache.hadoop.yarn.webapp.WebApp;
@@ -74,6 +80,7 @@ import com.google.inject.Singleton;
 @Singleton
 @Path("/ws/v1/node")
 public class NMWebServices {
+  private static final Log LOG = LogFactory.getLog(NMWebServices.class);
   private Context nmContext;
   private ResourceView rview;
   private WebApp webapp;
@@ -330,17 +337,32 @@ public class NMWebServices {
   @Produces({ MediaType.TEXT_PLAIN + "; " + JettyUtils.UTF_8 })
   @Public
   @Unstable
-  public Response getLogs(@PathParam("containerid") String containerIdStr,
+  public Response getLogs(
+      @PathParam("containerid") final String containerIdStr,
       @PathParam("filename") String filename,
       @QueryParam("format") String format,
       @QueryParam("size") String size) {
-    ContainerId containerId;
+    ContainerId tempContainerId;
     try {
-      containerId = ContainerId.fromString(containerIdStr);
+      tempContainerId = ContainerId.fromString(containerIdStr);
     } catch (IllegalArgumentException ex) {
       return Response.status(Status.BAD_REQUEST).build();
     }
-    
+    final ContainerId containerId = tempContainerId;
+    boolean tempIsRunning = false;
+    // check what is the status for container
+    try {
+      Container container = nmContext.getContainers().get(containerId);
+      tempIsRunning = (container.getContainerState() == ContainerState.RUNNING);
+    } catch (Exception ex) {
+      // This NM does not have this container any more. We
+      // assume the container has already finished.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Can not find the container:" + containerId
+            + " in this node.");
+      }
+    }
+    final boolean isRunning = tempIsRunning;
     File logFile = null;
     try {
       logFile = ContainerLogsUtils.getContainerLogFile(
@@ -351,6 +373,8 @@ public class NMWebServices {
       return Response.serverError().entity(ex.getMessage()).build();
     }
     final long bytes = parseLongParam(size);
+    final String lastModifiedTime = Times.format(logFile.lastModified());
+    final String outputFileName = filename;
     String contentType = WebAppUtils.getDefaultLogContentType();
     if (format != null && !format.isEmpty()) {
       contentType = WebAppUtils.getSupportedLogContentType(format);
@@ -374,39 +398,40 @@ public class NMWebServices {
           try {
             int bufferSize = 65536;
             byte[] buf = new byte[bufferSize];
-            long toSkip = 0;
-            long totalBytesToRead = fileLength;
-            long skipAfterRead = 0;
-            if (bytes < 0) {
-              long absBytes = Math.abs(bytes);
-              if (absBytes < fileLength) {
-                toSkip = fileLength - absBytes;
-                totalBytesToRead = absBytes;
-              }
-              org.apache.hadoop.io.IOUtils.skipFully(fis, toSkip);
+            LogToolUtils.outputContainerLog(containerId.toString(),
+                nmContext.getNodeId().toString(), outputFileName, fileLength,
+                bytes, lastModifiedTime, fis, os, buf, ContainerLogType.LOCAL);
+            StringBuilder sb = new StringBuilder();
+            String endOfFile = "End of LogFile:" + outputFileName;
+            sb.append(endOfFile + ".");
+            if (isRunning) {
+              sb.append("This log file belongs to a running container ("
+                  + containerIdStr + ") and so may not be complete." + "\n");
             } else {
-              if (bytes < fileLength) {
-                totalBytesToRead = bytes;
-                skipAfterRead = fileLength - bytes;
-              }
+              sb.append("\n");
             }
-
-            long curRead = 0;
-            long pendingRead = totalBytesToRead - curRead;
-            int toRead = pendingRead > buf.length ? buf.length
-                : (int) pendingRead;
-            int len = fis.read(buf, 0, toRead);
-            while (len != -1 && curRead < totalBytesToRead) {
-              os.write(buf, 0, len);
-              curRead += len;
-
-              pendingRead = totalBytesToRead - curRead;
-              toRead = pendingRead > buf.length ? buf.length
-                  : (int) pendingRead;
-              len = fis.read(buf, 0, toRead);
+            sb.append(StringUtils.repeat("*", endOfFile.length() + 50)
+                + "\n\n");
+            os.write(sb.toString().getBytes(Charset.forName("UTF-8")));
+            // If we have aggregated logs for this container,
+            // output the aggregation logs as well.
+            ApplicationId appId = containerId.getApplicationAttemptId()
+                .getApplicationId();
+            Application app = nmContext.getApplications().get(appId);
+            String appOwner = app == null ? null : app.getUser();
+            try {
+              LogToolUtils.outputAggregatedContainerLog(nmContext.getConf(),
+                  appId, appOwner, containerId.toString(),
+                  nmContext.getNodeId().toString(), outputFileName, bytes,
+                  os, buf);
+            } catch (Exception ex) {
+              // Something wrong when we try to access the aggregated log.
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Can not access the aggregated log for "
+                    + "the container:" + containerId);
+                LOG.debug(ex.getMessage());
+              }
             }
-            org.apache.hadoop.io.IOUtils.skipFully(fis, skipAfterRead);
-            os.flush();
           } finally {
             IOUtils.closeQuietly(fis);
           }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/327c9980/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
index a6d4153..7764ceb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
@@ -384,8 +384,9 @@ public class TestNMWebServices extends JerseyTestBase {
     ClientResponse response = r.path(filename)
         .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
     String responseText = response.getEntity(String.class);
-    assertEquals(logMessage, responseText);
-    int fullTextSize = responseText.getBytes().length;
+    String responseLogMessage = getLogContext(responseText);
+    assertEquals(logMessage, responseLogMessage);
+    int fullTextSize = responseLogMessage.getBytes().length;
 
     // specify how many bytes we should get from logs
     // specify a position number, it would get the first n bytes from
@@ -394,9 +395,10 @@ public class TestNMWebServices extends JerseyTestBase {
         .queryParam("size", "5")
         .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
     responseText = response.getEntity(String.class);
-    assertEquals(5, responseText.getBytes().length);
-    assertEquals(new String(logMessage.getBytes(), 0, 5), responseText);
-    assertTrue(fullTextSize >= responseText.getBytes().length);
+    responseLogMessage = getLogContext(responseText);
+    assertEquals(5, responseLogMessage.getBytes().length);
+    assertEquals(new String(logMessage.getBytes(), 0, 5), responseLogMessage);
+    assertTrue(fullTextSize >= responseLogMessage.getBytes().length);
 
     // specify the bytes which is larger than the actual file size,
     // we would get the full logs
@@ -404,8 +406,9 @@ public class TestNMWebServices extends JerseyTestBase {
         .queryParam("size", "10000")
         .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
     responseText = response.getEntity(String.class);
-    assertEquals(fullTextSize, responseText.getBytes().length);
-    assertEquals(logMessage, responseText);
+    responseLogMessage = getLogContext(responseText);
+    assertEquals(fullTextSize, responseLogMessage.getBytes().length);
+    assertEquals(logMessage, responseLogMessage);
 
     // specify a negative number, it would get the last n bytes from
     // container log
@@ -413,25 +416,28 @@ public class TestNMWebServices extends JerseyTestBase {
         .queryParam("size", "-5")
         .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
     responseText = response.getEntity(String.class);
-    assertEquals(5, responseText.getBytes().length);
+    responseLogMessage = getLogContext(responseText);
+    assertEquals(5, responseLogMessage.getBytes().length);
     assertEquals(new String(logMessage.getBytes(),
-        logMessage.getBytes().length - 5, 5), responseText);
-    assertTrue(fullTextSize >= responseText.getBytes().length);
+        logMessage.getBytes().length - 5, 5), responseLogMessage);
+    assertTrue(fullTextSize >= responseLogMessage.getBytes().length);
 
     response = r.path(filename)
         .queryParam("size", "-10000")
         .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
     responseText = response.getEntity(String.class);
+    responseLogMessage = getLogContext(responseText);
     assertEquals("text/plain; charset=utf-8", response.getType().toString());
-    assertEquals(fullTextSize, responseText.getBytes().length);
-    assertEquals(logMessage, responseText);
+    assertEquals(fullTextSize, responseLogMessage.getBytes().length);
+    assertEquals(logMessage, responseLogMessage);
 
     // ask and download it
     response = r.path(filename)
         .queryParam("format", "octet-stream")
         .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
     responseText = response.getEntity(String.class);
-    assertEquals(logMessage, responseText);
+    responseLogMessage = getLogContext(responseText);
+    assertEquals(logMessage, responseLogMessage);
     assertEquals(200, response.getStatus());
     assertEquals("application/octet-stream; charset=utf-8",
         response.getType().toString());
@@ -475,10 +481,11 @@ public class TestNMWebServices extends JerseyTestBase {
         TestNMWebServices.class.getSimpleName() + "temp-log-dir");
     try {
       String aggregatedLogFile = filename + "-aggregated";
+      String aggregatedLogMessage = "This is aggregated ;og.";
       TestContainerLogsUtils.createContainerLogFileInRemoteFS(
           nmContext.getConf(), FileSystem.get(nmContext.getConf()),
           tempLogDir.getAbsolutePath(), containerId, nmContext.getNodeId(),
-          aggregatedLogFile, "user", logMessage, true);
+          aggregatedLogFile, "user", aggregatedLogMessage, true);
       r1 = resource();
       response = r1.path("ws").path("v1").path("node")
           .path("containers").path(containerIdStr)
@@ -501,6 +508,21 @@ public class TestNMWebServices extends JerseyTestBase {
           assertEquals(meta.get(0).getFileName(), filename);
         }
       }
+
+      // Test whether we could get aggregated log as well
+      TestContainerLogsUtils.createContainerLogFileInRemoteFS(
+          nmContext.getConf(), FileSystem.get(nmContext.getConf()),
+          tempLogDir.getAbsolutePath(), containerId, nmContext.getNodeId(),
+          filename, "user", aggregatedLogMessage, true);
+      response = r.path(filename)
+          .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
+      responseText = response.getEntity(String.class);
+      assertTrue(responseText.contains("LogType: "
+          + ContainerLogType.AGGREGATED));
+      assertTrue(responseText.contains(aggregatedLogMessage));
+      assertTrue(responseText.contains("LogType: "
+              + ContainerLogType.LOCAL));
+      assertTrue(responseText.contains(logMessage));
     } finally {
       FileUtil.fullyDelete(tempLogDir);
     }
@@ -511,7 +533,7 @@ public class TestNMWebServices extends JerseyTestBase {
         r.path(filename).accept(MediaType.TEXT_PLAIN)
             .get(ClientResponse.class);
     responseText = response.getEntity(String.class);
-    assertEquals(logMessage, responseText);
+    assertTrue(responseText.contains(logMessage));
   }
 
   public void verifyNodesXML(NodeList nodes) throws JSONException, Exception {
@@ -601,4 +623,11 @@ public class TestNMWebServices extends JerseyTestBase {
         YarnVersionInfo.getVersion(), resourceManagerVersion);
   }
 
+  private String getLogContext(String fullMessage) {
+    String prefix = "LogContents:\n";
+    String postfix = "End of LogFile:";
+    int prefixIndex = fullMessage.indexOf(prefix) + prefix.length();
+    int postfixIndex = fullMessage.indexOf(postfix);
+    return fullMessage.substring(prefixIndex, postfixIndex);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org