You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2014/11/04 13:08:52 UTC

[3/9] git commit: YARN-2788. Fixed backwards compatibility issues with log-aggregation feature that were caused when adding log-upload-time via YARN-2703. Contributed by Xuan Gong.

YARN-2788. Fixed backwards compatibility issues with log-aggregation feature that were caused when adding log-upload-time via YARN-2703. Contributed by Xuan Gong.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/58e9f24e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/58e9f24e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/58e9f24e

Branch: refs/heads/HDFS-EC
Commit: 58e9f24e0f06efede21085b7ffe36af042fa7b38
Parents: 71fbb47
Author: Vinod Kumar Vavilapalli <vi...@apache.org>
Authored: Mon Nov 3 13:16:29 2014 -0800
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Mon Nov 3 13:16:29 2014 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   4 +
 .../hadoop/yarn/client/cli/TestLogsCLI.java     |  58 ++++++++-
 .../logaggregation/AggregatedLogFormat.java     | 124 ++++++++++---------
 .../yarn/logaggregation/LogCLIHelpers.java      |  20 ++-
 .../yarn/webapp/log/AggregatedLogsBlock.java    |   8 +-
 .../logaggregation/TestAggregatedLogFormat.java |  26 +++-
 .../TestLogAggregationService.java              |  15 +--
 7 files changed, 172 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/58e9f24e/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index c3ee0b0..9566458 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -845,6 +845,10 @@ Release 2.6.0 - UNRELEASED
     YARN-2798. Fixed YarnClient to populate the renewer correctly for Timeline
     delegation tokens. (Zhijie Shen via vinodkv)
 
+    YARN-2788. Fixed backwards compatibility issues with log-aggregation feature
+    that were caused when adding log-upload-time via YARN-2703. (Xuan Gong via
+    vinodkv)
+
 Release 2.5.1 - 2014-09-05
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58e9f24e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java
index 2e9e92d..5ed8398 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 
 import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
@@ -33,6 +34,7 @@ import java.io.PrintWriter;
 import java.io.Writer;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
