Posted to common-commits@hadoop.apache.org by ey...@apache.org on 2018/08/13 20:52:24 UTC
hadoop git commit: YARN-7417. Remove duplicated code from
IndexedFileAggregatedLogsBlock and TFileAggregatedLogsBlock. Contributed by
Zian Chen
Repository: hadoop
Updated Branches:
refs/heads/trunk b4031a8f1 -> 74411ce0c
YARN-7417. Remove duplicated code from IndexedFileAggregatedLogsBlock
and TFileAggregatedLogsBlock.
Contributed by Zian Chen
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/74411ce0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/74411ce0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/74411ce0
Branch: refs/heads/trunk
Commit: 74411ce0ce7336c0f7bb5793939fdd64a5dcdef6
Parents: b4031a8
Author: Eric Yang <ey...@apache.org>
Authored: Mon Aug 13 16:50:00 2018 -0400
Committer: Eric Yang <ey...@apache.org>
Committed: Mon Aug 13 16:50:00 2018 -0400
----------------------------------------------------------------------
 .../logaggregation/AggregatedLogFormat.java     |   3 +-
 .../filecontroller/LogAggregationHtmlBlock.java |  61 +++++++++
 .../ifile/IndexedFileAggregatedLogsBlock.java   | 136 +++++++------------
 .../tfile/TFileAggregatedLogsBlock.java         |  56 +-------
 4 files changed, 117 insertions(+), 139 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/74411ce0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
index d9b4c1e4..ca43fe6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
@@ -26,6 +26,7 @@ import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintStream;
@@ -1013,7 +1014,7 @@ public class AggregatedLogFormat {
  }

  @Private
-  public static class ContainerLogsReader {
+  public static class ContainerLogsReader extends InputStream {
    private DataInputStream valueStream;
    private String currentLogType = null;
    private long currentLogLength = 0;
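
A note on the AggregatedLogFormat change above: ContainerLogsReader already implements read() and skip(long) (the rendering code in the hunks below calls both directly), so extending InputStream requires no new method bodies, and InputStream's inherited read(byte[], int, int) works by repeatedly calling read(). That is what lets the reader be handed to the shared processContainerLog(InputStream, ...) helper introduced below, and why TFileAggregatedLogsBlock's buffer switches from char[] to byte[]. A minimal sketch of the idea (ExampleLogsReader is a hypothetical illustration, not the Hadoop class):

    import java.io.IOException;
    import java.io.InputStream;

    // A reader that already has an InputStream-shaped read() only needs to
    // declare the superclass to become usable wherever an InputStream is
    // expected, e.g. by a shared rendering helper.
    class ExampleLogsReader extends InputStream {
      private final InputStream valueStream;
      private long remaining; // bytes left in the current log entry

      ExampleLogsReader(InputStream valueStream, long length) {
        this.valueStream = valueStream;
        this.remaining = length;
      }

      @Override
      public int read() throws IOException {
        if (remaining <= 0) {
          return -1; // logical end of this log entry
        }
        int b = valueStream.read();
        if (b != -1) {
          remaining--;
        }
        return b;
      }
    }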
http://git-wip-us.apache.org/repos/asf/hadoop/blob/74411ce0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationHtmlBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationHtmlBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationHtmlBlock.java
index 784102b..4ec8794 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationHtmlBlock.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationHtmlBlock.java
@@ -24,6 +24,10 @@ import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME;
import com.google.inject.Inject;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@@ -34,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.logaggregation.LogAggregationWebUtils;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
/**
@@ -143,6 +148,62 @@ public abstract class LogAggregationHtmlBlock extends HtmlBlock {
    return true;
  }

+  protected long[] checkParseRange(Block html, long startIndex,
+      long endIndex, long startTime, long endTime, long logLength, String logType) {
+    long start = startIndex < 0
+        ? logLength + startIndex : startIndex;
+    start = start < 0 ? 0 : start;
+    start = start > logLength ? logLength : start;
+    long end = endIndex < 0
+        ? logLength + endIndex : endIndex;
+    end = end < 0 ? 0 : end;
+    end = end > logLength ? logLength : end;
+    end = end < start ? start : end;
+
+    long toRead = end - start;
+    if (toRead < logLength) {
+      html.p().__("Showing " + toRead + " bytes of " + logLength
+          + " total. Click ").a(url("logs", $(NM_NODENAME), $(CONTAINER_ID),
+          $(ENTITY_STRING), $(APP_OWNER),
+          logType, "?start=0&start.time=" + startTime
+          + "&end.time=" + endTime), "here").
+          __(" for the full log.").__();
+    }
+    return new long[]{start, end};
+  }
+
+  protected void processContainerLog(Block html, long[] range, InputStream in,
+      int bufferSize, byte[] cbuf) throws IOException {
+    long totalSkipped = 0;
+    long start = range[0];
+    long toRead = range[1] - range[0];
+    while (totalSkipped < start) {
+      long ret = in.skip(start - totalSkipped);
+      if (ret == 0) {
+        //Read one byte
+        int nextByte = in.read();
+        // Check if we have reached EOF
+        if (nextByte == -1) {
+          throw new IOException("Premature EOF from container log");
+        }
+        ret = 1;
+      }
+      totalSkipped += ret;
+    }
+
+    int len = 0;
+    int currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
+    Hamlet.PRE<Hamlet> pre = html.pre();
+
+    while (toRead > 0 && (len = in.read(cbuf, 0, currentToRead)) > 0) {
+      pre.__(new String(cbuf, 0, len, Charset.forName("UTF-8")));
+      toRead = toRead - len;
+      currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
+    }
+
+    pre.__();
+  }
+
  protected static class BlockParameters {
    private ApplicationId appId;
    private ContainerId containerId;
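
For reference, the range semantics that the new checkParseRange method above centralizes: a negative start or end index counts back from the end of the log, and the pair is clamped so that 0 <= start <= end <= logLength. A standalone sketch of just that arithmetic (RangeClampDemo and clampRange are hypothetical names; the real method additionally emits the "Showing N bytes" link):

    // Same clamping rules as checkParseRange, minus the HTML output.
    public final class RangeClampDemo {
      static long[] clampRange(long startIndex, long endIndex, long logLength) {
        long start = startIndex < 0 ? logLength + startIndex : startIndex;
        start = Math.max(0, Math.min(start, logLength));
        long end = endIndex < 0 ? logLength + endIndex : endIndex;
        end = Math.max(0, Math.min(end, logLength));
        end = Math.max(start, end); // an inverted range collapses to empty
        return new long[]{start, end};
      }

      public static void main(String[] args) {
        // "Tail the last 4 KB" of a 1 MB log: start = -4096, end beyond EOF.
        long[] r = clampRange(-4096, Long.MAX_VALUE, 1_048_576L);
        System.out.println(r[0] + ".." + r[1]); // prints 1044480..1048576
      }
    }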
http://git-wip-us.apache.org/repos/asf/hadoop/blob/74411ce0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/IndexedFileAggregatedLogsBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/IndexedFileAggregatedLogsBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/IndexedFileAggregatedLogsBlock.java
index 4ef429d..eb9634b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/IndexedFileAggregatedLogsBlock.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/IndexedFileAggregatedLogsBlock.java
@@ -18,11 +18,7 @@
package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile;
-import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER;
-import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_LOG_TYPE;
-import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING;
-import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME;
import com.google.inject.Inject;
import java.io.IOException;
@@ -53,7 +49,6 @@ import org.apache.hadoop.yarn.logaggregation.filecontroller.ifile.LogAggregation
import org.apache.hadoop.yarn.logaggregation.filecontroller.ifile.LogAggregationIndexedFileController.IndexedPerAggregationLogMeta;
import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet;
-import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet.PRE;
/**
 * The Aggregated Logs Block implementation for Indexed File.
@@ -179,88 +174,8 @@ public class IndexedFileAggregatedLogsBlock extends LogAggregationHtmlBlock {
            continue;
          }

-          Algorithm compressName = Compression.getCompressionAlgorithmByName(
-              compressAlgo);
-          Decompressor decompressor = compressName.getDecompressor();
-          FileContext fileContext = FileContext.getFileContext(
-              thisNodeFile.getPath().toUri(), conf);
-          FSDataInputStream fsin = fileContext.open(thisNodeFile.getPath());
-          int bufferSize = 65536;
-          for (IndexedFileLogMeta candidate : candidates) {
-            if (candidate.getLastModifiedTime() < startTime
-                || candidate.getLastModifiedTime() > endTime) {
-              continue;
-            }
-            byte[] cbuf = new byte[bufferSize];
-            InputStream in = null;
-            try {
-              in = compressName.createDecompressionStream(
-                  new BoundedRangeFileInputStream(fsin,
-                      candidate.getStartIndex(),
-                      candidate.getFileCompressedSize()),
-                  decompressor,
-                  LogAggregationIndexedFileController.getFSInputBufferSize(
-                      conf));
-              long logLength = candidate.getFileSize();
-              html.pre().__("\n\n").__();
-              html.p().__("Log Type: " + candidate.getFileName()).__();
-              html.p().__("Log Upload Time: " + Times.format(
-                  candidate.getLastModifiedTime())).__();
-              html.p().__("Log Length: " + Long.toString(
-                  logLength)).__();
-              long startIndex = start < 0
-                  ? logLength + start : start;
-              startIndex = startIndex < 0 ? 0 : startIndex;
-              startIndex = startIndex > logLength ? logLength : startIndex;
-              long endLogIndex = end < 0
-                  ? logLength + end : end;
-              endLogIndex = endLogIndex < 0 ? 0 : endLogIndex;
-              endLogIndex = endLogIndex > logLength ? logLength : endLogIndex;
-              endLogIndex = endLogIndex < startIndex ?
-                  startIndex : endLogIndex;
-              long toRead = endLogIndex - startIndex;
-              if (toRead < logLength) {
-                html.p().__("Showing " + toRead + " bytes of " + logLength
-                    + " total. Click ").a(url("logs", $(NM_NODENAME),
-                    $(CONTAINER_ID), $(ENTITY_STRING), $(APP_OWNER),
-                    candidate.getFileName(), "?start=0&start.time="
-                    + startTime + "&end.time=" + endTime), "here").
-                    __(" for the full log.").__();
-              }
-              long totalSkipped = 0;
-              while (totalSkipped < startIndex) {
-                long ret = in.skip(startIndex - totalSkipped);
-                if (ret == 0) {
-                  //Read one byte
-                  int nextByte = in.read();
-                  // Check if we have reached EOF
-                  if (nextByte == -1) {
-                    throw new IOException("Premature EOF from container log");
-                  }
-                  ret = 1;
-                }
-                totalSkipped += ret;
-              }
-              int len = 0;
-              int currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
-              PRE<Hamlet> pre = html.pre();
-
-              while (toRead > 0
-                  && (len = in.read(cbuf, 0, currentToRead)) > 0) {
-                pre.__(new String(cbuf, 0, len, Charset.forName("UTF-8")));
-                toRead = toRead - len;
-                currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
-              }
-
-              pre.__();
-              foundLog = true;
-            } catch (Exception ex) {
-              LOG.error("Error getting logs for " + logEntity, ex);
-              continue;
-            } finally {
-              IOUtils.closeQuietly(in);
-            }
-          }
+          foundLog = readContainerLog(compressAlgo, html, thisNodeFile, start,
+              end, candidates, startTime, endTime, foundLog, logEntity);
        }

        if (!foundLog) {
          if (desiredLogType.isEmpty()) {
@@ -277,4 +192,51 @@ public class IndexedFileAggregatedLogsBlock extends LogAggregationHtmlBlock {
LOG.error("Error getting logs for " + logEntity, ex);
}
}
+
+ private boolean readContainerLog(String compressAlgo, Block html,
+ FileStatus thisNodeFile, long start, long end,
+ List<IndexedFileLogMeta> candidates, long startTime, long endTime,
+ boolean foundLog, String logEntity) throws IOException {
+ Algorithm compressName = Compression.getCompressionAlgorithmByName(
+ compressAlgo);
+ Decompressor decompressor = compressName.getDecompressor();
+ FileContext fileContext = FileContext.getFileContext(
+ thisNodeFile.getPath().toUri(), conf);
+ FSDataInputStream fsin = fileContext.open(thisNodeFile.getPath());
+ int bufferSize = 65536;
+ for (IndexedFileLogMeta candidate : candidates) {
+ if (candidate.getLastModifiedTime() < startTime
+ || candidate.getLastModifiedTime() > endTime) {
+ continue;
+ }
+ byte[] cbuf = new byte[bufferSize];
+ InputStream in = null;
+ try {
+ in = compressName.createDecompressionStream(
+ new BoundedRangeFileInputStream(fsin, candidate.getStartIndex(),
+ candidate.getFileCompressedSize()), decompressor,
+ LogAggregationIndexedFileController.getFSInputBufferSize(conf));
+ long logLength = candidate.getFileSize();
+ html.pre().__("\n\n").__();
+ html.p().__("Log Type: " + candidate.getFileName()).__();
+ html.p().__(
+ "Log Upload Time: " + Times.format(candidate.getLastModifiedTime()))
+ .__();
+ html.p().__("Log Length: " + Long.toString(logLength)).__();
+
+ long[] range = checkParseRange(html, start, end, startTime, endTime,
+ logLength, candidate.getFileName());
+ processContainerLog(html, range, in, bufferSize, cbuf);
+
+ foundLog = true;
+ } catch (Exception ex) {
+ LOG.error("Error getting logs for " + logEntity, ex);
+ continue;
+ } finally {
+ IOUtils.closeQuietly(in);
+ }
+ }
+ return foundLog;
+ }
+
}
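
The skip loop inside the shared processContainerLog (in the LogAggregationHtmlBlock hunk above) handles the fact that InputStream.skip() may return 0 without having reached end of stream, so a zero return is disambiguated by probing with a single-byte read(). The same pattern as a self-contained helper (SkipFully is a hypothetical name, not part of this patch):

    import java.io.IOException;
    import java.io.InputStream;

    final class SkipFully {
      // Skip exactly toSkip bytes or fail; skip() alone gives no EOF signal.
      static void skipFully(InputStream in, long toSkip) throws IOException {
        long skipped = 0;
        while (skipped < toSkip) {
          long ret = in.skip(toSkip - skipped);
          if (ret == 0) {
            if (in.read() == -1) { // real EOF, not just a stalled skip()
              throw new IOException("Premature EOF from container log");
            }
            ret = 1; // the probe byte counts as skipped
          }
          skipped += ret;
        }
      }
    }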
http://git-wip-us.apache.org/repos/asf/hadoop/blob/74411ce0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/TFileAggregatedLogsBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/TFileAggregatedLogsBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/TFileAggregatedLogsBlock.java
index 64b6219..6fb5b90 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/TFileAggregatedLogsBlock.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/TFileAggregatedLogsBlock.java
@@ -18,14 +18,11 @@
package org.apache.hadoop.yarn.logaggregation.filecontroller.tfile;
-import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER;
-import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_LOG_TYPE;
-import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING;
-import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME;
import com.google.inject.Inject;
import java.io.IOException;
+import java.io.InputStream;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@@ -173,7 +170,7 @@ public class TFileAggregatedLogsBlock extends LogAggregationHtmlBlock {
      long endIndex, String desiredLogType, long logUpLoadTime,
      long startTime, long endTime) throws IOException {
    int bufferSize = 65536;
-    char[] cbuf = new char[bufferSize];
+    byte[] cbuf = new byte[bufferSize];
    boolean foundLog = false;
    String logType = logReader.nextLog();
@@ -189,53 +186,10 @@ public class TFileAggregatedLogsBlock extends LogAggregationHtmlBlock {
html.p().__("Log Upload Time: " + Times.format(logUpLoadTime)).__();
html.p().__("Log Length: " + Long.toString(logLength)).__();
- long start = startIndex < 0
- ? logLength + startIndex : startIndex;
- start = start < 0 ? 0 : start;
- start = start > logLength ? logLength : start;
- long end = endIndex < 0
- ? logLength + endIndex : endIndex;
- end = end < 0 ? 0 : end;
- end = end > logLength ? logLength : end;
- end = end < start ? start : end;
-
- long toRead = end - start;
- if (toRead < logLength) {
- html.p().__("Showing " + toRead + " bytes of " + logLength
- + " total. Click ").a(url("logs", $(NM_NODENAME), $(CONTAINER_ID),
- $(ENTITY_STRING), $(APP_OWNER),
- logType, "?start=0&start.time=" + startTime
- + "&end.time=" + endTime), "here").
- __(" for the full log.").__();
- }
-
- long totalSkipped = 0;
- while (totalSkipped < start) {
- long ret = logReader.skip(start - totalSkipped);
- if (ret == 0) {
- //Read one byte
- int nextByte = logReader.read();
- // Check if we have reached EOF
- if (nextByte == -1) {
- throw new IOException("Premature EOF from container log");
- }
- ret = 1;
- }
- totalSkipped += ret;
- }
-
- int len = 0;
- int currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
- PRE<Hamlet> pre = html.pre();
-
- while (toRead > 0
- && (len = logReader.read(cbuf, 0, currentToRead)) > 0) {
- pre.__(new String(cbuf, 0, len));
- toRead = toRead - len;
- currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
- }
+ long[] range = checkParseRange(html, startIndex, endIndex, startTime,
+ endTime, logLength, logType);
- pre.__();
+ processContainerLog(html, range, logReader, bufferSize, cbuf);
foundLog = true;
}
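
With the buffer now a byte[], the shared helper decodes each chunk with new String(cbuf, 0, len, UTF-8) instead of building strings from a char[]. A runnable sketch of that chunked read-and-decode loop (ChunkedReadDemo is hypothetical; a StringBuilder stands in for the Hamlet PRE block):

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.charset.StandardCharsets;

    public final class ChunkedReadDemo {
      public static void main(String[] args) throws IOException {
        byte[] log = "line 1\nline 2\nline 3\n".getBytes(StandardCharsets.UTF_8);
        InputStream in = new ByteArrayInputStream(log);
        int bufferSize = 8;
        byte[] cbuf = new byte[bufferSize];
        long toRead = log.length;
        int len;
        // Read up to bufferSize bytes at a time, decode each chunk as UTF-8,
        // and stop once toRead bytes have been rendered.
        int currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
        StringBuilder pre = new StringBuilder(); // stands in for html.pre()
        while (toRead > 0 && (len = in.read(cbuf, 0, currentToRead)) > 0) {
          pre.append(new String(cbuf, 0, len, StandardCharsets.UTF_8));
          toRead -= len;
          currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
        }
        System.out.print(pre);
      }
    }

Decoding fixed-size byte chunks assumes a multi-byte character does not straddle a chunk boundary, which holds for ASCII log content.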