You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2017/09/29 23:52:52 UTC
[3/4] hadoop git commit: YARN-7072. Add a new log aggregation file
format controller. Contributed by Xuan Gong.
YARN-7072. Add a new log aggregation file format controller. Contributed by Xuan Gong.
(cherry picked from commit 3fddabc2fe4fbdb8ef3f9ce7558955c4f0794dcc)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c3eade44
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c3eade44
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c3eade44
Branch: refs/heads/branch-3.0
Commit: c3eade44d057f3a2d625a4e1ddda11dd3afc7145
Parents: cdaab89
Author: Junping Du <ju...@apache.org>
Authored: Fri Sep 8 15:16:19 2017 -0700
Committer: Wangda Tan <wa...@apache.org>
Committed: Fri Sep 29 16:49:44 2017 -0700
----------------------------------------------------------------------
.../file/tfile/BoundedRangeFileInputStream.java | 2 +-
.../hadoop/io/file/tfile/Compression.java | 6 +-
.../file/tfile/SimpleBufferedOutputStream.java | 2 +-
.../apache/hadoop/yarn/client/cli/LogsCLI.java | 25 +-
.../logaggregation/ContainerLogFileInfo.java | 93 ++
.../yarn/logaggregation/ContainerLogMeta.java | 8 +-
.../logaggregation/LogAggregationUtils.java | 27 +
.../yarn/logaggregation/LogCLIHelpers.java | 20 +-
.../yarn/logaggregation/LogToolUtils.java | 26 +
.../logaggregation/PerContainerLogFileInfo.java | 93 --
.../LogAggregationFileController.java | 45 +-
.../ifile/IndexedFileAggregatedLogsBlock.java | 275 +++++
.../LogAggregationIndexedFileController.java | 1056 ++++++++++++++++++
.../filecontroller/ifile/package-info.java | 21 +
.../tfile/LogAggregationTFileController.java | 10 +-
.../TestLogAggregationIndexFileController.java | 316 ++++++
.../webapp/TestAHSWebServices.java | 8 +-
.../server/webapp/dao/ContainerLogsInfo.java | 10 +-
.../webapp/dao/NMContainerLogsInfo.java | 8 +-
.../nodemanager/webapp/TestNMWebServices.java | 8 +-
20 files changed, 1886 insertions(+), 173 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3eade44/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/BoundedRangeFileInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/BoundedRangeFileInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/BoundedRangeFileInputStream.java
index e7f4c83..050c15b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/BoundedRangeFileInputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/BoundedRangeFileInputStream.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
* BoundedRangeFileInputStream on top of the same FSDataInputStream and they
* would not interfere with each other.
*/
-class BoundedRangeFileInputStream extends InputStream {
+public class BoundedRangeFileInputStream extends InputStream {
private FSDataInputStream in;
private long pos;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3eade44/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/Compression.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/Compression.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/Compression.java
index f82f4df..fa85ed7 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/Compression.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/Compression.java
@@ -43,7 +43,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_
/**
* Compression related stuff.
*/
-final class Compression {
+public final class Compression {
static final Logger LOG = LoggerFactory.getLogger(Compression.class);
/**
@@ -75,7 +75,7 @@ final class Compression {
/**
* Compression algorithms.
*/
- enum Algorithm {
+ public enum Algorithm {
LZO(TFile.COMPRESSION_LZO) {
private transient boolean checked = false;
private static final String defaultClazz =
@@ -348,7 +348,7 @@ final class Compression {
}
}
- static Algorithm getCompressionAlgorithmByName(String compressName) {
+ public static Algorithm getCompressionAlgorithmByName(String compressName) {
Algorithm[] algos = Algorithm.class.getEnumConstants();
for (Algorithm a : algos) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3eade44/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/SimpleBufferedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/SimpleBufferedOutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/SimpleBufferedOutputStream.java
index a26a02d..0a194a3 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/SimpleBufferedOutputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/SimpleBufferedOutputStream.java
@@ -25,7 +25,7 @@ import java.io.OutputStream;
* A simplified BufferedOutputStream with borrowed buffer, and allow users to
* see how much data have been buffered.
*/
-class SimpleBufferedOutputStream extends FilterOutputStream {
+public class SimpleBufferedOutputStream extends FilterOutputStream {
protected byte buf[]; // the borrowed buffer
protected int count = 0; // bytes used in buffer.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3eade44/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java
index 6e62213..9a8ba4a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java
@@ -65,9 +65,10 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers;
-import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo;
+import org.apache.hadoop.yarn.logaggregation.LogToolUtils;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.apache.hadoop.yarn.webapp.util.YarnWebServiceUtils;
import org.codehaus.jettison.json.JSONArray;
@@ -442,10 +443,10 @@ public class LogsCLI extends Configured implements Tool {
return false;
}
- private List<Pair<PerContainerLogFileInfo, String>> getContainerLogFiles(
+ private List<Pair<ContainerLogFileInfo, String>> getContainerLogFiles(
Configuration conf, String containerIdStr, String nodeHttpAddress)
throws IOException {
- List<Pair<PerContainerLogFileInfo, String>> logFileInfos
+ List<Pair<ContainerLogFileInfo, String>> logFileInfos
= new ArrayList<>();
try {
WebResource webResource = webServiceClient
@@ -483,12 +484,12 @@ public class LogsCLI extends Configured implements Tool {
if (ob instanceof JSONArray) {
JSONArray obArray = (JSONArray)ob;
for (int j = 0; j < obArray.length(); j++) {
- logFileInfos.add(new Pair<PerContainerLogFileInfo, String>(
+ logFileInfos.add(new Pair<ContainerLogFileInfo, String>(
generatePerContainerLogFileInfoFromJSON(
obArray.getJSONObject(j)), aggregateType));
}
} else if (ob instanceof JSONObject) {
- logFileInfos.add(new Pair<PerContainerLogFileInfo, String>(
+ logFileInfos.add(new Pair<ContainerLogFileInfo, String>(
generatePerContainerLogFileInfoFromJSON(
(JSONObject)ob), aggregateType));
}
@@ -507,7 +508,7 @@ public class LogsCLI extends Configured implements Tool {
return logFileInfos;
}
- private PerContainerLogFileInfo generatePerContainerLogFileInfoFromJSON(
+ private ContainerLogFileInfo generatePerContainerLogFileInfoFromJSON(
JSONObject meta) throws JSONException {
String fileName = meta.has("fileName") ?
meta.getString("fileName") : "N/A";
@@ -515,7 +516,7 @@ public class LogsCLI extends Configured implements Tool {
meta.getString("fileSize") : "N/A";
String lastModificationTime = meta.has("lastModifiedTime") ?
meta.getString("lastModifiedTime") : "N/A";
- return new PerContainerLogFileInfo(fileName, fileSize,
+ return new ContainerLogFileInfo(fileName, fileSize,
lastModificationTime);
}
@@ -535,7 +536,7 @@ public class LogsCLI extends Configured implements Tool {
return -1;
}
String nodeId = request.getNodeId();
- PrintStream out = logCliHelper.createPrintStream(localDir, nodeId,
+ PrintStream out = LogToolUtils.createPrintStream(localDir, nodeId,
containerIdStr);
try {
Set<String> matchedFiles = getMatchedContainerLogFiles(request,
@@ -1284,9 +1285,9 @@ public class LogsCLI extends Configured implements Tool {
outStream.printf(LogCLIHelpers.PER_LOG_FILE_INFO_PATTERN,
"LogFile", "LogLength", "LastModificationTime", "LogAggregationType");
outStream.println(StringUtils.repeat("=", containerString.length() * 2));
- List<Pair<PerContainerLogFileInfo, String>> infos = getContainerLogFiles(
+ List<Pair<ContainerLogFileInfo, String>> infos = getContainerLogFiles(
getConf(), containerId, nodeHttpAddress);
- for (Pair<PerContainerLogFileInfo, String> info : infos) {
+ for (Pair<ContainerLogFileInfo, String> info : infos) {
outStream.printf(LogCLIHelpers.PER_LOG_FILE_INFO_PATTERN,
info.getKey().getFileName(), info.getKey().getFileSize(),
info.getKey().getLastModifiedTime(), info.getValue());
@@ -1298,11 +1299,11 @@ public class LogsCLI extends Configured implements Tool {
boolean useRegex) throws IOException {
// fetch all the log files for the container
// filter the log files based on the given -log_files pattern
- List<Pair<PerContainerLogFileInfo, String>> allLogFileInfos=
+ List<Pair<ContainerLogFileInfo, String>> allLogFileInfos=
getContainerLogFiles(getConf(), request.getContainerId(),
request.getNodeHttpAddress());
List<String> fileNames = new ArrayList<String>();
- for (Pair<PerContainerLogFileInfo, String> fileInfo : allLogFileInfos) {
+ for (Pair<ContainerLogFileInfo, String> fileInfo : allLogFileInfos) {
fileNames.add(fileInfo.getKey().getFileName());
}
return getMatchedLogFiles(request, fileNames,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3eade44/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogFileInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogFileInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogFileInfo.java
new file mode 100644
index 0000000..b461ebb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogFileInfo.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation;
+
+/**
+ * ContainerLogFileInfo represents the meta data for a container log file,
+ * which includes:
+ * <ul>
+ * <li>The filename of the container log.</li>
+ * <li>The size of the container log.</li>
+ * <li>The last modification time of the container log.</li>
+ * </ul>
+ *
+ */
+public class ContainerLogFileInfo {
+ private String fileName;
+ private String fileSize;
+ private String lastModifiedTime;
+
+ //JAXB needs this
+ public ContainerLogFileInfo() {}
+
+ public ContainerLogFileInfo(String fileName, String fileSize,
+ String lastModifiedTime) {
+ this.setFileName(fileName);
+ this.setFileSize(fileSize);
+ this.setLastModifiedTime(lastModifiedTime);
+ }
+
+ public String getFileName() {
+ return fileName;
+ }
+
+ public void setFileName(String fileName) {
+ this.fileName = fileName;
+ }
+
+ public String getFileSize() {
+ return fileSize;
+ }
+
+ public void setFileSize(String fileSize) {
+ this.fileSize = fileSize;
+ }
+
+ public String getLastModifiedTime() {
+ return lastModifiedTime;
+ }
+
+ public void setLastModifiedTime(String lastModifiedTime) {
+ this.lastModifiedTime = lastModifiedTime;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((fileName == null) ? 0 : fileName.hashCode());
+ result = prime * result + ((fileSize == null) ? 0 : fileSize.hashCode());
+ result = prime * result + ((lastModifiedTime == null) ?
+ 0 : lastModifiedTime.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object otherObj) {
+ if (otherObj == this) {
+ return true;
+ }
+ if (!(otherObj instanceof ContainerLogFileInfo)) {
+ return false;
+ }
+ ContainerLogFileInfo other = (ContainerLogFileInfo)otherObj;
+ return other.fileName.equals(fileName) && other.fileSize.equals(fileSize)
+ && other.lastModifiedTime.equals(lastModifiedTime);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3eade44/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogMeta.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogMeta.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogMeta.java
index 26a620e..4c6b0de 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogMeta.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogMeta.java
@@ -26,14 +26,14 @@ import java.util.List;
* <ul>
* <li>The Container Id.</li>
* <li>The NodeManager Id.</li>
- * <li>A list of {@link PerContainerLogFileInfo}.</li>
+ * <li>A list of {@link ContainerLogFileInfo}.</li>
* </ul>
*
*/
public class ContainerLogMeta {
private String containerId;
private String nodeId;
- private List<PerContainerLogFileInfo> logMeta;
+ private List<ContainerLogFileInfo> logMeta;
public ContainerLogMeta(String containerId, String nodeId) {
this.containerId = containerId;
@@ -51,11 +51,11 @@ public class ContainerLogMeta {
public void addLogMeta(String fileName, String fileSize,
String lastModificationTime) {
- logMeta.add(new PerContainerLogFileInfo(fileName, fileSize,
+ logMeta.add(new ContainerLogFileInfo(fileName, fileSize,
lastModificationTime));
}
- public List<PerContainerLogFileInfo> getContainerLogMeta() {
+ public List<ContainerLogFileInfo> getContainerLogMeta() {
return this.logMeta;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3eade44/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
index 6d04c29..edf2cf3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
@@ -30,6 +30,9 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
@Private
public class LogAggregationUtils {
@@ -200,6 +203,30 @@ public class LogAggregationUtils {
* @param conf the configuration
* @param appId the applicationId
* @param appOwner the application owner
+ * @param remoteRootLogDir the remote root log directory
+ * @param suffix the log directory suffix
+ * @return the list of available log files
+ * @throws IOException if there is no log file available
+ */
+ public static List<FileStatus> getRemoteNodeFileList(
+ Configuration conf, ApplicationId appId, String appOwner,
+ org.apache.hadoop.fs.Path remoteRootLogDir, String suffix)
+ throws IOException {
+ Path remoteAppLogDir = getRemoteAppLogDir(conf, appId, appOwner,
+ remoteRootLogDir, suffix);
+ List<FileStatus> nodeFiles = new ArrayList<>();
+ Path qualifiedLogDir =
+ FileContext.getFileContext(conf).makeQualified(remoteAppLogDir);
+ nodeFiles.addAll(Arrays.asList(FileContext.getFileContext(
+ qualifiedLogDir.toUri(), conf).util().listStatus(remoteAppLogDir)));
+ return nodeFiles;
+ }
+
+ /**
+ * Get all available log files under remote app log directory.
+ * @param conf the configuration
+ * @param appId the applicationId
+ * @param appOwner the application owner
* @return the iterator of available log files
* @throws IOException if there is no log file available
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3eade44/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java
index 0068eae..97b78ec 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java
@@ -22,8 +22,6 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.file.AccessDeniedException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -229,7 +227,7 @@ public class LogCLIHelpers implements Configurable {
out.printf(PER_LOG_FILE_INFO_PATTERN, "LogFile", "LogLength",
"LastModificationTime", "LogAggregationType");
out.println(StringUtils.repeat("=", containerString.length() * 2));
- for (PerContainerLogFileInfo logMeta : containerLogMeta
+ for (ContainerLogFileInfo logMeta : containerLogMeta
.getContainerLogMeta()) {
out.printf(PER_LOG_FILE_INFO_PATTERN, logMeta.getFileName(),
logMeta.getFileSize(), logMeta.getLastModifiedTime(), "AGGREGATED");
@@ -345,20 +343,6 @@ public class LogCLIHelpers implements Configurable {
+ ". Error message found: " + errorMessage);
}
- @Private
- public PrintStream createPrintStream(String localDir, String nodeId,
- String containerId) throws IOException {
- PrintStream out = System.out;
- if(localDir != null && !localDir.isEmpty()) {
- Path nodePath = new Path(localDir, LogAggregationUtils
- .getNodeString(nodeId));
- Files.createDirectories(Paths.get(nodePath.toString()));
- Path containerLogPath = new Path(nodePath, containerId);
- out = new PrintStream(containerLogPath.toString(), "UTF-8");
- }
- return out;
- }
-
public void closePrintStream(PrintStream out) {
if (out != System.out) {
IOUtils.closeQuietly(out);
@@ -379,7 +363,7 @@ public class LogCLIHelpers implements Configurable {
return logTypes;
}
for (ContainerLogMeta logMeta: containersLogMeta) {
- for (PerContainerLogFileInfo fileInfo : logMeta.getContainerLogMeta()) {
+ for (ContainerLogFileInfo fileInfo : logMeta.getContainerLogMeta()) {
logTypes.add(fileInfo.getFileName());
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3eade44/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java
index ddee445..90faa19 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java
@@ -21,11 +21,15 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.io.PrintStream;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.Path;
/**
* This class contains several utility function which could be used in different
@@ -158,4 +162,26 @@ public final class LogToolUtils {
}
}
+
+ /**
+ * Create the container log file under given (local directory/nodeId) and
+ * return the PrintStream object.
+ * @param localDir the Local Dir
+ * @param nodeId the NodeId
+ * @param containerId the ContainerId
+ * @return the printStream object
+ * @throws IOException if an I/O error occurs
+ */
+ public static PrintStream createPrintStream(String localDir, String nodeId,
+ String containerId) throws IOException {
+ PrintStream out = System.out;
+ if(localDir != null && !localDir.isEmpty()) {
+ Path nodePath = new Path(localDir, LogAggregationUtils
+ .getNodeString(nodeId));
+ Files.createDirectories(Paths.get(nodePath.toString()));
+ Path containerLogPath = new Path(nodePath, containerId);
+ out = new PrintStream(containerLogPath.toString(), "UTF-8");
+ }
+ return out;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3eade44/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/PerContainerLogFileInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/PerContainerLogFileInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/PerContainerLogFileInfo.java
deleted file mode 100644
index 867815f..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/PerContainerLogFileInfo.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.logaggregation;
-
-/**
- * PerContainerLogFileInfo represents the meta data for a container log file,
- * which includes:
- * <ul>
- * <li>The filename of the container log.</li>
- * <li>The size of the container log.</li>
- * <li>The last modification time of the container log.</li>
- * </ul>
- *
- */
-public class PerContainerLogFileInfo {
- private String fileName;
- private String fileSize;
- private String lastModifiedTime;
-
- //JAXB needs this
- public PerContainerLogFileInfo() {}
-
- public PerContainerLogFileInfo(String fileName, String fileSize,
- String lastModifiedTime) {
- this.setFileName(fileName);
- this.setFileSize(fileSize);
- this.setLastModifiedTime(lastModifiedTime);
- }
-
- public String getFileName() {
- return fileName;
- }
-
- public void setFileName(String fileName) {
- this.fileName = fileName;
- }
-
- public String getFileSize() {
- return fileSize;
- }
-
- public void setFileSize(String fileSize) {
- this.fileSize = fileSize;
- }
-
- public String getLastModifiedTime() {
- return lastModifiedTime;
- }
-
- public void setLastModifiedTime(String lastModifiedTime) {
- this.lastModifiedTime = lastModifiedTime;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((fileName == null) ? 0 : fileName.hashCode());
- result = prime * result + ((fileSize == null) ? 0 : fileSize.hashCode());
- result = prime * result + ((lastModifiedTime == null) ?
- 0 : lastModifiedTime.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object otherObj) {
- if (otherObj == this) {
- return true;
- }
- if (!(otherObj instanceof PerContainerLogFileInfo)) {
- return false;
- }
- PerContainerLogFileInfo other = (PerContainerLogFileInfo)otherObj;
- return other.fileName.equals(fileName) && other.fileSize.equals(fileSize)
- && other.lastModifiedTime.equals(lastModifiedTime);
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3eade44/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
index 39f3dc3..5df900b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
@@ -25,9 +25,6 @@ import com.google.common.collect.Sets;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
-import java.io.PrintStream;
-import java.nio.file.Files;
-import java.nio.file.Paths;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
@@ -37,6 +34,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
@@ -91,6 +89,12 @@ public abstract class LogAggregationFileController {
protected static final FsPermission APP_DIR_PERMISSIONS = FsPermission
.createImmutable((short) 0770);
+ /**
+ * Umask for the log file.
+ */
+ protected static final FsPermission APP_LOG_FILE_UMASK = FsPermission
+ .createImmutable((short) (0640 ^ 0777));
+
// This is temporary solution. The configuration will be deleted once
// we find a more scalable method to only write a single log file per LRS.
private static final String NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP
@@ -98,6 +102,11 @@ public abstract class LogAggregationFileController {
private static final int
DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP = 30;
+ // This is temporary solution. The configuration will be deleted once we have
+ // the FileSystem API to check whether append operation is supported or not.
+ public static final String LOG_AGGREGATION_FS_SUPPORT_APPEND
+ = YarnConfiguration.YARN_PREFIX+ "log-aggregation.fs-support-append";
+
protected Configuration conf;
protected Path remoteRootLogDir;
protected String remoteRootLogDirSuffix;
@@ -178,19 +187,6 @@ public abstract class LogAggregationFileController {
public abstract void postWrite(LogAggregationFileControllerContext record)
throws Exception;
- protected PrintStream createPrintStream(String localDir, String nodeId,
- String containerId) throws IOException {
- PrintStream out = System.out;
- if(localDir != null && !localDir.isEmpty()) {
- Path nodePath = new Path(localDir, LogAggregationUtils
- .getNodeString(nodeId));
- Files.createDirectories(Paths.get(nodePath.toString()));
- Path containerLogPath = new Path(nodePath, containerId);
- out = new PrintStream(containerLogPath.toString(), "UTF-8");
- }
- return out;
- }
-
protected void closePrintStream(OutputStream out) {
if (out != System.out) {
IOUtils.cleanupWithLogger(LOG, out);
@@ -481,4 +477,21 @@ public abstract class LogAggregationFileController {
LOG.error("Failed to clean old logs", e);
}
}
+
+ /**
+ * Create the aggregated log suffix. The LogAggregationFileController
+ * should call this to get the suffix and append the suffix to the end
+ * of each log. This would keep the aggregated log format consistent.
+ *
+ * @param fileName the File Name
+ * @return the aggregated log suffix String
+ */
+ protected String aggregatedLogSuffix(String fileName) {
+ StringBuilder sb = new StringBuilder();
+ String endOfFile = "End of LogType:" + fileName;
+ sb.append("\n" + endOfFile + "\n");
+ sb.append(StringUtils.repeat("*", endOfFile.length() + 50)
+ + "\n\n");
+ return sb.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3eade44/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/IndexedFileAggregatedLogsBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/IndexedFileAggregatedLogsBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/IndexedFileAggregatedLogsBlock.java
new file mode 100644
index 0000000..c4cbfda
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/IndexedFileAggregatedLogsBlock.java
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile;
+
+import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER;
+import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID;
+import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_LOG_TYPE;
+import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING;
+import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME;
+
+import com.google.inject.Inject;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.file.tfile.BoundedRangeFileInputStream;
+import org.apache.hadoop.io.file.tfile.Compression;
+import org.apache.hadoop.io.file.tfile.Compression.Algorithm;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationHtmlBlock;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.ifile.LogAggregationIndexedFileController.IndexedFileLogMeta;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.ifile.LogAggregationIndexedFileController.IndexedLogsMeta;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.ifile.LogAggregationIndexedFileController.IndexedPerAggregationLogMeta;
+import org.apache.hadoop.yarn.util.Times;
+import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet;
+import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet.PRE;
+
+/**
+ * The Aggregated Logs Block implementation for Indexed File.
+ */
+@InterfaceAudience.LimitedPrivate({"YARN", "MapReduce"})
+public class IndexedFileAggregatedLogsBlock extends LogAggregationHtmlBlock {
+
+ private final LogAggregationIndexedFileController fileController;
+ private final Configuration conf;
+
+ @Inject
+ public IndexedFileAggregatedLogsBlock(ViewContext ctx,
+ Configuration conf,
+ LogAggregationIndexedFileController fileController) {
+ super(ctx);
+ this.conf = conf;
+ this.fileController = fileController;
+ }
+
+ @Override
+ protected void render(Block html) {
+ BlockParameters params = verifyAndParseParameters(html);
+ if (params == null) {
+ return;
+ }
+
+ ApplicationId appId = params.getAppId();
+ ContainerId containerId = params.getContainerId();
+ NodeId nodeId = params.getNodeId();
+ String appOwner = params.getAppOwner();
+ String logEntity = params.getLogEntity();
+ long start = params.getStartIndex();
+ long end = params.getEndIndex();
+
+ List<FileStatus> nodeFiles = null;
+ try {
+ nodeFiles = LogAggregationUtils
+ .getRemoteNodeFileList(conf, appId, appOwner,
+ this.fileController.getRemoteRootLogDir(),
+ this.fileController.getRemoteRootLogDirSuffix());
+ } catch(Exception ex) {
+ html.h1("Unable to locate any logs for container "
+ + containerId.toString());
+ LOG.error(ex.getMessage());
+ return;
+ }
+
+ Map<String, FileStatus> checkSumFiles;
+ try {
+ checkSumFiles = fileController.filterFiles(nodeFiles,
+ LogAggregationIndexedFileController.CHECK_SUM_FILE_SUFFIX);
+ } catch (IOException ex) {
+ LOG.error("Error getting logs for " + logEntity, ex);
+ html.h1("Error getting logs for " + logEntity);
+ return;
+ }
+
+ List<FileStatus> fileToRead;
+ try {
+ fileToRead = fileController.getNodeLogFileToRead(nodeFiles,
+ nodeId.toString(), appId);
+ } catch (IOException ex) {
+ LOG.error("Error getting logs for " + logEntity, ex);
+ html.h1("Error getting logs for " + logEntity);
+ return;
+ }
+
+ boolean foundLog = false;
+ String desiredLogType = $(CONTAINER_LOG_TYPE);
+ try {
+ for (FileStatus thisNodeFile : fileToRead) {
+ FileStatus checkSum = fileController.getAllChecksumFiles(
+ checkSumFiles, thisNodeFile.getPath().getName());
+ long endIndex = -1;
+ if (checkSum != null) {
+ endIndex = fileController.loadIndexedLogsCheckSum(
+ checkSum.getPath());
+ }
+ IndexedLogsMeta indexedLogsMeta = null;
+ try {
+ indexedLogsMeta = fileController.loadIndexedLogsMeta(
+ thisNodeFile.getPath(), endIndex);
+ } catch (Exception ex) {
+ // DO NOTHING
+ LOG.warn("Can not load log meta from the log file:"
+ + thisNodeFile.getPath());
+ continue;
+ }
+ if (indexedLogsMeta == null) {
+ continue;
+ }
+ Map<ApplicationAccessType, String> appAcls = indexedLogsMeta.getAcls();
+ String user = indexedLogsMeta.getUser();
+ String remoteUser = request().getRemoteUser();
+ if (!checkAcls(conf, appId, user, appAcls, remoteUser)) {
+ html.h1().__("User [" + remoteUser
+ + "] is not authorized to view the logs for " + logEntity
+ + " in log file [" + thisNodeFile.getPath().getName() + "]")
+ .__();
+ LOG.error("User [" + remoteUser
+ + "] is not authorized to view the logs for " + logEntity);
+ continue;
+ }
+ String compressAlgo = indexedLogsMeta.getCompressName();
+ List<IndexedFileLogMeta> candidates = new ArrayList<>();
+ for (IndexedPerAggregationLogMeta logMeta
+ : indexedLogsMeta.getLogMetas()) {
+ for (Entry<String, List<IndexedFileLogMeta>> meta
+ : logMeta.getLogMetas().entrySet()) {
+ for (IndexedFileLogMeta log : meta.getValue()) {
+ if (!log.getContainerId().equals(containerId.toString())) {
+ continue;
+ }
+ if (desiredLogType != null && !desiredLogType.isEmpty()
+ && !desiredLogType.equals(log.getFileName())) {
+ continue;
+ }
+ candidates.add(log);
+ }
+ }
+ }
+ if (candidates.isEmpty()) {
+ continue;
+ }
+
+ Algorithm compressName = Compression.getCompressionAlgorithmByName(
+ compressAlgo);
+ Decompressor decompressor = compressName.getDecompressor();
+ FileContext fileContext = FileContext.getFileContext(
+ thisNodeFile.getPath().toUri(), conf);
+ FSDataInputStream fsin = fileContext.open(thisNodeFile.getPath());
+ int bufferSize = 65536;
+ for (IndexedFileLogMeta candidate : candidates) {
+ byte[] cbuf = new byte[bufferSize];
+ InputStream in = null;
+ try {
+ in = compressName.createDecompressionStream(
+ new BoundedRangeFileInputStream(fsin,
+ candidate.getStartIndex(),
+ candidate.getFileCompressedSize()),
+ decompressor,
+ LogAggregationIndexedFileController.getFSInputBufferSize(
+ conf));
+ long logLength = candidate.getFileSize();
+ html.pre().__("\n\n").__();
+ html.p().__("Log Type: " + candidate.getFileName()).__();
+ html.p().__("Log Upload Time: " + Times.format(
+ candidate.getLastModificatedTime())).__();
+ html.p().__("Log Length: " + Long.toString(
+ logLength)).__();
+ long startIndex = start < 0
+ ? logLength + start : start;
+ startIndex = startIndex < 0 ? 0 : startIndex;
+ startIndex = startIndex > logLength ? logLength : startIndex;
+ long endLogIndex = end < 0
+ ? logLength + end : end;
+ endLogIndex = endLogIndex < 0 ? 0 : endLogIndex;
+ endLogIndex = endLogIndex > logLength ? logLength : endLogIndex;
+ endLogIndex = endLogIndex < startIndex ?
+ startIndex : endLogIndex;
+ long toRead = endLogIndex - startIndex;
+ if (toRead < logLength) {
+ html.p().__("Showing " + toRead + " bytes of " + logLength
+ + " total. Click ").a(url("logs", $(NM_NODENAME),
+ $(CONTAINER_ID), $(ENTITY_STRING), $(APP_OWNER),
+ candidate.getFileName(), "?start=0"), "here").
+ __(" for the full log.").__();
+ }
+ long totalSkipped = 0;
+ while (totalSkipped < start) {
+ long ret = in.skip(start - totalSkipped);
+ if (ret == 0) {
+ //Read one byte
+ int nextByte = in.read();
+ // Check if we have reached EOF
+ if (nextByte == -1) {
+ throw new IOException("Premature EOF from container log");
+ }
+ ret = 1;
+ }
+ totalSkipped += ret;
+ }
+ int len = 0;
+ int currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
+ PRE<Hamlet> pre = html.pre();
+
+ while (toRead > 0
+ && (len = in.read(cbuf, 0, currentToRead)) > 0) {
+ pre.__(new String(cbuf, 0, len, Charset.forName("UTF-8")));
+ toRead = toRead - len;
+ currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
+ }
+
+ pre.__();
+ foundLog = true;
+ } catch (Exception ex) {
+ LOG.error("Error getting logs for " + logEntity, ex);
+ continue;
+ } finally {
+ IOUtils.closeQuietly(in);
+ }
+ }
+ }
+ if (!foundLog) {
+ if (desiredLogType.isEmpty()) {
+ html.h1("No logs available for container " + containerId.toString());
+ } else {
+ html.h1("Unable to locate '" + desiredLogType
+ + "' log for container " + containerId.toString());
+ }
+ }
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (Exception ex) {
+ html.h1().__("Error getting logs for " + logEntity).__();
+ LOG.error("Error getting logs for " + logEntity, ex);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3eade44/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java
new file mode 100644
index 0000000..6cb2062
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java
@@ -0,0 +1,1056 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.HarFs;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SecureIOUtils;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.file.tfile.BoundedRangeFileInputStream;
+import org.apache.hadoop.io.file.tfile.Compression;
+import org.apache.hadoop.io.file.tfile.SimpleBufferedOutputStream;
+import org.apache.hadoop.io.file.tfile.Compression.Algorithm;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
+import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.logaggregation.LogToolUtils;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
+import org.apache.hadoop.yarn.util.Times;
+import org.apache.hadoop.yarn.webapp.View.ViewContext;
+import org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The Indexed Log Aggregation File Format implementation.
+ *
+ */
+@Private
+@Unstable
+public class LogAggregationIndexedFileController
+ extends LogAggregationFileController {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ LogAggregationIndexedFileController.class);
+ private static final String FS_OUTPUT_BUF_SIZE_ATTR =
+ "indexedFile.fs.output.buffer.size";
+ private static final String FS_INPUT_BUF_SIZE_ATTR =
+ "indexedFile.fs.input.buffer.size";
+ private static final String FS_NUM_RETRIES_ATTR =
+ "indexedFile.fs.op.num-retries";
+ private static final String FS_RETRY_INTERVAL_MS_ATTR =
+ "indexedFile.fs.retry-interval-ms";
+ private static final int UUID_LENGTH = 36;
+
+ @VisibleForTesting
+ public static final String CHECK_SUM_FILE_SUFFIX = "-checksum";
+
+ private int fsNumRetries = 3;
+ private long fsRetryInterval = 1000L;
+ private static final int VERSION = 1;
+ private IndexedLogsMeta indexedLogsMeta = null;
+ private IndexedPerAggregationLogMeta logsMetaInThisCycle;
+ private long logAggregationTimeInThisCycle;
+ private FSDataOutputStream fsDataOStream;
+ private Algorithm compressAlgo;
+ private CachedIndexedLogsMeta cachedIndexedLogsMeta = null;
+ private boolean logAggregationSuccessfullyInThisCyCle = false;
+ private long currentOffSet = 0;
+ private Path remoteLogCheckSumFile;
+ private FileContext fc;
+ private UserGroupInformation ugi;
+ private String uuid = null;
+
+ public LogAggregationIndexedFileController() {}
+
+ @Override
+ public void initInternal(Configuration conf) {
+ // Currently, we need the underlying File System to support append
+ // operation. Will remove this check after we finish
+ // LogAggregationIndexedFileController for non-append mode.
+ boolean append = conf.getBoolean(LOG_AGGREGATION_FS_SUPPORT_APPEND, true);
+ if (!append) {
+ throw new YarnRuntimeException("The configuration:"
+ + LOG_AGGREGATION_FS_SUPPORT_APPEND + " is set as False. We can only"
+ + " use LogAggregationIndexedFileController when the FileSystem "
+ + "support append operations.");
+ }
+ String remoteDirStr = String.format(
+ YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT,
+ this.fileControllerName);
+ String remoteDir = conf.get(remoteDirStr);
+ if (remoteDir == null || remoteDir.isEmpty()) {
+ remoteDir = conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+ YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR);
+ }
+ this.remoteRootLogDir = new Path(remoteDir);
+ String suffix = String.format(
+ YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT,
+ this.fileControllerName);
+ this.remoteRootLogDirSuffix = conf.get(suffix);
+ if (this.remoteRootLogDirSuffix == null
+ || this.remoteRootLogDirSuffix.isEmpty()) {
+ this.remoteRootLogDirSuffix = conf.get(
+ YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
+ YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX)
+ + "-ifile";
+ }
+ String compressName = conf.get(
+ YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE,
+ YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE);
+ this.compressAlgo = Compression.getCompressionAlgorithmByName(
+ compressName);
+ this.fsNumRetries = conf.getInt(FS_NUM_RETRIES_ATTR, 3);
+ this.fsRetryInterval = conf.getLong(FS_RETRY_INTERVAL_MS_ATTR, 1000L);
+ }
+
+ @Override
+ public void initializeWriter(
+ final LogAggregationFileControllerContext context)
+ throws IOException {
+ final UserGroupInformation userUgi = context.getUserUgi();
+ final Map<ApplicationAccessType, String> appAcls = context.getAppAcls();
+ final String nodeId = context.getNodeId().toString();
+ final Path remoteLogFile = context.getRemoteNodeLogFileForApp();
+ this.ugi = userUgi;
+ logAggregationSuccessfullyInThisCyCle = false;
+ logsMetaInThisCycle = new IndexedPerAggregationLogMeta();
+ logAggregationTimeInThisCycle = System.currentTimeMillis();
+ logsMetaInThisCycle.setUploadTimeStamp(logAggregationTimeInThisCycle);
+ logsMetaInThisCycle.setRemoteNodeFile(remoteLogFile.getName());
+ try {
+ userUgi.doAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws Exception {
+ fc = FileContext.getFileContext(
+ remoteRootLogDir.toUri(), conf);
+ fc.setUMask(APP_LOG_FILE_UMASK);
+ boolean fileExist = fc.util().exists(remoteLogFile);
+ if (fileExist && context.isLogAggregationInRolling()) {
+ fsDataOStream = fc.create(remoteLogFile,
+ EnumSet.of(CreateFlag.APPEND),
+ new Options.CreateOpts[] {});
+ if (uuid == null) {
+ FSDataInputStream fsDataInputStream = null;
+ try {
+ fsDataInputStream = fc.open(remoteLogFile);
+ byte[] b = new byte[UUID_LENGTH];
+ int actual = fsDataInputStream.read(b);
+ if (actual != UUID_LENGTH) {
+ // Get an error when parse the UUID from existed log file.
+ // Simply OverWrite the existed log file and re-create the
+ // UUID.
+ fsDataOStream = fc.create(remoteLogFile,
+ EnumSet.of(CreateFlag.OVERWRITE),
+ new Options.CreateOpts[] {});
+ uuid = UUID.randomUUID().toString();
+ fsDataOStream.write(uuid.getBytes(Charset.forName("UTF-8")));
+ fsDataOStream.flush();
+ } else {
+ uuid = new String(b, Charset.forName("UTF-8"));
+ }
+ } finally {
+ IOUtils.cleanupWithLogger(LOG, fsDataInputStream);
+ }
+ }
+ // if the remote log file exists, but we do not have any
+ // indexedLogsMeta. We need to re-load indexedLogsMeta from
+ // the existing remote log file. If the re-load fails, we simply
+ // re-create a new indexedLogsMeta object. And will re-load
+ // the indexedLogsMeta from checksum file later.
+ if (indexedLogsMeta == null) {
+ try {
+ indexedLogsMeta = loadIndexedLogsMeta(remoteLogFile);
+ } catch (IOException ex) {
+ // DO NOTHING
+ }
+ }
+ } else {
+ fsDataOStream = fc.create(remoteLogFile,
+ EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
+ new Options.CreateOpts[] {});
+ if (uuid == null) {
+ uuid = UUID.randomUUID().toString();
+ }
+ byte[] b = uuid.getBytes(Charset.forName("UTF-8"));
+ fsDataOStream.write(b);
+ fsDataOStream.flush();
+ }
+ if (indexedLogsMeta == null) {
+ indexedLogsMeta = new IndexedLogsMeta();
+ indexedLogsMeta.setVersion(VERSION);
+ indexedLogsMeta.setUser(userUgi.getShortUserName());
+ indexedLogsMeta.setAcls(appAcls);
+ indexedLogsMeta.setNodeId(nodeId);
+ String compressName = conf.get(
+ YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE,
+ YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE);
+ indexedLogsMeta.setCompressName(compressName);
+ }
+ final long currentAggregatedLogFileLength = fc
+ .getFileStatus(remoteLogFile).getLen();
+ // only check the check-sum file when we are in append mode
+ if (context.isLogAggregationInRolling()) {
+ // check whether the checksum file exists to figure out
+ // whether the previous log aggregation process is successful
+ // and the aggregated log file is corrupted or not.
+ remoteLogCheckSumFile = new Path(remoteLogFile.getParent(),
+ (remoteLogFile.getName() + CHECK_SUM_FILE_SUFFIX));
+ boolean exist = fc.util().exists(remoteLogCheckSumFile);
+ if (!exist) {
+ FSDataOutputStream checksumFileOutputStream = null;
+ try {
+ checksumFileOutputStream = fc.create(remoteLogCheckSumFile,
+ EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
+ new Options.CreateOpts[] {});
+ checksumFileOutputStream.writeLong(
+ currentAggregatedLogFileLength);
+ } finally {
+ IOUtils.cleanupWithLogger(LOG, checksumFileOutputStream);
+ }
+ } else {
+ FSDataInputStream checksumFileInputStream = null;
+ try {
+ checksumFileInputStream = fc.open(remoteLogCheckSumFile);
+ long endIndex = checksumFileInputStream.readLong();
+ IndexedLogsMeta recoveredLogsMeta = loadIndexedLogsMeta(
+ remoteLogFile, endIndex);
+ if (recoveredLogsMeta == null) {
+ indexedLogsMeta.getLogMetas().clear();
+ } else {
+ indexedLogsMeta = recoveredLogsMeta;
+ }
+ } finally {
+ IOUtils.cleanupWithLogger(LOG, checksumFileInputStream);
+ }
+ }
+ }
+ // append a simple character("\n") to move the writer cursor, so
+ // we could get the correct position when we call
+ // fsOutputStream.getStartPos()
+ final byte[] dummyBytes = "\n".getBytes(Charset.forName("UTF-8"));
+ fsDataOStream.write(dummyBytes);
+ fsDataOStream.flush();
+
+ if (fsDataOStream.getPos() >= (currentAggregatedLogFileLength
+ + dummyBytes.length)) {
+ currentOffSet = 0;
+ } else {
+ currentOffSet = currentAggregatedLogFileLength;
+ }
+ return null;
+ }
+ });
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void closeWriter() {
+ IOUtils.cleanupWithLogger(LOG, this.fsDataOStream);
+ }
+
+ @Override
+ public void write(LogKey logKey, LogValue logValue) throws IOException {
+ String containerId = logKey.toString();
+ Set<File> pendingUploadFiles = logValue
+ .getPendingLogFilesToUploadForThisContainer();
+ List<IndexedFileLogMeta> metas = new ArrayList<>();
+ for (File logFile : pendingUploadFiles) {
+ FileInputStream in = null;
+ try {
+ in = SecureIOUtils.openForRead(logFile, logValue.getUser(), null);
+ } catch (IOException e) {
+ logErrorMessage(logFile, e);
+ IOUtils.cleanupWithLogger(LOG, in);
+ continue;
+ }
+ final long fileLength = logFile.length();
+ IndexedFileOutputStreamState outputStreamState = null;
+ try {
+ outputStreamState = new IndexedFileOutputStreamState(
+ this.compressAlgo, this.fsDataOStream, conf, this.currentOffSet);
+ byte[] buf = new byte[65535];
+ int len = 0;
+ long bytesLeft = fileLength;
+ while ((len = in.read(buf)) != -1) {
+ //If buffer contents within fileLength, write
+ if (len < bytesLeft) {
+ outputStreamState.getOutputStream().write(buf, 0, len);
+ bytesLeft-=len;
+ } else {
+ //else only write contents within fileLength, then exit early
+ outputStreamState.getOutputStream().write(buf, 0,
+ (int)bytesLeft);
+ break;
+ }
+ }
+ long newLength = logFile.length();
+ if(fileLength < newLength) {
+ LOG.warn("Aggregated logs truncated by approximately "+
+ (newLength-fileLength) +" bytes.");
+ }
+ logAggregationSuccessfullyInThisCyCle = true;
+ } catch (IOException e) {
+ String message = logErrorMessage(logFile, e);
+ if (outputStreamState != null &&
+ outputStreamState.getOutputStream() != null) {
+ outputStreamState.getOutputStream().write(
+ message.getBytes(Charset.forName("UTF-8")));
+ }
+ } finally {
+ IOUtils.cleanupWithLogger(LOG, in);
+ }
+
+ IndexedFileLogMeta meta = new IndexedFileLogMeta();
+ meta.setContainerId(containerId.toString());
+ meta.setFileName(logFile.getName());
+ if (outputStreamState != null) {
+ outputStreamState.finish();
+ meta.setFileCompressedSize(outputStreamState.getCompressedSize());
+ meta.setStartIndex(outputStreamState.getStartPos());
+ meta.setFileSize(fileLength);
+ }
+ meta.setLastModificatedTime(logFile.lastModified());
+ metas.add(meta);
+ }
+ logsMetaInThisCycle.addContainerLogMeta(containerId, metas);
+ }
+
+ @Override
+ public void postWrite(LogAggregationFileControllerContext record)
+ throws Exception {
+ // always aggregate the previous logsMeta, and append them together
+ // at the end of the file
+ indexedLogsMeta.addLogMeta(logsMetaInThisCycle);
+ byte[] b = SerializationUtils.serialize(indexedLogsMeta);
+ this.fsDataOStream.write(b);
+ int length = b.length;
+ this.fsDataOStream.writeInt(length);
+ byte[] separator = this.uuid.getBytes(Charset.forName("UTF-8"));
+ this.fsDataOStream.write(separator);
+ if (logAggregationSuccessfullyInThisCyCle) {
+ deleteFileWithRetries(fc, ugi, remoteLogCheckSumFile);
+ }
+ }
+
+ private void deleteFileWithRetries(final FileContext fileContext,
+ final UserGroupInformation userUgi,
+ final Path deletePath) throws Exception {
+ new FSAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ deleteFileWithPrivilege(fileContext, userUgi, deletePath);
+ return null;
+ }
+ }.runWithRetries();
+ }
+
+ private Object deleteFileWithPrivilege(final FileContext fileContext,
+ final UserGroupInformation userUgi, final Path fileToDelete)
+ throws Exception {
+ return userUgi.doAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws Exception {
+ if (fileContext.util().exists(fileToDelete)) {
+ fileContext.delete(fileToDelete, false);
+ }
+ return null;
+ }
+ });
+ }
+
+ @Override
+ public boolean readAggregatedLogs(ContainerLogsRequest logRequest,
+ OutputStream os) throws IOException {
+ boolean findLogs = false;
+ boolean createPrintStream = (os == null);
+ ApplicationId appId = logRequest.getAppId();
+ String nodeId = logRequest.getNodeId();
+ String nodeIdStr = (nodeId == null || nodeId.isEmpty()) ? null
+ : LogAggregationUtils.getNodeString(nodeId);
+ List<String> logTypes = new ArrayList<>();
+ if (logRequest.getLogTypes() != null && !logRequest
+ .getLogTypes().isEmpty()) {
+ logTypes.addAll(logRequest.getLogTypes());
+ }
+ String containerIdStr = logRequest.getContainerId();
+ boolean getAllContainers = (containerIdStr == null
+ || containerIdStr.isEmpty());
+ long size = logRequest.getBytes();
+ List<FileStatus> nodeFiles = LogAggregationUtils
+ .getRemoteNodeFileList(conf, appId, logRequest.getAppOwner(),
+ this.remoteRootLogDir, this.remoteRootLogDirSuffix);
+ if (nodeFiles.isEmpty()) {
+ throw new IOException("There is no available log fils for "
+ + "application:" + appId);
+ }
+ Map<String, FileStatus> checkSumFiles = filterFiles(
+ nodeFiles, CHECK_SUM_FILE_SUFFIX);
+ List<FileStatus> fileToRead = getNodeLogFileToRead(
+ nodeFiles, nodeIdStr, appId);
+ byte[] buf = new byte[65535];
+ for (FileStatus thisNodeFile : fileToRead) {
+ String nodeName = thisNodeFile.getPath().getName();
+ FileStatus checkSum = getAllChecksumFiles(checkSumFiles,
+ thisNodeFile.getPath().getName());
+ long endIndex = -1;
+ if (checkSum != null) {
+ endIndex = loadIndexedLogsCheckSum(checkSum.getPath());
+ }
+ IndexedLogsMeta indexedLogsMeta = null;
+ try {
+ indexedLogsMeta = loadIndexedLogsMeta(thisNodeFile.getPath(),
+ endIndex);
+ } catch (Exception ex) {
+ // DO NOTHING
+ LOG.warn("Can not load log meta from the log file:"
+ + thisNodeFile.getPath());
+ continue;
+ }
+ if (indexedLogsMeta == null) {
+ continue;
+ }
+ String compressAlgo = indexedLogsMeta.getCompressName();
+ List<IndexedFileLogMeta> candidates = new ArrayList<>();
+ for (IndexedPerAggregationLogMeta logMeta
+ : indexedLogsMeta.getLogMetas()) {
+ for (Entry<String, List<IndexedFileLogMeta>> meta
+ : logMeta.getLogMetas().entrySet()) {
+ for (IndexedFileLogMeta log : meta.getValue()) {
+ if (!getAllContainers && !log.getContainerId()
+ .equals(containerIdStr)) {
+ continue;
+ }
+ if (logTypes != null && !logTypes.isEmpty() &&
+ !logTypes.contains(log.getFileName())) {
+ continue;
+ }
+ candidates.add(log);
+ }
+ }
+ }
+ if (candidates.isEmpty()) {
+ continue;
+ }
+
+ Algorithm compressName = Compression.getCompressionAlgorithmByName(
+ compressAlgo);
+ Decompressor decompressor = compressName.getDecompressor();
+ FileContext fileContext = FileContext.getFileContext(
+ thisNodeFile.getPath().toUri(), conf);
+ FSDataInputStream fsin = fileContext.open(thisNodeFile.getPath());
+ String currentContainer = "";
+ for (IndexedFileLogMeta candidate : candidates) {
+ if (!candidate.getContainerId().equals(currentContainer)) {
+ if (createPrintStream) {
+ closePrintStream(os);
+ os = LogToolUtils.createPrintStream(
+ logRequest.getOutputLocalDir(),
+ thisNodeFile.getPath().getName(),
+ candidate.getContainerId());
+ currentContainer = candidate.getContainerId();
+ }
+ }
+ InputStream in = null;
+ try {
+ in = compressName.createDecompressionStream(
+ new BoundedRangeFileInputStream(fsin,
+ candidate.getStartIndex(),
+ candidate.getFileCompressedSize()),
+ decompressor, getFSInputBufferSize(conf));
+ LogToolUtils.outputContainerLog(candidate.getContainerId(),
+ nodeName, candidate.getFileName(), candidate.getFileSize(), size,
+ Times.format(candidate.getLastModificatedTime()),
+ in, os, buf, ContainerLogAggregationType.AGGREGATED);
+ byte[] b = aggregatedLogSuffix(candidate.getFileName())
+ .getBytes(Charset.forName("UTF-8"));
+ os.write(b, 0, b.length);
+ findLogs = true;
+ } catch (IOException e) {
+ System.err.println(e.getMessage());
+ compressName.returnDecompressor(decompressor);
+ continue;
+ } finally {
+ os.flush();
+ IOUtils.cleanupWithLogger(LOG, in);
+ }
+ }
+ }
+ return findLogs;
+ }
+
+ // TODO: fix me if the remote file system does not support append operation.
+ @Override
+ public List<ContainerLogMeta> readAggregatedLogsMeta(
+ ContainerLogsRequest logRequest) throws IOException {
+ List<IndexedLogsMeta> listOfLogsMeta = new ArrayList<>();
+ List<ContainerLogMeta> containersLogMeta = new ArrayList<>();
+ String containerIdStr = logRequest.getContainerId();
+ String nodeId = logRequest.getNodeId();
+ ApplicationId appId = logRequest.getAppId();
+ String appOwner = logRequest.getAppOwner();
+ boolean getAllContainers = (containerIdStr == null ||
+ containerIdStr.isEmpty());
+ String nodeIdStr = (nodeId == null || nodeId.isEmpty()) ? null
+ : LogAggregationUtils.getNodeString(nodeId);
+ List<FileStatus> nodeFiles = LogAggregationUtils
+ .getRemoteNodeFileList(conf, appId, appOwner, this.remoteRootLogDir,
+ this.remoteRootLogDirSuffix);
+ if (nodeFiles.isEmpty()) {
+ throw new IOException("There is no available log fils for "
+ + "application:" + appId);
+ }
+ Map<String, FileStatus> checkSumFiles = filterFiles(
+ nodeFiles, CHECK_SUM_FILE_SUFFIX);
+ List<FileStatus> fileToRead = getNodeLogFileToRead(
+ nodeFiles, nodeIdStr, appId);
+ for(FileStatus thisNodeFile : fileToRead) {
+ try {
+ FileStatus checkSum = getAllChecksumFiles(checkSumFiles,
+ thisNodeFile.getPath().getName());
+ long endIndex = -1;
+ if (checkSum != null) {
+ endIndex = loadIndexedLogsCheckSum(checkSum.getPath());
+ }
+ IndexedLogsMeta current = loadIndexedLogsMeta(
+ thisNodeFile.getPath(), endIndex);
+ if (current != null) {
+ listOfLogsMeta.add(current);
+ }
+ } catch (IOException ex) {
+ // DO NOTHING
+ LOG.warn("Can not get log meta from the log file:"
+ + thisNodeFile.getPath());
+ }
+ }
+ for (IndexedLogsMeta indexedLogMeta : listOfLogsMeta) {
+ String curNodeId = indexedLogMeta.getNodeId();
+ for (IndexedPerAggregationLogMeta logMeta :
+ indexedLogMeta.getLogMetas()) {
+ if (getAllContainers) {
+ for (Entry<String, List<IndexedFileLogMeta>> log : logMeta
+ .getLogMetas().entrySet()) {
+ ContainerLogMeta meta = new ContainerLogMeta(
+ log.getKey().toString(), curNodeId);
+ for (IndexedFileLogMeta aMeta : log.getValue()) {
+ meta.addLogMeta(aMeta.getFileName(), Long.toString(
+ aMeta.getFileSize()),
+ Times.format(aMeta.getLastModificatedTime()));
+ }
+ containersLogMeta.add(meta);
+ }
+ } else if (logMeta.getContainerLogMeta(containerIdStr) != null) {
+ ContainerLogMeta meta = new ContainerLogMeta(containerIdStr,
+ curNodeId);
+ for (IndexedFileLogMeta log :
+ logMeta.getContainerLogMeta(containerIdStr)) {
+ meta.addLogMeta(log.getFileName(), Long.toString(
+ log.getFileSize()),
+ Times.format(log.getLastModificatedTime()));
+ }
+ containersLogMeta.add(meta);
+ }
+ }
+ }
+ Collections.sort(containersLogMeta, new Comparator<ContainerLogMeta>() {
+ @Override
+ public int compare(ContainerLogMeta o1, ContainerLogMeta o2) {
+ return o1.getContainerId().compareTo(o2.getContainerId());
+ }
+ });
+ return containersLogMeta;
+ }
+
+ @Private
+ public Map<String, FileStatus> filterFiles(
+ List<FileStatus> fileList, final String suffix) throws IOException {
+ Map<String, FileStatus> checkSumFiles = new HashMap<>();
+ Set<FileStatus> status = new HashSet<FileStatus>(fileList);
+ Iterable<FileStatus> mask =
+ Iterables.filter(status, new Predicate<FileStatus>() {
+ @Override
+ public boolean apply(FileStatus next) {
+ return next.getPath().getName().endsWith(
+ suffix);
+ }
+ });
+ status = Sets.newHashSet(mask);
+ for (FileStatus file : status) {
+ checkSumFiles.put(file.getPath().getName(), file);
+ }
+ return checkSumFiles;
+ }
+
+ @Private
+ public List<FileStatus> getNodeLogFileToRead(
+ List<FileStatus> nodeFiles, String nodeId, ApplicationId appId)
+ throws IOException {
+ List<FileStatus> listOfFiles = new ArrayList<>();
+ List<FileStatus> files = new ArrayList<>(nodeFiles);
+ for (FileStatus file : files) {
+ String nodeName = file.getPath().getName();
+ if ((nodeId == null || nodeId.isEmpty()
+ || nodeName.contains(LogAggregationUtils
+ .getNodeString(nodeId))) && !nodeName.endsWith(
+ LogAggregationUtils.TMP_FILE_SUFFIX) &&
+ !nodeName.endsWith(CHECK_SUM_FILE_SUFFIX)) {
+ if (nodeName.equals(appId + ".har")) {
+ Path p = new Path("har:///" + file.getPath().toUri().getRawPath());
+ files = Arrays.asList(HarFs.get(p.toUri(), conf).listStatus(p));
+ continue;
+ }
+ listOfFiles.add(file);
+ }
+ }
+ return listOfFiles;
+ }
+
+ @Private
+ public FileStatus getAllChecksumFiles(Map<String, FileStatus> fileMap,
+ String fileName) {
+ for (Entry<String, FileStatus> file : fileMap.entrySet()) {
+ if (file.getKey().startsWith(fileName) && file.getKey()
+ .endsWith(CHECK_SUM_FILE_SUFFIX)) {
+ return file.getValue();
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public void renderAggregatedLogsBlock(Block html, ViewContext context) {
+ IndexedFileAggregatedLogsBlock block = new IndexedFileAggregatedLogsBlock(
+ context, this.conf, this);
+ block.render(html);
+ }
+
+ @Override
+ public String getApplicationOwner(Path aggregatedLogPath)
+ throws IOException {
+ if (this.cachedIndexedLogsMeta == null
+ || !this.cachedIndexedLogsMeta.getRemoteLogPath()
+ .equals(aggregatedLogPath)) {
+ this.cachedIndexedLogsMeta = new CachedIndexedLogsMeta(
+ loadIndexedLogsMeta(aggregatedLogPath), aggregatedLogPath);
+ }
+ return this.cachedIndexedLogsMeta.getCachedIndexedLogsMeta().getUser();
+ }
+
+ @Override
+ public Map<ApplicationAccessType, String> getApplicationAcls(
+ Path aggregatedLogPath) throws IOException {
+ if (this.cachedIndexedLogsMeta == null
+ || !this.cachedIndexedLogsMeta.getRemoteLogPath()
+ .equals(aggregatedLogPath)) {
+ this.cachedIndexedLogsMeta = new CachedIndexedLogsMeta(
+ loadIndexedLogsMeta(aggregatedLogPath), aggregatedLogPath);
+ }
+ return this.cachedIndexedLogsMeta.getCachedIndexedLogsMeta().getAcls();
+ }
+
+ @Override
+ public Path getRemoteAppLogDir(ApplicationId appId, String user)
+ throws IOException {
+ return LogAggregationUtils.getRemoteAppLogDir(conf, appId, user,
+ this.remoteRootLogDir, this.remoteRootLogDirSuffix);
+ }
+
+ @Private
+ public IndexedLogsMeta loadIndexedLogsMeta(Path remoteLogPath, long end)
+ throws IOException {
+ FileContext fileContext =
+ FileContext.getFileContext(remoteLogPath.toUri(), conf);
+ FSDataInputStream fsDataIStream = null;
+ try {
+ fsDataIStream = fileContext.open(remoteLogPath);
+ if (end == 0) {
+ return null;
+ }
+ long fileLength = end < 0 ? fileContext.getFileStatus(
+ remoteLogPath).getLen() : end;
+ fsDataIStream.seek(fileLength - Integer.SIZE/ Byte.SIZE - UUID_LENGTH);
+ int offset = fsDataIStream.readInt();
+ byte[] array = new byte[offset];
+ fsDataIStream.seek(
+ fileLength - offset - Integer.SIZE/ Byte.SIZE - UUID_LENGTH);
+ int actual = fsDataIStream.read(array);
+ if (actual != offset) {
+ throw new IOException("Error on loading log meta from "
+ + remoteLogPath);
+ }
+ return (IndexedLogsMeta)SerializationUtils
+ .deserialize(array);
+ } finally {
+ IOUtils.cleanupWithLogger(LOG, fsDataIStream);
+ }
+ }
+
+ private IndexedLogsMeta loadIndexedLogsMeta(Path remoteLogPath)
+ throws IOException {
+ return loadIndexedLogsMeta(remoteLogPath, -1);
+ }
+
+ @Private
+ public long loadIndexedLogsCheckSum(Path remoteLogCheckSumPath)
+ throws IOException {
+ FileContext fileContext =
+ FileContext.getFileContext(remoteLogCheckSumPath.toUri(), conf);
+ FSDataInputStream fsDataIStream = null;
+ try {
+ fsDataIStream = fileContext.open(remoteLogCheckSumPath);
+ return fsDataIStream.readLong();
+ } finally {
+ IOUtils.cleanupWithLogger(LOG, fsDataIStream);
+ }
+ }
+
+ /**
+ * This IndexedLogsMeta includes all the meta information
+ * for the aggregated log file.
+ */
+ @Private
+ @VisibleForTesting
+ public static class IndexedLogsMeta implements Serializable {
+
+ private static final long serialVersionUID = 5439875373L;
+ private int version;
+ private String user;
+ private String compressName;
+ private Map<ApplicationAccessType, String> acls;
+ private String nodeId;
+ private List<IndexedPerAggregationLogMeta> logMetas = new ArrayList<>();
+
+ public int getVersion() {
+ return this.version;
+ }
+
+ public void setVersion(int version) {
+ this.version = version;
+ }
+
+ public String getUser() {
+ return this.user;
+ }
+
+ public void setUser(String user) {
+ this.user = user;
+ }
+
+ public Map<ApplicationAccessType, String> getAcls() {
+ return this.acls;
+ }
+
+ public void setAcls(Map<ApplicationAccessType, String> acls) {
+ this.acls = acls;
+ }
+
+ public String getCompressName() {
+ return compressName;
+ }
+
+ public void setCompressName(String compressName) {
+ this.compressName = compressName;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public void setNodeId(String nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ public void addLogMeta(IndexedPerAggregationLogMeta logMeta) {
+ logMetas.add(logMeta);
+ }
+
+ public List<IndexedPerAggregationLogMeta> getLogMetas() {
+ return logMetas;
+ }
+ }
+
+ /**
+ * This IndexedPerAggregationLogMeta includes the meta information
+ * for all files which would be aggregated in one
+ * Log aggregation cycle.
+ */
+ public static class IndexedPerAggregationLogMeta implements Serializable {
+ private static final long serialVersionUID = 3929298383L;
+ private String remoteNodeLogFileName;
+ private Map<String, List<IndexedFileLogMeta>> logMetas = new HashMap<>();
+ private long uploadTimeStamp;
+
+ public String getRemoteNodeFile() {
+ return remoteNodeLogFileName;
+ }
+ public void setRemoteNodeFile(String remoteNodeLogFileName) {
+ this.remoteNodeLogFileName = remoteNodeLogFileName;
+ }
+
+ public void addContainerLogMeta(String containerId,
+ List<IndexedFileLogMeta> logMeta) {
+ logMetas.put(containerId, logMeta);
+ }
+
+ public List<IndexedFileLogMeta> getContainerLogMeta(String containerId) {
+ return logMetas.get(containerId);
+ }
+
+ public Map<String, List<IndexedFileLogMeta>> getLogMetas() {
+ return logMetas;
+ }
+
+ public long getUploadTimeStamp() {
+ return uploadTimeStamp;
+ }
+
+ public void setUploadTimeStamp(long uploadTimeStamp) {
+ this.uploadTimeStamp = uploadTimeStamp;
+ }
+ }
+
+ /**
+ * This IndexedFileLogMeta includes the meta information
+ * for a single file which would be aggregated in one
+ * Log aggregation cycle.
+ *
+ */
+ @Private
+ @VisibleForTesting
+ public static class IndexedFileLogMeta implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private String containerId;
+ private String fileName;
+ private long fileSize;
+ private long fileCompressedSize;
+ private long lastModificatedTime;
+ private long startIndex;
+
+ public String getFileName() {
+ return fileName;
+ }
+ public void setFileName(String fileName) {
+ this.fileName = fileName;
+ }
+
+ public long getFileSize() {
+ return fileSize;
+ }
+ public void setFileSize(long fileSize) {
+ this.fileSize = fileSize;
+ }
+
+ public long getFileCompressedSize() {
+ return fileCompressedSize;
+ }
+ public void setFileCompressedSize(long fileCompressedSize) {
+ this.fileCompressedSize = fileCompressedSize;
+ }
+
+ public long getLastModificatedTime() {
+ return lastModificatedTime;
+ }
+ public void setLastModificatedTime(long lastModificatedTime) {
+ this.lastModificatedTime = lastModificatedTime;
+ }
+
+ public long getStartIndex() {
+ return startIndex;
+ }
+ public void setStartIndex(long startIndex) {
+ this.startIndex = startIndex;
+ }
+
+ public String getContainerId() {
+ return containerId;
+ }
+ public void setContainerId(String containerId) {
+ this.containerId = containerId;
+ }
+ }
+
+ private static String logErrorMessage(File logFile, Exception e) {
+ String message = "Error aggregating log file. Log file : "
+ + logFile.getAbsolutePath() + ". " + e.getMessage();
+ LOG.error(message, e);
+ return message;
+ }
+
+ private static class IndexedFileOutputStreamState {
+ private final Algorithm compressAlgo;
+ private Compressor compressor;
+ private final FSDataOutputStream fsOut;
+ private long posStart;
+ private final SimpleBufferedOutputStream fsBufferedOutput;
+ private OutputStream out;
+ private long offset;
+
+ IndexedFileOutputStreamState(Algorithm compressionName,
+ FSDataOutputStream fsOut, Configuration conf, long offset)
+ throws IOException {
+ this.compressAlgo = compressionName;
+ this.fsOut = fsOut;
+ this.offset = offset;
+ this.posStart = fsOut.getPos();
+
+ BytesWritable fsOutputBuffer = new BytesWritable();
+ fsOutputBuffer.setCapacity(LogAggregationIndexedFileController
+ .getFSOutputBufferSize(conf));
+
+ this.fsBufferedOutput = new SimpleBufferedOutputStream(this.fsOut,
+ fsOutputBuffer.getBytes());
+
+ this.compressor = compressAlgo.getCompressor();
+
+ try {
+ this.out = compressAlgo.createCompressionStream(
+ fsBufferedOutput, compressor, 0);
+ } catch (IOException e) {
+ compressAlgo.returnCompressor(compressor);
+ throw e;
+ }
+ }
+
+ OutputStream getOutputStream() {
+ return out;
+ }
+
+ long getCurrentPos() throws IOException {
+ return fsOut.getPos() + fsBufferedOutput.size();
+ }
+
+ long getStartPos() {
+ return posStart + offset;
+ }
+
+ long getCompressedSize() throws IOException {
+ long ret = getCurrentPos() - posStart;
+ return ret;
+ }
+
+ void finish() throws IOException {
+ try {
+ if (out != null) {
+ out.flush();
+ out = null;
+ }
+ } finally {
+ compressAlgo.returnCompressor(compressor);
+ compressor = null;
+ }
+ }
+ }
+
+ private static class CachedIndexedLogsMeta {
+ private final Path remoteLogPath;
+ private final IndexedLogsMeta indexedLogsMeta;
+ CachedIndexedLogsMeta(IndexedLogsMeta indexedLogsMeta,
+ Path remoteLogPath) {
+ this.indexedLogsMeta = indexedLogsMeta;
+ this.remoteLogPath = remoteLogPath;
+ }
+
+ public Path getRemoteLogPath() {
+ return this.remoteLogPath;
+ }
+
+ public IndexedLogsMeta getCachedIndexedLogsMeta() {
+ return this.indexedLogsMeta;
+ }
+ }
+
+ @Private
+ public static int getFSOutputBufferSize(Configuration conf) {
+ return conf.getInt(FS_OUTPUT_BUF_SIZE_ATTR, 256 * 1024);
+ }
+
+ @Private
+ public static int getFSInputBufferSize(Configuration conf) {
+ return conf.getInt(FS_INPUT_BUF_SIZE_ATTR, 256 * 1024);
+ }
+
+ private abstract class FSAction<T> {
+ abstract T run() throws Exception;
+
+ T runWithRetries() throws Exception {
+ int retry = 0;
+ while (true) {
+ try {
+ return run();
+ } catch (IOException e) {
+ LOG.info("Exception while executing an FS operation.", e);
+ if (++retry > fsNumRetries) {
+ LOG.info("Maxed out FS retries. Giving up!");
+ throw e;
+ }
+ LOG.info("Retrying operation on FS. Retry no. " + retry);
+ Thread.sleep(fsRetryInterval);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3eade44/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/package-info.java
new file mode 100644
index 0000000..08ddece
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Public
+package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile;
+import org.apache.hadoop.classification.InterfaceAudience;
+
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org