You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ey...@apache.org on 2018/08/13 20:52:24 UTC

hadoop git commit: YARN-7417. Remove duplicated code from IndexedFileAggregatedLogsBlock and TFileAggregatedLogsBlock. Contributed by Zian Chen

Repository: hadoop
Updated Branches:
  refs/heads/trunk b4031a8f1 -> 74411ce0c


YARN-7417. Remove duplicated code from IndexedFileAggregatedLogsBlock
           and TFileAggregatedLogsBlock.
           Contributed by Zian Chen


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/74411ce0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/74411ce0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/74411ce0

Branch: refs/heads/trunk
Commit: 74411ce0ce7336c0f7bb5793939fdd64a5dcdef6
Parents: b4031a8
Author: Eric Yang <ey...@apache.org>
Authored: Mon Aug 13 16:50:00 2018 -0400
Committer: Eric Yang <ey...@apache.org>
Committed: Mon Aug 13 16:50:00 2018 -0400

----------------------------------------------------------------------
 .../logaggregation/AggregatedLogFormat.java     |   3 +-
 .../filecontroller/LogAggregationHtmlBlock.java |  61 +++++++++
 .../ifile/IndexedFileAggregatedLogsBlock.java   | 136 +++++++------------
 .../tfile/TFileAggregatedLogsBlock.java         |  56 +-------
 4 files changed, 117 insertions(+), 139 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/74411ce0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
index d9b4c1e4..ca43fe6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
@@ -26,6 +26,7 @@ import java.io.EOFException;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.PrintStream;
@@ -1013,7 +1014,7 @@ public class AggregatedLogFormat {
   }
 
   @Private
