You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2014/11/03 22:17:14 UTC
git commit: YARN-2788. Fixed backwards compatibility issues with
log-aggregation feature that were caused when adding log-upload-time via
YARN-2703. Contributed by Xuan Gong.
Repository: hadoop
Updated Branches:
refs/heads/trunk 71fbb474f -> 58e9f24e0
YARN-2788. Fixed backwards compatibility issues with log-aggregation feature that were caused when adding log-upload-time via YARN-2703. Contributed by Xuan Gong.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/58e9f24e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/58e9f24e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/58e9f24e
Branch: refs/heads/trunk
Commit: 58e9f24e0f06efede21085b7ffe36af042fa7b38
Parents: 71fbb47
Author: Vinod Kumar Vavilapalli <vi...@apache.org>
Authored: Mon Nov 3 13:16:29 2014 -0800
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Mon Nov 3 13:16:29 2014 -0800
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 4 +
.../hadoop/yarn/client/cli/TestLogsCLI.java | 58 ++++++++-
.../logaggregation/AggregatedLogFormat.java | 124 ++++++++++---------
.../yarn/logaggregation/LogCLIHelpers.java | 20 ++-
.../yarn/webapp/log/AggregatedLogsBlock.java | 8 +-
.../logaggregation/TestAggregatedLogFormat.java | 26 +++-
.../TestLogAggregationService.java | 15 +--
7 files changed, 172 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/58e9f24e/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index c3ee0b0..9566458 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -845,6 +845,10 @@ Release 2.6.0 - UNRELEASED
YARN-2798. Fixed YarnClient to populate the renewer correctly for Timeline
delegation tokens. (Zhijie Shen via vinodkv)
+ YARN-2788. Fixed backwards compatiblity issues with log-aggregation feature
+ that were caused when adding log-upload-time via YARN-2703. (Xuan Gong via
+ vinodkv)
+
Release 2.5.1 - 2014-09-05
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/58e9f24e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java
index 2e9e92d..5ed8398 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
@@ -33,6 +34,7 @@ import java.io.PrintWriter;
import java.io.Writer;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -170,9 +172,9 @@ public class TestLogsCLI {
ApplicationId appId = ApplicationIdPBImpl.newInstance(0, 1);
ApplicationAttemptId appAttemptId =
ApplicationAttemptIdPBImpl.newInstance(appId, 1);
+ ContainerId containerId0 = ContainerIdPBImpl.newInstance(appAttemptId, 0);
ContainerId containerId1 = ContainerIdPBImpl.newInstance(appAttemptId, 1);
ContainerId containerId2 = ContainerIdPBImpl.newInstance(appAttemptId, 2);
-
NodeId nodeId = NodeId.newInstance("localhost", 1234);
// create local logs
@@ -201,7 +203,15 @@ public class TestLogsCLI {
fs.delete(path, true);
}
assertTrue(fs.mkdirs(path));
+
// upload container logs into remote directory
+ // the first two logs is empty. When we try to read first two logs,
+ // we will meet EOF exception, but it will not impact other logs.
+ // Other logs should be read successfully.
+ uploadEmptyContainerLogIntoRemoteDir(ugi, configuration, rootLogDirs, nodeId,
+ containerId0, path, fs);
+ uploadEmptyContainerLogIntoRemoteDir(ugi, configuration, rootLogDirs, nodeId,
+ containerId1, path, fs);
uploadContainerLogIntoRemoteDir(ugi, configuration, rootLogDirs, nodeId,
containerId1, path, fs);
uploadContainerLogIntoRemoteDir(ugi, configuration, rootLogDirs, nodeId,
@@ -220,6 +230,9 @@ public class TestLogsCLI {
"Hello container_0_0001_01_000002!"));
sysOutStream.reset();
+ // uploaded two logs for container1. The first log is empty.
+ // The second one is not empty.
+ // We can still successfully read logs for container1.
exitCode =
cli.run(new String[] { "-applicationId", appId.toString(),
"-nodeAddress", nodeId.toString(), "-containerId",
@@ -227,7 +240,23 @@ public class TestLogsCLI {
assertTrue(exitCode == 0);
assertTrue(sysOutStream.toString().contains(
"Hello container_0_0001_01_000001!"));
- assertTrue(sysOutStream.toString().contains("LogUploadTime"));
+ assertTrue(sysOutStream.toString().contains("Log Upload Time"));
+ assertTrue(!sysOutStream.toString().contains(
+ "Logs for container " + containerId1.toString()
+ + " are not present in this log-file."));
+ sysOutStream.reset();
+
+ // Uploaded the empty log for container0.
+ // We should see the message showing the log for container0
+ // are not present.
+ exitCode =
+ cli.run(new String[] { "-applicationId", appId.toString(),
+ "-nodeAddress", nodeId.toString(), "-containerId",
+ containerId0.toString() });
+ assertTrue(exitCode == -1);
+ assertTrue(sysOutStream.toString().contains(
+ "Logs for container " + containerId0.toString()
+ + " are not present in this log-file."));
fs.delete(new Path(remoteLogRootDir), true);
fs.delete(new Path(rootLogDir), true);
@@ -266,6 +295,31 @@ public class TestLogsCLI {
writer.close();
}
+ private static void uploadEmptyContainerLogIntoRemoteDir(UserGroupInformation ugi,
+ Configuration configuration, List<String> rootLogDirs, NodeId nodeId,
+ ContainerId containerId, Path appDir, FileSystem fs) throws Exception {
+ Path path =
+ new Path(appDir, LogAggregationUtils.getNodeString(nodeId)
+ + System.currentTimeMillis());
+ AggregatedLogFormat.LogWriter writer =
+ new AggregatedLogFormat.LogWriter(configuration, path, ugi);
+ writer.writeApplicationOwner(ugi.getUserName());
+
+ Map<ApplicationAccessType, String> appAcls =
+ new HashMap<ApplicationAccessType, String>();
+ appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
+ writer.writeApplicationACLs(appAcls);
+ DataOutputStream out = writer.getWriter().prepareAppendKey(-1);
+ new AggregatedLogFormat.LogKey(containerId).write(out);
+ out.close();
+ out = writer.getWriter().prepareAppendValue(-1);
+ new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
+ UserGroupInformation.getCurrentUser().getShortUserName()).write(out,
+ new HashSet<File>());
+ out.close();
+ writer.close();
+ }
+
private YarnClient createMockYarnClient(YarnApplicationState appState)
throws YarnException, IOException {
YarnClient mockClient = mock(YarnClient.class);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/58e9f24e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
index 22219be..a434ef5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
@@ -27,6 +27,7 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.io.OutputStream;
import java.io.PrintStream;
import java.io.Writer;
import java.security.PrivilegedExceptionAction;
@@ -44,6 +45,7 @@ import java.util.Set;
import java.util.regex.Pattern;
import org.apache.commons.io.input.BoundedInputStream;
+import org.apache.commons.io.output.WriterOutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -233,9 +235,6 @@ public class AggregatedLogFormat {
// Write the logFile Type
out.writeUTF(logFile.getName());
- // Write the uploaded TimeStamp
- out.writeLong(System.currentTimeMillis());
-
// Write the log length as UTF so that it is printable
out.writeUTF(String.valueOf(fileLength));
@@ -400,6 +399,11 @@ public class AggregatedLogFormat {
writeVersion();
}
+ @VisibleForTesting
+ public TFile.Writer getWriter() {
+ return this.writer;
+ }
+
private void writeVersion() throws IOException {
DataOutputStream out = this.writer.prepareAppendKey(-1);
VERSION_KEY.write(out);
@@ -639,70 +643,55 @@ public class AggregatedLogFormat {
* Writes all logs for a single container to the provided writer.
* @param valueStream
* @param writer
+ * @param logUploadedTime
* @throws IOException
*/
public static void readAcontainerLogs(DataInputStream valueStream,
- Writer writer) throws IOException {
- int bufferSize = 65536;
- char[] cbuf = new char[bufferSize];
- String fileType;
- long uploadTime;
- String fileLengthStr;
- long fileLength;
-
- while (true) {
- try {
- fileType = valueStream.readUTF();
- } catch (EOFException e) {
- // EndOfFile
- return;
- }
- uploadTime = valueStream.readLong();
- fileLengthStr = valueStream.readUTF();
- fileLength = Long.parseLong(fileLengthStr);
- writer.write("\n\nLogType:");
- writer.write(fileType);
- writer.write("\nLogUploadTime:");
- writer.write(String.valueOf(uploadTime));
- writer.write("\nLogLength:");
- writer.write(fileLengthStr);
- writer.write("\nLog Contents:\n");
- // ByteLevel
- BoundedInputStream bis =
- new BoundedInputStream(valueStream, fileLength);
- InputStreamReader reader = new InputStreamReader(bis);
- int currentRead = 0;
- int totalRead = 0;
- while ((currentRead = reader.read(cbuf, 0, bufferSize)) != -1) {
- writer.write(cbuf, 0, currentRead);
- totalRead += currentRead;
+ Writer writer, long logUploadedTime) throws IOException {
+ OutputStream os = null;
+ PrintStream ps = null;
+ try {
+ os = new WriterOutputStream(writer);
+ ps = new PrintStream(os);
+ while (true) {
+ try {
+ readContainerLogs(valueStream, ps, logUploadedTime);
+ } catch (EOFException e) {
+ // EndOfFile
+ return;
+ }
}
+ } finally {
+ IOUtils.cleanup(LOG, ps);
+ IOUtils.cleanup(LOG, os);
}
}
/**
- * Keep calling this till you get a {@link EOFException} for getting logs of
- * all types for a single container.
- *
+ * Writes all logs for a single container to the provided writer.
* @param valueStream
- * @param out
+ * @param writer
* @throws IOException
*/
- public static void readAContainerLogsForALogType(
- DataInputStream valueStream, PrintStream out)
- throws IOException {
+ public static void readAcontainerLogs(DataInputStream valueStream,
+ Writer writer) throws IOException {
+ readAcontainerLogs(valueStream, writer, -1);
+ }
+ private static void readContainerLogs(DataInputStream valueStream,
+ PrintStream out, long logUploadedTime) throws IOException {
byte[] buf = new byte[65535];
String fileType = valueStream.readUTF();
- long uploadTime = valueStream.readLong();
String fileLengthStr = valueStream.readUTF();
long fileLength = Long.parseLong(fileLengthStr);
- out.print("LogType: ");
+ out.print("LogType:");
out.println(fileType);
- out.print("LogUploadTime: ");
- out.println(Times.format(uploadTime));
- out.print("LogLength: ");
+ if (logUploadedTime != -1) {
+ out.print("Log Upload Time:");
+ out.println(Times.format(logUploadedTime));
+ }
+ out.print("LogLength:");
out.println(fileLengthStr);
out.println("Log Contents:");
@@ -723,6 +712,35 @@ public class AggregatedLogFormat {
out.println("");
}
+ /**
+ * Keep calling this till you get a {@link EOFException} for getting logs of
+ * all types for a single container.
+ *
+ * @param valueStream
+ * @param out
+ * @param logUploadedTime
+ * @throws IOException
+ */
+ public static void readAContainerLogsForALogType(
+ DataInputStream valueStream, PrintStream out, long logUploadedTime)
+ throws IOException {
+ readContainerLogs(valueStream, out, logUploadedTime);
+ }
+
+ /**
+ * Keep calling this till you get a {@link EOFException} for getting logs of
+ * all types for a single container.
+ *
+ * @param valueStream
+ * @param out
+ * @throws IOException
+ */
+ public static void readAContainerLogsForALogType(
+ DataInputStream valueStream, PrintStream out)
+ throws IOException {
+ readAContainerLogsForALogType(valueStream, out, -1);
+ }
+
public void close() {
IOUtils.cleanup(LOG, scanner, reader, fsDataIStream);
}
@@ -732,7 +750,6 @@ public class AggregatedLogFormat {
public static class ContainerLogsReader {
private DataInputStream valueStream;
private String currentLogType = null;
- private long currentLogUpLoadTime = 0;
private long currentLogLength = 0;
private BoundedInputStream currentLogData = null;
private InputStreamReader currentLogISR;
@@ -753,14 +770,12 @@ public class AggregatedLogFormat {
}
currentLogType = null;
- currentLogUpLoadTime = 0;
currentLogLength = 0;
currentLogData = null;
currentLogISR = null;
try {
String logType = valueStream.readUTF();
- long logUpLoadTime = valueStream.readLong();
String logLengthStr = valueStream.readUTF();
currentLogLength = Long.parseLong(logLengthStr);
currentLogData =
@@ -768,7 +783,6 @@ public class AggregatedLogFormat {
currentLogData.setPropagateClose(false);
currentLogISR = new InputStreamReader(currentLogData);
currentLogType = logType;
- currentLogUpLoadTime = logUpLoadTime;
} catch (EOFException e) {
}
@@ -779,10 +793,6 @@ public class AggregatedLogFormat {
return currentLogType;
}
- public long getCurrentLogUpLoadTime() {
- return currentLogUpLoadTime;
- }
-
public long getCurrentLogLength() {
return currentLogLength;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/58e9f24e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java
index de06d48..1546ece 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java
@@ -78,7 +78,8 @@ public class LogCLIHelpers implements Configurable {
reader =
new AggregatedLogFormat.LogReader(getConf(),
thisNodeFile.getPath());
- if (dumpAContainerLogs(containerId, reader, System.out) > -1) {
+ if (dumpAContainerLogs(containerId, reader, System.out,
+ thisNodeFile.getModificationTime()) > -1) {
foundContainerLogs = true;
}
} finally {
@@ -97,7 +98,8 @@ public class LogCLIHelpers implements Configurable {
@Private
public int dumpAContainerLogs(String containerIdStr,
- AggregatedLogFormat.LogReader reader, PrintStream out) throws IOException {
+ AggregatedLogFormat.LogReader reader, PrintStream out,
+ long logUploadedTime) throws IOException {
DataInputStream valueStream;
LogKey key = new LogKey();
valueStream = reader.next(key);
@@ -112,14 +114,20 @@ public class LogCLIHelpers implements Configurable {
return -1;
}
+ boolean foundContainerLogs = false;
while (true) {
try {
- LogReader.readAContainerLogsForALogType(valueStream, out);
+ LogReader.readAContainerLogsForALogType(valueStream, out,
+ logUploadedTime);
+ foundContainerLogs = true;
} catch (EOFException eof) {
break;
}
}
- return 0;
+ if (foundContainerLogs) {
+ return 0;
+ }
+ return -1;
}
@Private
@@ -157,13 +165,15 @@ public class LogCLIHelpers implements Configurable {
valueStream = reader.next(key);
while (valueStream != null) {
+
String containerString =
"\n\nContainer: " + key + " on " + thisNodeFile.getPath().getName();
out.println(containerString);
out.println(StringUtils.repeat("=", containerString.length()));
while (true) {
try {
- LogReader.readAContainerLogsForALogType(valueStream, out);
+ LogReader.readAContainerLogsForALogType(valueStream, out,
+ thisNodeFile.getModificationTime());
foundAnyLogs = true;
} catch (EOFException eof) {
break;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/58e9f24e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java
index bba3258..3e9f7a2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java
@@ -126,6 +126,7 @@ public class AggregatedLogsBlock extends HtmlBlock {
.endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
continue;
}
+ long logUploadedTime = thisNodeFile.getModificationTime();
reader =
new AggregatedLogFormat.LogReader(conf, thisNodeFile.getPath());
@@ -164,7 +165,7 @@ public class AggregatedLogsBlock extends HtmlBlock {
}
foundLog = readContainerLogs(html, logReader, logLimits,
- desiredLogType);
+ desiredLogType, logUploadedTime);
} catch (IOException ex) {
LOG.error("Error getting logs for " + logEntity, ex);
continue;
@@ -189,7 +190,7 @@ public class AggregatedLogsBlock extends HtmlBlock {
private boolean readContainerLogs(Block html,
AggregatedLogFormat.ContainerLogsReader logReader, LogLimits logLimits,
- String desiredLogType) throws IOException {
+ String desiredLogType, long logUpLoadTime) throws IOException {
int bufferSize = 65536;
char[] cbuf = new char[bufferSize];
@@ -199,13 +200,12 @@ public class AggregatedLogsBlock extends HtmlBlock {
if (desiredLogType == null || desiredLogType.isEmpty()
|| desiredLogType.equals(logType)) {
long logLength = logReader.getCurrentLogLength();
- long logUpLoadTime = logReader.getCurrentLogUpLoadTime();
if (foundLog) {
html.pre()._("\n\n")._();
}
html.p()._("Log Type: " + logType)._();
- html.p()._("Log UpLoadTime: " + Times.format(logUpLoadTime))._();
+ html.p()._("Log Upload Time: " + Times.format(logUpLoadTime))._();
html.p()._("Log Length: " + Long.toString(logLength))._();
long start = logLimits.start < 0
http://git-wip-us.apache.org/repos/asf/hadoop/blob/58e9f24e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java
index bc0485e..405cb3d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
+import org.apache.hadoop.yarn.util.Times;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
@@ -178,9 +179,16 @@ public class TestAggregatedLogFormat {
logWriter.close();
}
- //Verify the output generated by readAContainerLogs(DataInputStream, Writer)
@Test
public void testReadAcontainerLogs1() throws Exception {
+ //Verify the output generated by readAContainerLogs(DataInputStream, Writer, logUploadedTime)
+ testReadAcontainerLog(true);
+
+ //Verify the output generated by readAContainerLogs(DataInputStream, Writer)
+ testReadAcontainerLog(false);
+ }
+
+ private void testReadAcontainerLog(boolean logUploadedTime) throws Exception {
Configuration conf = new Configuration();
File workDir = new File(testWorkDir, "testReadAcontainerLogs1");
Path remoteAppLogFile =
@@ -233,17 +241,23 @@ public class TestAggregatedLogFormat {
LogKey rLogKey = new LogKey();
DataInputStream dis = logReader.next(rLogKey);
Writer writer = new StringWriter();
- LogReader.readAcontainerLogs(dis, writer);
-
+
+ if (logUploadedTime) {
+ LogReader.readAcontainerLogs(dis, writer, System.currentTimeMillis());
+ } else {
+ LogReader.readAcontainerLogs(dis, writer);
+ }
+
// We should only do the log aggregation for stdout.
// Since we could not open the fileInputStream for stderr, this file is not
// aggregated.
String s = writer.toString();
int expectedLength =
- "\n\nLogType:stdout".length()
- + ("\nLogUploadTime:" + System.currentTimeMillis()).length()
+ "LogType:stdout".length()
+ + (logUploadedTime ? ("\nLog Upload Time:" + Times.format(System
+ .currentTimeMillis())).length() : 0)
+ ("\nLogLength:" + numChars).length()
- + "\nLog Contents:\n".length() + numChars;
+ + "\nLog Contents:\n".length() + numChars + "\n".length();
Assert.assertTrue("LogType not matched", s.contains("LogType:stdout"));
Assert.assertTrue("log file:stderr should not be aggregated.", !s.contains("LogType:stderr"));
Assert.assertTrue("log file:logs should not be aggregated.", !s.contains("LogType:logs"));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/58e9f24e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
index 419de88..7d911e9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
@@ -767,30 +767,27 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos);
+
LogReader.readAContainerLogsForALogType(valueStream, ps);
String writtenLines[] = baos.toString().split(
System.getProperty("line.separator"));
Assert.assertEquals("LogType:", writtenLines[0].substring(0, 8));
- String fileType = writtenLines[0].substring(9);
-
- Assert.assertEquals("LogUploadTime:", writtenLines[1].substring(0, 14));
- String fileUploadedTimeStr = writtenLines[1].substring(15);
+ String fileType = writtenLines[0].substring(8);
- Assert.assertEquals("LogLength:", writtenLines[2].substring(0, 10));
- String fileLengthStr = writtenLines[2].substring(11);
+ Assert.assertEquals("LogLength:", writtenLines[1].substring(0, 10));
+ String fileLengthStr = writtenLines[1].substring(10);
long fileLength = Long.parseLong(fileLengthStr);
Assert.assertEquals("Log Contents:",
- writtenLines[3].substring(0, 13));
+ writtenLines[2].substring(0, 13));
String logContents = StringUtils.join(
- Arrays.copyOfRange(writtenLines, 4, writtenLines.length), "\n");
+ Arrays.copyOfRange(writtenLines, 3, writtenLines.length), "\n");
perContainerMap.put(fileType, logContents);
LOG.info("LogType:" + fileType);
- LOG.info("LogUploadTime:" + fileUploadedTimeStr);
LOG.info("LogLength:" + fileLength);
LOG.info("Log Contents:\n" + perContainerMap.get(fileType));
} catch (EOFException eof) {