@@ -170,9 +172,9 @@ public class TestLogsCLI {
     ApplicationId appId = ApplicationIdPBImpl.newInstance(0, 1);
     ApplicationAttemptId appAttemptId =
         ApplicationAttemptIdPBImpl.newInstance(appId, 1);
+    ContainerId containerId0 = ContainerIdPBImpl.newInstance(appAttemptId, 0);
     ContainerId containerId1 = ContainerIdPBImpl.newInstance(appAttemptId, 1);
     ContainerId containerId2 = ContainerIdPBImpl.newInstance(appAttemptId, 2);
-
     NodeId nodeId = NodeId.newInstance("localhost", 1234);
 
     // create local logs
@@ -201,7 +203,15 @@ public class TestLogsCLI {
       fs.delete(path, true);
     }
     assertTrue(fs.mkdirs(path));
+
     // upload container logs into remote directory
+    // The first two logs are empty. When we try to read the first two logs,
+    // we will hit an EOFException, but it will not affect the other logs.
+    // The other logs should be read successfully.
+    uploadEmptyContainerLogIntoRemoteDir(ugi, configuration, rootLogDirs, nodeId,
+      containerId0, path, fs);
+    uploadEmptyContainerLogIntoRemoteDir(ugi, configuration, rootLogDirs, nodeId,
+      containerId1, path, fs);
     uploadContainerLogIntoRemoteDir(ugi, configuration, rootLogDirs, nodeId,
       containerId1, path, fs);
     uploadContainerLogIntoRemoteDir(ugi, configuration, rootLogDirs, nodeId,
@@ -220,6 +230,9 @@ public class TestLogsCLI {
       "Hello container_0_0001_01_000002!"));
     sysOutStream.reset();
 
+    // uploaded two logs for container1. The first log is empty.
+    // The second one is not empty.
+    // We can still successfully read logs for container1.
     exitCode =
         cli.run(new String[] { "-applicationId", appId.toString(),
             "-nodeAddress", nodeId.toString(), "-containerId",
@@ -227,7 +240,23 @@ public class TestLogsCLI {
     assertTrue(exitCode == 0);
     assertTrue(sysOutStream.toString().contains(
         "Hello container_0_0001_01_000001!"));
-    assertTrue(sysOutStream.toString().contains("LogUploadTime"));
+    assertTrue(sysOutStream.toString().contains("Log Upload Time"));
+    assertTrue(!sysOutStream.toString().contains(
+      "Logs for container " + containerId1.toString()
+          + " are not present in this log-file."));
+    sysOutStream.reset();
+
+    // Uploaded the empty log for container0.
+    // We should see the message showing that the logs for container0
+    // are not present.
+    exitCode =
+        cli.run(new String[] { "-applicationId", appId.toString(),
+            "-nodeAddress", nodeId.toString(), "-containerId",
+            containerId0.toString() });
+    assertTrue(exitCode == -1);
+    assertTrue(sysOutStream.toString().contains(
+      "Logs for container " + containerId0.toString()
+          + " are not present in this log-file."));
 
     fs.delete(new Path(remoteLogRootDir), true);
     fs.delete(new Path(rootLogDir), true);
@@ -266,6 +295,31 @@ public class TestLogsCLI {
     writer.close();
   }
 
+  private static void uploadEmptyContainerLogIntoRemoteDir(UserGroupInformation ugi,
+      Configuration configuration, List<String> rootLogDirs, NodeId nodeId,
+      ContainerId containerId, Path appDir, FileSystem fs) throws Exception {
+    Path path =
+        new Path(appDir, LogAggregationUtils.getNodeString(nodeId)
+            + System.currentTimeMillis());
+    AggregatedLogFormat.LogWriter writer =
+        new AggregatedLogFormat.LogWriter(configuration, path, ugi);
+    writer.writeApplicationOwner(ugi.getUserName());
+
+    Map<ApplicationAccessType, String> appAcls =
+        new HashMap<ApplicationAccessType, String>();
+    appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
+    writer.writeApplicationACLs(appAcls);
+    DataOutputStream out = writer.getWriter().prepareAppendKey(-1);
+    new AggregatedLogFormat.LogKey(containerId).write(out);
+    out.close();
+    out = writer.getWriter().prepareAppendValue(-1);
+    new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
+      UserGroupInformation.getCurrentUser().getShortUserName()).write(out,
+      new HashSet<File>());
+    out.close();
+    writer.close();
+  }
+
   private YarnClient createMockYarnClient(YarnApplicationState appState)
       throws YarnException, IOException {
     YarnClient mockClient = mock(YarnClient.class);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58e9f24e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
index 22219be..a434ef5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
@@ -27,6 +27,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.OutputStream;
 import java.io.PrintStream;
 import java.io.Writer;
 import java.security.PrivilegedExceptionAction;
@@ -44,6 +45,7 @@ import java.util.Set;
 import java.util.regex.Pattern;
 
 import org.apache.commons.io.input.BoundedInputStream;
+import org.apache.commons.io.output.WriterOutputStream;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -233,9 +235,6 @@ public class AggregatedLogFormat {
         // Write the logFile Type
         out.writeUTF(logFile.getName());
 
-        // Write the uploaded TimeStamp
-        out.writeLong(System.currentTimeMillis());
-
         // Write the log length as UTF so that it is printable
         out.writeUTF(String.valueOf(fileLength));
 
@@ -400,6 +399,11 @@ public class AggregatedLogFormat {
       writeVersion();
     }
 
+    @VisibleForTesting
+    public TFile.Writer getWriter() {
+      return this.writer;
+    }
+
     private void writeVersion() throws IOException {
       DataOutputStream out = this.writer.prepareAppendKey(-1);
       VERSION_KEY.write(out);
@@ -639,70 +643,55 @@ public class AggregatedLogFormat {
      * Writes all logs for a single container to the provided writer.
      * @param valueStream
      * @param writer
+     * @param logUploadedTime
      * @throws IOException
      */
     public static void readAcontainerLogs(DataInputStream valueStream,
-        Writer writer) throws IOException {
-      int bufferSize = 65536;
-      char[] cbuf = new char[bufferSize];
-      String fileType;
-      long uploadTime;
-      String fileLengthStr;
-      long fileLength;
-
-      while (true) {
-        try {
-          fileType = valueStream.readUTF();
-        } catch (EOFException e) {
-          // EndOfFile
-          return;
-        }
-        uploadTime = valueStream.readLong();
-        fileLengthStr = valueStream.readUTF();
-        fileLength = Long.parseLong(fileLengthStr);
-        writer.write("\n\nLogType:");
-        writer.write(fileType);
-        writer.write("\nLogUploadTime:");
-        writer.write(String.valueOf(uploadTime));
-        writer.write("\nLogLength:");
-        writer.write(fileLengthStr);
-        writer.write("\nLog Contents:\n");
-        // ByteLevel
-        BoundedInputStream bis =
-            new BoundedInputStream(valueStream, fileLength);
-        InputStreamReader reader = new InputStreamReader(bis);
-        int currentRead = 0;
-        int totalRead = 0;
-        while ((currentRead = reader.read(cbuf, 0, bufferSize)) != -1) {
-          writer.write(cbuf, 0, currentRead);
-          totalRead += currentRead;
+        Writer writer, long logUploadedTime) throws IOException {
+      OutputStream os = null;
+      PrintStream ps = null;
+      try {
+        os = new WriterOutputStream(writer);
+        ps = new PrintStream(os);
+        while (true) {
+          try {
+            readContainerLogs(valueStream, ps, logUploadedTime);
+          } catch (EOFException e) {
+            // EndOfFile
+            return;
+          }
         }
+      } finally {
+        IOUtils.cleanup(LOG, ps);
+        IOUtils.cleanup(LOG, os);
       }
     }
 
     /**
-     * Keep calling this till you get a {@link EOFException} for getting logs of
-     * all types for a single container.
-     * 
+     * Writes all logs for a single container to the provided writer.
      * @param valueStream
-     * @param out
+     * @param writer
      * @throws IOException
      */
-    public static void readAContainerLogsForALogType(
-        DataInputStream valueStream, PrintStream out)
-          throws IOException {
+    public static void readAcontainerLogs(DataInputStream valueStream,
+        Writer writer) throws IOException {
+      readAcontainerLogs(valueStream, writer, -1);
+    }
 
+    private static void readContainerLogs(DataInputStream valueStream,
+        PrintStream out, long logUploadedTime) throws IOException {
       byte[] buf = new byte[65535];
 
       String fileType = valueStream.readUTF();
-      long uploadTime = valueStream.readLong();
       String fileLengthStr = valueStream.readUTF();
       long fileLength = Long.parseLong(fileLengthStr);
-      out.print("LogType: ");
+      out.print("LogType:");
       out.println(fileType);
-      out.print("LogUploadTime: ");
-      out.println(Times.format(uploadTime));
-      out.print("LogLength: ");
+      if (logUploadedTime != -1) {
+        out.print("Log Upload Time:");
+        out.println(Times.format(logUploadedTime));
+      }
+      out.print("LogLength:");
       out.println(fileLengthStr);
       out.println("Log Contents:");
 
@@ -723,6 +712,35 @@ public class AggregatedLogFormat {
       out.println("");
     }
 
+    /**
+     * Keep calling this till you get a {@link EOFException} for getting logs of
+     * all types for a single container.
+     * 
+     * @param valueStream
+     * @param out
+     * @param logUploadedTime
+     * @throws IOException
+     */
+    public static void readAContainerLogsForALogType(
+        DataInputStream valueStream, PrintStream out, long logUploadedTime)
+          throws IOException {
+      readContainerLogs(valueStream, out, logUploadedTime);
+    }
+
+    /**
+     * Keep calling this till you get a {@link EOFException} for getting logs of
+     * all types for a single container.
+     * 
+     * @param valueStream
+     * @param out
+     * @throws IOException
+     */
+    public static void readAContainerLogsForALogType(
+        DataInputStream valueStream, PrintStream out)
+          throws IOException {
+      readAContainerLogsForALogType(valueStream, out, -1);
+    }
+
     public void close() {
       IOUtils.cleanup(LOG, scanner, reader, fsDataIStream);
     }
@@ -732,7 +750,6 @@ public class AggregatedLogFormat {
   public static class ContainerLogsReader {
     private DataInputStream valueStream;
     private String currentLogType = null;
-    private long currentLogUpLoadTime = 0;
     private long currentLogLength = 0;
     private BoundedInputStream currentLogData = null;
     private InputStreamReader currentLogISR;
@@ -753,14 +770,12 @@ public class AggregatedLogFormat {
       }
 
       currentLogType = null;
-      currentLogUpLoadTime = 0;
       currentLogLength = 0;
       currentLogData = null;
       currentLogISR = null;
 
       try {
         String logType = valueStream.readUTF();
-        long logUpLoadTime = valueStream.readLong();
         String logLengthStr = valueStream.readUTF();
         currentLogLength = Long.parseLong(logLengthStr);
         currentLogData =
@@ -768,7 +783,6 @@ public class AggregatedLogFormat {
         currentLogData.setPropagateClose(false);
         currentLogISR = new InputStreamReader(currentLogData);
         currentLogType = logType;
-        currentLogUpLoadTime = logUpLoadTime;
       } catch (EOFException e) {
       }
 
@@ -779,10 +793,6 @@ public class AggregatedLogFormat {
       return currentLogType;
     }
 
-    public long getCurrentLogUpLoadTime() {
-      return currentLogUpLoadTime;
-    }
-
     public long getCurrentLogLength() {
       return currentLogLength;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58e9f24e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java
index de06d48..1546ece 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java
@@ -78,7 +78,8 @@ public class LogCLIHelpers implements Configurable {
           reader =
               new AggregatedLogFormat.LogReader(getConf(),
                 thisNodeFile.getPath());
-          if (dumpAContainerLogs(containerId, reader, System.out) > -1) {
+          if (dumpAContainerLogs(containerId, reader, System.out,
+              thisNodeFile.getModificationTime()) > -1) {
             foundContainerLogs = true;
           }
         } finally {
@@ -97,7 +98,8 @@ public class LogCLIHelpers implements Configurable {
 
   @Private
   public int dumpAContainerLogs(String containerIdStr,
-      AggregatedLogFormat.LogReader reader, PrintStream out) throws IOException {
+      AggregatedLogFormat.LogReader reader, PrintStream out,
+      long logUploadedTime) throws IOException {
     DataInputStream valueStream;
     LogKey key = new LogKey();
     valueStream = reader.next(key);
@@ -112,14 +114,20 @@ public class LogCLIHelpers implements Configurable {
       return -1;
     }
 
+    boolean foundContainerLogs = false;
     while (true) {
       try {
-        LogReader.readAContainerLogsForALogType(valueStream, out);
+        LogReader.readAContainerLogsForALogType(valueStream, out,
+          logUploadedTime);
+        foundContainerLogs = true;
       } catch (EOFException eof) {
         break;
       }
     }
-    return 0;
+    if (foundContainerLogs) {
+      return 0;
+    }
+    return -1;
   }
 
   @Private
@@ -157,13 +165,15 @@ public class LogCLIHelpers implements Configurable {
           valueStream = reader.next(key);
 
           while (valueStream != null) {
+
             String containerString =
                 "\n\nContainer: " + key + " on " + thisNodeFile.getPath().getName();
             out.println(containerString);
             out.println(StringUtils.repeat("=", containerString.length()));
             while (true) {
               try {
-                LogReader.readAContainerLogsForALogType(valueStream, out);
+                LogReader.readAContainerLogsForALogType(valueStream, out,
+                  thisNodeFile.getModificationTime());
                 foundAnyLogs = true;
               } catch (EOFException eof) {
                 break;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58e9f24e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java
index bba3258..3e9f7a2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java
@@ -126,6 +126,7 @@ public class AggregatedLogsBlock extends HtmlBlock {
                 .endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
             continue;
           }
+          long logUploadedTime = thisNodeFile.getModificationTime();
           reader =
               new AggregatedLogFormat.LogReader(conf, thisNodeFile.getPath());
 
@@ -164,7 +165,7 @@ public class AggregatedLogsBlock extends HtmlBlock {
           }
 
           foundLog = readContainerLogs(html, logReader, logLimits,
-              desiredLogType);
+              desiredLogType, logUploadedTime);
         } catch (IOException ex) {
           LOG.error("Error getting logs for " + logEntity, ex);
           continue;
@@ -189,7 +190,7 @@ public class AggregatedLogsBlock extends HtmlBlock {
 
   private boolean readContainerLogs(Block html,
       AggregatedLogFormat.ContainerLogsReader logReader, LogLimits logLimits,
-      String desiredLogType) throws IOException {
+      String desiredLogType, long logUpLoadTime) throws IOException {
     int bufferSize = 65536;
     char[] cbuf = new char[bufferSize];
 
@@ -199,13 +200,12 @@ public class AggregatedLogsBlock extends HtmlBlock {
       if (desiredLogType == null || desiredLogType.isEmpty()
           || desiredLogType.equals(logType)) {
         long logLength = logReader.getCurrentLogLength();
-        long logUpLoadTime = logReader.getCurrentLogUpLoadTime();
         if (foundLog) {
           html.pre()._("\n\n")._();
         }
 
         html.p()._("Log Type: " + logType)._();
-        html.p()._("Log UpLoadTime: " + Times.format(logUpLoadTime))._();
+        html.p()._("Log Upload Time: " + Times.format(logUpLoadTime))._();
         html.p()._("Log Length: " + Long.toString(logLength))._();
 
         long start = logLimits.start < 0

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58e9f24e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java
index bc0485e..405cb3d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
+import org.apache.hadoop.yarn.util.Times;
 import org.junit.After;
 import org.junit.Assume;
 import org.junit.Before;
@@ -178,9 +179,16 @@ public class TestAggregatedLogFormat {
     logWriter.close();
   }
 
-  //Verify the output generated by readAContainerLogs(DataInputStream, Writer)
   @Test
   public void testReadAcontainerLogs1() throws Exception {
+    //Verify the output generated by readAContainerLogs(DataInputStream, Writer, logUploadedTime)
+    testReadAcontainerLog(true);
+
+    //Verify the output generated by readAContainerLogs(DataInputStream, Writer)
+    testReadAcontainerLog(false);
+  }
+
+  private void testReadAcontainerLog(boolean logUploadedTime) throws Exception {
     Configuration conf = new Configuration();
     File workDir = new File(testWorkDir, "testReadAcontainerLogs1");
     Path remoteAppLogFile =
@@ -233,17 +241,23 @@ public class TestAggregatedLogFormat {
     LogKey rLogKey = new LogKey();
     DataInputStream dis = logReader.next(rLogKey);
     Writer writer = new StringWriter();
-    LogReader.readAcontainerLogs(dis, writer);
-    
+
+    if (logUploadedTime) {
+      LogReader.readAcontainerLogs(dis, writer, System.currentTimeMillis());
+    } else {
+      LogReader.readAcontainerLogs(dis, writer);
+    }
+
     // We should only do the log aggregation for stdout.
     // Since we could not open the fileInputStream for stderr, this file is not
     // aggregated.
     String s = writer.toString();
     int expectedLength =
-        "\n\nLogType:stdout".length()
-            + ("\nLogUploadTime:" + System.currentTimeMillis()).length()
+        "LogType:stdout".length()
+            + (logUploadedTime ? ("\nLog Upload Time:" + Times.format(System
+              .currentTimeMillis())).length() : 0)
             + ("\nLogLength:" + numChars).length()
-            + "\nLog Contents:\n".length() + numChars;
+            + "\nLog Contents:\n".length() + numChars + "\n".length();
     Assert.assertTrue("LogType not matched", s.contains("LogType:stdout"));
     Assert.assertTrue("log file:stderr should not be aggregated.", !s.contains("LogType:stderr"));
     Assert.assertTrue("log file:logs should not be aggregated.", !s.contains("LogType:logs"));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58e9f24e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
index 419de88..7d911e9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
@@ -767,30 +767,27 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
           try {
             ByteArrayOutputStream baos = new ByteArrayOutputStream();
             PrintStream ps = new PrintStream(baos);
+
             LogReader.readAContainerLogsForALogType(valueStream, ps);
 
             String writtenLines[] = baos.toString().split(
               System.getProperty("line.separator"));
 
             Assert.assertEquals("LogType:", writtenLines[0].substring(0, 8));
-            String fileType = writtenLines[0].substring(9);
-
-            Assert.assertEquals("LogUploadTime:", writtenLines[1].substring(0, 14));
-            String fileUploadedTimeStr = writtenLines[1].substring(15);
+            String fileType = writtenLines[0].substring(8);
 
-            Assert.assertEquals("LogLength:", writtenLines[2].substring(0, 10));
-            String fileLengthStr = writtenLines[2].substring(11);
+            Assert.assertEquals("LogLength:", writtenLines[1].substring(0, 10));
+            String fileLengthStr = writtenLines[1].substring(10);
             long fileLength = Long.parseLong(fileLengthStr);
 
             Assert.assertEquals("Log Contents:",
-              writtenLines[3].substring(0, 13));
+              writtenLines[2].substring(0, 13));
 
             String logContents = StringUtils.join(
-              Arrays.copyOfRange(writtenLines, 4, writtenLines.length), "\n");
+              Arrays.copyOfRange(writtenLines, 3, writtenLines.length), "\n");
             perContainerMap.put(fileType, logContents);
 
             LOG.info("LogType:" + fileType);
-            LOG.info("LogUploadTime:" + fileUploadedTimeStr);
             LOG.info("LogLength:" + fileLength);
             LOG.info("Log Contents:\n" + perContainerMap.get(fileType));
           } catch (EOFException eof) {