-  public static class ContainerLogsReader {
+  public static class ContainerLogsReader extends InputStream {
     private DataInputStream valueStream;
     private String currentLogType = null;
     private long currentLogLength = 0;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/74411ce0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationHtmlBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationHtmlBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationHtmlBlock.java
index 784102b..4ec8794 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationHtmlBlock.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationHtmlBlock.java
@@ -24,6 +24,10 @@ import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING;
 import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME;
 
 import com.google.inject.Inject;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
 import java.util.Map;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -34,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.logaggregation.LogAggregationWebUtils;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet;
 import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
 
 /**
@@ -143,6 +148,62 @@ public abstract class LogAggregationHtmlBlock extends HtmlBlock {
     return true;
   }
 
+  /**
+   * Clamps the requested byte range to [0, logLength]; negative indexes count
+   * back from the end of the log. When only part of the log will be shown, a
+   * paragraph linking to the full log is rendered. Returns {start, end}.
+   */
+  protected long[] checkParseRange(Block html, long startIndex,
+      long endIndex, long startTime, long endTime, long logLength,
+      String logType) {
+    long from = startIndex < 0 ? logLength + startIndex : startIndex;
+    from = Math.min(Math.max(from, 0L), logLength);
+    long to = endIndex < 0 ? logLength + endIndex : endIndex;
+    to = Math.max(Math.min(Math.max(to, 0L), logLength), from);
+    long shown = to - from;
+    if (shown < logLength) {
+      String fullLogQuery =
+          "?start=0&start.time=" + startTime + "&end.time=" + endTime;
+      html.p().__("Showing " + shown + " bytes of " + logLength
+          + " total. Click ").a(url("logs", $(NM_NODENAME), $(CONTAINER_ID),
+          $(ENTITY_STRING), $(APP_OWNER), logType, fullLogQuery), "here")
+          .__(" for the full log.").__();
+    }
+    return new long[]{from, to};
+  }
+
+  /**
+   * Skips to range[0], then streams the bytes up to range[1] into a pre
+   * element as UTF-8 text. Chunks never exceed the supplied buffer, and the
+   * element is closed even when a read fails part-way through.
+   * @throws IOException if the log ends before range[0] or a read fails
+   */
+  protected void processContainerLog(Block html, long[] range, InputStream in,
+      int bufferSize, byte[] cbuf) throws IOException {
+    long start = range[0];
+    long toRead = range[1] - start;
+    for (long skipped = 0; skipped < start;) {
+      long ret = in.skip(start - skipped);
+      if (ret == 0 && in.read() == -1) {
+        throw new IOException("Premature EOF from container log");
+      }
+      skipped += ret == 0 ? 1 : ret;
+    }
+    int chunk = Math.min(bufferSize, cbuf.length);
+    Charset utf8 = Charset.forName("UTF-8");
+    Hamlet.PRE<Hamlet> pre = html.pre();
+    try {
+      int len;
+      while (toRead > 0
+          && (len = in.read(cbuf, 0, (int) Math.min(toRead, chunk))) > 0) {
+        pre.__(new String(cbuf, 0, len, utf8));
+        toRead -= len;
+      }
+    } finally {
+      pre.__();
+    }
+  }
+
   protected static class BlockParameters {
     private ApplicationId appId;
     private ContainerId containerId;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/74411ce0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/IndexedFileAggregatedLogsBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/IndexedFileAggregatedLogsBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/IndexedFileAggregatedLogsBlock.java
index 4ef429d..eb9634b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/IndexedFileAggregatedLogsBlock.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/IndexedFileAggregatedLogsBlock.java
@@ -18,11 +18,7 @@
 
 package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile;
 
-import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER;
-import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID;
 import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_LOG_TYPE;
-import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING;
-import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME;
 
 import com.google.inject.Inject;
 import java.io.IOException;
@@ -53,7 +49,6 @@ import org.apache.hadoop.yarn.logaggregation.filecontroller.ifile.LogAggregation
 import org.apache.hadoop.yarn.logaggregation.filecontroller.ifile.LogAggregationIndexedFileController.IndexedPerAggregationLogMeta;
 import org.apache.hadoop.yarn.util.Times;
 import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet;
-import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet.PRE;
 
 /**
  * The Aggregated Logs Block implementation for Indexed File.
@@ -179,88 +174,8 @@ public class IndexedFileAggregatedLogsBlock extends LogAggregationHtmlBlock {
           continue;
         }
 
-        Algorithm compressName = Compression.getCompressionAlgorithmByName(
-            compressAlgo);
-        Decompressor decompressor = compressName.getDecompressor();
-        FileContext fileContext = FileContext.getFileContext(
-            thisNodeFile.getPath().toUri(), conf);
-        FSDataInputStream fsin = fileContext.open(thisNodeFile.getPath());
-        int bufferSize = 65536;
-        for (IndexedFileLogMeta candidate : candidates) {
-          if (candidate.getLastModifiedTime() < startTime
-              || candidate.getLastModifiedTime() > endTime) {
-            continue;
-          }
-          byte[] cbuf = new byte[bufferSize];
-          InputStream in = null;
-          try {
-            in = compressName.createDecompressionStream(
-                new BoundedRangeFileInputStream(fsin,
-                    candidate.getStartIndex(),
-                    candidate.getFileCompressedSize()),
-                    decompressor,
-                    LogAggregationIndexedFileController.getFSInputBufferSize(
-                        conf));
-            long logLength = candidate.getFileSize();
-            html.pre().__("\n\n").__();
-            html.p().__("Log Type: " + candidate.getFileName()).__();
-            html.p().__("Log Upload Time: " + Times.format(
-                candidate.getLastModifiedTime())).__();
-            html.p().__("Log Length: " + Long.toString(
-                logLength)).__();
-            long startIndex = start < 0
-                ? logLength + start : start;
-            startIndex = startIndex < 0 ? 0 : startIndex;
-            startIndex = startIndex > logLength ? logLength : startIndex;
-            long endLogIndex = end < 0
-                ? logLength + end : end;
-            endLogIndex = endLogIndex < 0 ? 0 : endLogIndex;
-            endLogIndex = endLogIndex > logLength ? logLength : endLogIndex;
-            endLogIndex = endLogIndex < startIndex ?
-                startIndex : endLogIndex;
-            long toRead = endLogIndex - startIndex;
-            if (toRead < logLength) {
-              html.p().__("Showing " + toRead + " bytes of " + logLength
-                  + " total. Click ").a(url("logs", $(NM_NODENAME),
-                      $(CONTAINER_ID), $(ENTITY_STRING), $(APP_OWNER),
-                      candidate.getFileName(), "?start=0&start.time="
-                      + startTime + "&end.time=" + endTime), "here").
-                      __(" for the full log.").__();
-            }
-            long totalSkipped = 0;
-            while (totalSkipped < startIndex) {
-              long ret = in.skip(startIndex - totalSkipped);
-              if (ret == 0) {
-                //Read one byte
-                int nextByte = in.read();
-                // Check if we have reached EOF
-                if (nextByte == -1) {
-                  throw new IOException("Premature EOF from container log");
-                }
-                ret = 1;
-              }
-              totalSkipped += ret;
-            }
-            int len = 0;
-            int currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
-            PRE<Hamlet> pre = html.pre();
-
-            while (toRead > 0
-                && (len = in.read(cbuf, 0, currentToRead)) > 0) {
-              pre.__(new String(cbuf, 0, len, Charset.forName("UTF-8")));
-              toRead = toRead - len;
-              currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
-            }
-
-            pre.__();
-            foundLog = true;
-          } catch (Exception ex) {
-            LOG.error("Error getting logs for " + logEntity, ex);
-            continue;
-          } finally {
-            IOUtils.closeQuietly(in);
-          }
-        }
+        foundLog = readContainerLog(compressAlgo, html, thisNodeFile, start,
+            end, candidates, startTime, endTime, foundLog, logEntity);
       }
       if (!foundLog) {
         if (desiredLogType.isEmpty()) {
@@ -277,4 +192,51 @@ public class IndexedFileAggregatedLogsBlock extends LogAggregationHtmlBlock {
       LOG.error("Error getting logs for " + logEntity, ex);
     }
   }
+
+  /** Renders every candidate log uploaded within [startTime, endTime]; the
+   *  aggregated-file stream and decompressor are released on every path. */
+  private boolean readContainerLog(String compressAlgo, Block html,
+      FileStatus thisNodeFile, long start, long end,
+      List<IndexedFileLogMeta> candidates, long startTime, long endTime,
+      boolean foundLog, String logEntity) throws IOException {
+    Algorithm algo = Compression.getCompressionAlgorithmByName(compressAlgo);
+    Decompressor decompressor = algo.getDecompressor();
+    byte[] cbuf = new byte[65536];
+    FSDataInputStream fsin = null;
+    try {
+      fsin = FileContext.getFileContext(thisNodeFile.getPath().toUri(), conf)
+          .open(thisNodeFile.getPath());
+      for (IndexedFileLogMeta candidate : candidates) {
+        long uploadTime = candidate.getLastModifiedTime();
+        if (uploadTime < startTime || uploadTime > endTime) {
+          continue;
+        }
+        InputStream in = null;
+        try {
+          in = algo.createDecompressionStream(
+              new BoundedRangeFileInputStream(fsin, candidate.getStartIndex(),
+                  candidate.getFileCompressedSize()), decompressor,
+              LogAggregationIndexedFileController.getFSInputBufferSize(conf));
+          long logLength = candidate.getFileSize();
+          html.pre().__("\n\n").__();
+          html.p().__("Log Type: " + candidate.getFileName()).__();
+          html.p().__("Log Upload Time: " + Times.format(uploadTime)).__();
+          html.p().__("Log Length: " + Long.toString(logLength)).__();
+          long[] range = checkParseRange(html, start, end, startTime, endTime,
+              logLength, candidate.getFileName());
+          processContainerLog(html, range, in, cbuf.length, cbuf);
+          foundLog = true;
+        } catch (Exception ex) {
+          LOG.error("Error getting logs for " + logEntity, ex);
+        } finally {
+          IOUtils.closeQuietly(in);
+        }
+      }
+    } finally {
+      IOUtils.closeQuietly(fsin);
+      algo.returnDecompressor(decompressor);
+    }
+    return foundLog;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/74411ce0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/TFileAggregatedLogsBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/TFileAggregatedLogsBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/TFileAggregatedLogsBlock.java
index 64b6219..6fb5b90 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/TFileAggregatedLogsBlock.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/TFileAggregatedLogsBlock.java
@@ -18,14 +18,11 @@
 
 package org.apache.hadoop.yarn.logaggregation.filecontroller.tfile;
 
-import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER;
-import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID;
 import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_LOG_TYPE;
-import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING;
-import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME;
 
 import com.google.inject.Inject;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.Map;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -173,7 +170,7 @@ public class TFileAggregatedLogsBlock extends LogAggregationHtmlBlock {
       long endIndex, String desiredLogType, long logUpLoadTime,
       long startTime, long endTime) throws IOException {
     int bufferSize = 65536;
-    char[] cbuf = new char[bufferSize];
+    byte[] cbuf = new byte[bufferSize];
 
     boolean foundLog = false;
     String logType = logReader.nextLog();
@@ -189,53 +186,10 @@ public class TFileAggregatedLogsBlock extends LogAggregationHtmlBlock {
         html.p().__("Log Upload Time: " + Times.format(logUpLoadTime)).__();
         html.p().__("Log Length: " + Long.toString(logLength)).__();
 
-        long start = startIndex < 0
-            ? logLength + startIndex : startIndex;
-        start = start < 0 ? 0 : start;
-        start = start > logLength ? logLength : start;
-        long end = endIndex < 0
-            ? logLength + endIndex : endIndex;
-        end = end < 0 ? 0 : end;
-        end = end > logLength ? logLength : end;
-        end = end < start ? start : end;
-
-        long toRead = end - start;
-        if (toRead < logLength) {
-          html.p().__("Showing " + toRead + " bytes of " + logLength
-            + " total. Click ").a(url("logs", $(NM_NODENAME), $(CONTAINER_ID),
-                $(ENTITY_STRING), $(APP_OWNER),
-                logType, "?start=0&start.time=" + startTime
-                + "&end.time=" + endTime), "here").
-            __(" for the full log.").__();
-        }
-
-        long totalSkipped = 0;
-        while (totalSkipped < start) {
-          long ret = logReader.skip(start - totalSkipped);
-          if (ret == 0) {
-            //Read one byte
-            int nextByte = logReader.read();
-            // Check if we have reached EOF
-            if (nextByte == -1) {
-              throw new IOException("Premature EOF from container log");
-            }
-            ret = 1;
-          }
-          totalSkipped += ret;
-        }
-
-        int len = 0;
-        int currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
-        PRE<Hamlet> pre = html.pre();
-
-        while (toRead > 0
-            && (len = logReader.read(cbuf, 0, currentToRead)) > 0) {
-          pre.__(new String(cbuf, 0, len));
-          toRead = toRead - len;
-          currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
-        }
+        long[] range = checkParseRange(html, startIndex, endIndex, startTime,
+            endTime, logLength, logType);
 
-        pre.__();
+        processContainerLog(html, range, logReader, bufferSize, cbuf);
         foundLog = true;
       }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org