You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ju...@apache.org on 2017/08/31 23:39:38 UTC
[2/2] hadoop git commit: YARN-6877. Create an abstract log reader for
extendability. Contributed by Xuan Gong.
YARN-6877. Create an abstract log reader for extendability. Contributed by Xuan Gong.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/91cc070d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/91cc070d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/91cc070d
Branch: refs/heads/trunk
Commit: 91cc070d67533ebb3325b982eba2135e0d175a82
Parents: bac4e8c
Author: Junping Du <ju...@apache.org>
Authored: Thu Aug 31 16:41:43 2017 -0700
Committer: Junping Du <ju...@apache.org>
Committed: Thu Aug 31 16:41:43 2017 -0700
----------------------------------------------------------------------
.../hadoop/yarn/client/cli/TestLogsCLI.java | 11 +-
.../logaggregation/LogAggregationWebUtils.java | 124 +++++
.../yarn/logaggregation/LogCLIHelpers.java | 457 +++----------------
.../yarn/logaggregation/LogToolUtils.java | 167 -------
.../LogAggregationFileController.java | 79 ++++
.../filecontroller/LogAggregationHtmlBlock.java | 186 ++++++++
.../LogAggregationTFileController.java | 127 ------
.../tfile/LogAggregationTFileController.java | 375 +++++++++++++++
.../tfile/TFileAggregatedLogsBlock.java | 241 ++++++++++
.../filecontroller/tfile/package-info.java | 20 +
.../yarn/webapp/log/AggregatedLogsBlock.java | 314 ++-----------
.../src/main/resources/yarn-default.xml | 2 +-
.../logaggregation/TestAggregatedLogsBlock.java | 82 +++-
...TestLogAggregationFileControllerFactory.java | 39 +-
.../webapp/AHSWebServices.java | 31 +-
.../logaggregation/AppLogAggregatorImpl.java | 2 +-
.../nodemanager/webapp/NMWebServices.java | 36 +-
.../TestAppLogAggregatorImpl.java | 2 +-
.../TestLogAggregationService.java | 2 +-
19 files changed, 1299 insertions(+), 998 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/91cc070d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java
index 26e0319..509a790 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java
@@ -53,6 +53,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -468,7 +469,7 @@ public class TestLogsCLI {
assertTrue(exitCode == 0);
assertTrue(sysOutStream.toString().contains(
logMessage(containerId1, "syslog")));
- assertTrue(sysOutStream.toString().contains("Log Upload Time"));
+ assertTrue(sysOutStream.toString().contains("LogLastModifiedTime"));
assertTrue(!sysOutStream.toString().contains(
"Logs for container " + containerId1.toString()
+ " are not present in this log-file."));
@@ -492,8 +493,12 @@ public class TestLogsCLI {
String logMessage = logMessage(containerId3, "stdout");
int fileContentSize = logMessage.getBytes().length;
- int tailContentSize = "\nEnd of LogType:stdout\n\n".getBytes().length;
-
+ StringBuilder sb = new StringBuilder();
+ String endOfFile = "End of LogType:stdout";
+ sb.append("\n" + endOfFile + "\n");
+ sb.append(StringUtils.repeat("*", endOfFile.length() + 50)
+ + "\n\n");
+ int tailContentSize = sb.toString().length();
// specify how many bytes we should get from logs
// specify a position number, it would get the first n bytes from
// container log
http://git-wip-us.apache.org/repos/asf/hadoop/blob/91cc070d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationWebUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationWebUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationWebUtils.java
new file mode 100644
index 0000000..1de68fb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationWebUtils.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.logaggregation;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block;
+
+/**
+ * Static helpers that parse and validate request parameters (indexes, ids,
+ * app owner) used when rendering the aggregated logs block.
+ */
+@Private
+public final class LogAggregationWebUtils {
+
+ private LogAggregationWebUtils() {}
+
+ /**
+ * Parse the start index; -4096 is used if startStr is null or empty.
+ * @param html the html block (not used by this method)
+ * @param startStr the start index string, parsed with Long.parseLong
+ * @return the startIndex
+ */
+ public static long getLogStartIndex(Block html, String startStr)
+ throws NumberFormatException {
+ long start = -4096;
+
+ if (startStr != null && !startStr.isEmpty()) {
+ start = Long.parseLong(startStr);
+ }
+ return start;
+ }
+
+ /**
+ * Parse the end index; Long.MAX_VALUE is used if endStr is null or empty.
+ * @param html the html block (not used by this method)
+ * @param endStr the end index string, parsed with Long.parseLong
+ * @return the endIndex
+ */
+ public static long getLogEndIndex(Block html, String endStr)
+ throws NumberFormatException {
+ long end = Long.MAX_VALUE;
+
+ if (endStr != null && !endStr.isEmpty()) {
+ end = Long.parseLong(endStr);
+ }
+ return end;
+ }
+
+ /**
+ * Verify and parse containerId, writing an h1 error to html on failure.
+ * @param html the html block that receives the error heading
+ * @param containerIdStr the containerId string
+ * @return the {@link ContainerId}, or null if missing or invalid
+ */
+ public static ContainerId verifyAndGetContainerId(Block html,
+ String containerIdStr) {
+ if (containerIdStr == null || containerIdStr.isEmpty()) {
+ html.h1().__("Cannot get container logs without a ContainerId").__();
+ return null;
+ }
+ ContainerId containerId = null;
+ try {
+ containerId = ContainerId.fromString(containerIdStr);
+ } catch (IllegalArgumentException e) {
+ html.h1()
+ .__("Cannot get container logs for invalid containerId: "
+ + containerIdStr).__();
+ return null;
+ }
+ return containerId;
+ }
+
+ /**
+ * Verify and parse NodeId, writing an h1 error to html on failure.
+ * @param html the html block that receives the error heading
+ * @param nodeIdStr the nodeId string
+ * @return the {@link NodeId}, or null if missing or invalid
+ */
+ public static NodeId verifyAndGetNodeId(Block html, String nodeIdStr) {
+ if (nodeIdStr == null || nodeIdStr.isEmpty()) {
+ html.h1().__("Cannot get container logs without a NodeId").__();
+ return null;
+ }
+ NodeId nodeId = null;
+ try {
+ nodeId = NodeId.fromString(nodeIdStr);
+ } catch (IllegalArgumentException e) {
+ html.h1().__("Cannot get container logs. Invalid nodeId: " + nodeIdStr)
+ .__();
+ return null;
+ }
+ return nodeId;
+ }
+
+ /**
+ * Verify the application owner; writes an h1 error if null or empty.
+ * @param html the html block that receives the error heading
+ * @param appOwner the Application owner
+ * @return the appOwner, unchanged even when it is null or empty
+ */
+ public static String verifyAndGetAppOwner(Block html, String appOwner) {
+ if (appOwner == null || appOwner.isEmpty()) {
+ html.h1().__("Cannot get container logs without an app owner").__();
+ }
+ return appOwner;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/91cc070d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java
index cf34a1a..03acb33 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java
@@ -18,8 +18,6 @@
package org.apache.hadoop.yarn.logaggregation;
-import java.io.DataInputStream;
-import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintStream;
@@ -37,15 +35,14 @@ import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.HarFs;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
-import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
import com.google.common.annotations.VisibleForTesting;
public class LogCLIHelpers implements Configurable {
@@ -56,6 +53,7 @@ public class LogCLIHelpers implements Configurable {
"Container: %s on %s";
private Configuration conf;
+ private LogAggregationFileControllerFactory factory;
@Private
@VisibleForTesting
@@ -130,71 +128,11 @@ public class LogCLIHelpers implements Configurable {
@VisibleForTesting
public int dumpAContainerLogsForLogType(ContainerLogsRequest options,
boolean outputFailure) throws IOException {
- ApplicationId applicationId = options.getAppId();
- String jobOwner = options.getAppOwner();
- String nodeId = options.getNodeId();
- String containerId = options.getContainerId();
- String localDir = options.getOutputLocalDir();
- List<String> logType = new ArrayList<String>(options.getLogTypes());
- RemoteIterator<FileStatus> nodeFiles = getRemoteNodeFileDir(
- applicationId, jobOwner);
- if (nodeFiles == null) {
- return -1;
- }
- boolean foundContainerLogs = false;
- while (nodeFiles.hasNext()) {
- FileStatus thisNodeFile = nodeFiles.next();
- String fileName = thisNodeFile.getPath().getName();
- if (fileName.equals(applicationId + ".har")) {
- Path p = new Path("har:///"
- + thisNodeFile.getPath().toUri().getRawPath());
- nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p);
- continue;
- }
- if (fileName.contains(LogAggregationUtils.getNodeString(nodeId))
- && !fileName.endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
- AggregatedLogFormat.LogReader reader = null;
- PrintStream out = createPrintStream(localDir, fileName, containerId);
- try {
- reader = new AggregatedLogFormat.LogReader(getConf(),
- thisNodeFile.getPath());
- if (getContainerLogsStream(containerId, reader) == null) {
- continue;
- }
- String containerString = String.format(CONTAINER_ON_NODE_PATTERN,
- containerId, thisNodeFile.getPath().getName());
- out.println(containerString);
- out.println("LogAggregationType: AGGREGATED");
- out.println(StringUtils.repeat("=", containerString.length()));
- // We have to re-create reader object to reset the stream index
- // after calling getContainerLogsStream which would move the stream
- // index to the end of the log file.
- reader =
- new AggregatedLogFormat.LogReader(getConf(),
- thisNodeFile.getPath());
- if (logType == null || logType.isEmpty()) {
- if (dumpAContainerLogs(containerId, reader, out,
- thisNodeFile.getModificationTime(), options.getBytes()) > -1) {
- foundContainerLogs = true;
- }
- } else {
- if (dumpAContainerLogsForALogType(containerId, reader, out,
- thisNodeFile.getModificationTime(), logType,
- options.getBytes()) > -1) {
- foundContainerLogs = true;
- }
- }
- } finally {
- if (reader != null) {
- reader.close();
- }
- closePrintStream(out);
- }
- }
- }
- if (!foundContainerLogs) {
+ boolean foundAnyLogs = this.getFileController(options.getAppId(),
+ options.getAppOwner()).readAggregatedLogs(options, null);
+ if (!foundAnyLogs) {
if (outputFailure) {
- containerLogNotFound(containerId);
+ containerLogNotFound(options.getContainerId());
}
return -1;
}
@@ -204,217 +142,25 @@ public class LogCLIHelpers implements Configurable {
@Private
public int dumpAContainerLogsForLogTypeWithoutNodeId(
ContainerLogsRequest options) throws IOException {
- ApplicationId applicationId = options.getAppId();
- String jobOwner = options.getAppOwner();
- String containerId = options.getContainerId();
- String localDir = options.getOutputLocalDir();
- List<String> logType = new ArrayList<String>(options.getLogTypes());
- RemoteIterator<FileStatus> nodeFiles = getRemoteNodeFileDir(
- applicationId, jobOwner);
- if (nodeFiles == null) {
- return -1;
- }
- boolean foundContainerLogs = false;
- while(nodeFiles.hasNext()) {
- FileStatus thisNodeFile = nodeFiles.next();
- if (!thisNodeFile.getPath().getName().endsWith(
- LogAggregationUtils.TMP_FILE_SUFFIX)) {
- AggregatedLogFormat.LogReader reader = null;
- PrintStream out = System.out;
- try {
- reader =
- new AggregatedLogFormat.LogReader(getConf(),
- thisNodeFile.getPath());
- if (getContainerLogsStream(containerId, reader) == null) {
- continue;
- }
- // We have to re-create reader object to reset the stream index
- // after calling getContainerLogsStream which would move the stream
- // index to the end of the log file.
- reader =
- new AggregatedLogFormat.LogReader(getConf(),
- thisNodeFile.getPath());
- out = createPrintStream(localDir, thisNodeFile.getPath().getName(),
- containerId);
- String containerString = String.format(CONTAINER_ON_NODE_PATTERN,
- containerId, thisNodeFile.getPath().getName());
- out.println(containerString);
- out.println("LogAggregationType: AGGREGATED");
- out.println(StringUtils.repeat("=", containerString.length()));
- if (logType == null || logType.isEmpty()) {
- if (dumpAContainerLogs(containerId, reader, out,
- thisNodeFile.getModificationTime(), options.getBytes()) > -1) {
- foundContainerLogs = true;
- }
- } else {
- if (dumpAContainerLogsForALogType(containerId, reader, out,
- thisNodeFile.getModificationTime(), logType,
- options.getBytes()) > -1) {
- foundContainerLogs = true;
- }
- }
- } finally {
- if (reader != null) {
- reader.close();
- }
- closePrintStream(out);
- }
- }
- }
- if (!foundContainerLogs) {
- containerLogNotFound(containerId);
+ boolean foundAnyLogs = getFileController(options.getAppId(),
+ options.getAppOwner()).readAggregatedLogs(
+ options, null);
+ if (!foundAnyLogs) {
+ containerLogNotFound(options.getContainerId());
return -1;
}
return 0;
}
@Private
- public int dumpAContainerLogs(String containerIdStr,
- AggregatedLogFormat.LogReader reader, PrintStream out,
- long logUploadedTime, long bytes) throws IOException {
- DataInputStream valueStream = getContainerLogsStream(
- containerIdStr, reader);
-
- if (valueStream == null) {
- return -1;
- }
-
- boolean foundContainerLogs = false;
- while (true) {
- try {
- LogReader.readAContainerLogsForALogType(valueStream, out,
- logUploadedTime, bytes);
- foundContainerLogs = true;
- } catch (EOFException eof) {
- break;
- }
- }
- if (foundContainerLogs) {
- return 0;
- }
- return -1;
- }
-
- private DataInputStream getContainerLogsStream(String containerIdStr,
- AggregatedLogFormat.LogReader reader) throws IOException {
- DataInputStream valueStream;
- LogKey key = new LogKey();
- valueStream = reader.next(key);
-
- while (valueStream != null && !key.toString().equals(containerIdStr)) {
- // Next container
- key = new LogKey();
- valueStream = reader.next(key);
- }
- return valueStream;
- }
-
- @Private
- public int dumpAContainerLogsForALogType(String containerIdStr,
- AggregatedLogFormat.LogReader reader, PrintStream out,
- long logUploadedTime, List<String> logType, long bytes)
- throws IOException {
- DataInputStream valueStream = getContainerLogsStream(
- containerIdStr, reader);
- if (valueStream == null) {
- return -1;
- }
-
- boolean foundContainerLogs = false;
- while (true) {
- try {
- int result = LogReader.readContainerLogsForALogType(
- valueStream, out, logUploadedTime, logType, bytes);
- if (result == 0) {
- foundContainerLogs = true;
- }
- } catch (EOFException eof) {
- break;
- }
- }
-
- if (foundContainerLogs) {
- return 0;
- }
- return -1;
- }
-
- @Private
public int dumpAllContainersLogs(ContainerLogsRequest options)
throws IOException {
- ApplicationId appId = options.getAppId();
- String appOwner = options.getAppOwner();
- String localDir = options.getOutputLocalDir();
- List<String> logTypes = new ArrayList<String>(options.getLogTypes());
- RemoteIterator<FileStatus> nodeFiles = getRemoteNodeFileDir(
- appId, appOwner);
- if (nodeFiles == null) {
- return -1;
- }
- boolean foundAnyLogs = false;
- while (nodeFiles.hasNext()) {
- FileStatus thisNodeFile = nodeFiles.next();
- if (thisNodeFile.getPath().getName().equals(appId + ".har")) {
- Path p = new Path("har:///"
- + thisNodeFile.getPath().toUri().getRawPath());
- nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p);
- continue;
- }
- if (!thisNodeFile.getPath().getName()
- .endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
- AggregatedLogFormat.LogReader reader =
- new AggregatedLogFormat.LogReader(getConf(),
- thisNodeFile.getPath());
- try {
-
- DataInputStream valueStream;
- LogKey key = new LogKey();
- valueStream = reader.next(key);
-
- while (valueStream != null) {
- PrintStream out = createPrintStream(localDir,
- thisNodeFile.getPath().getName(), key.toString());
- try {
- String containerString = String.format(
- CONTAINER_ON_NODE_PATTERN, key,
- thisNodeFile.getPath().getName());
- out.println(containerString);
- out.println("LogAggregationType: AGGREGATED");
- out.println(StringUtils.repeat("=", containerString.length()));
- while (true) {
- try {
- if (logTypes == null || logTypes.isEmpty()) {
- LogReader.readAContainerLogsForALogType(valueStream, out,
- thisNodeFile.getModificationTime(),
- options.getBytes());
- foundAnyLogs = true;
- } else {
- int result = LogReader.readContainerLogsForALogType(
- valueStream, out, thisNodeFile.getModificationTime(),
- logTypes, options.getBytes());
- if (result == 0) {
- foundAnyLogs = true;
- }
- }
- } catch (EOFException eof) {
- break;
- }
- }
- } finally {
- closePrintStream(out);
- }
-
- // Next container
- key = new LogKey();
- valueStream = reader.next(key);
- }
- } finally {
- reader.close();
- }
- }
- }
+ boolean foundAnyLogs = getFileController(options.getAppId(),
+ options.getAppOwner()).readAggregatedLogs(
+ options, null);
if (!foundAnyLogs) {
- emptyLogDir(LogAggregationUtils.getRemoteAppLogDir(conf, appId, appOwner)
+ emptyLogDir(LogAggregationUtils.getRemoteAppLogDir(
+ conf, options.getAppId(), options.getAppOwner())
.toString());
return -1;
}
@@ -425,14 +171,13 @@ public class LogCLIHelpers implements Configurable {
public int printAContainerLogMetadata(ContainerLogsRequest options,
PrintStream out, PrintStream err)
throws IOException {
- ApplicationId appId = options.getAppId();
- String appOwner = options.getAppOwner();
String nodeId = options.getNodeId();
String containerIdStr = options.getContainerId();
List<ContainerLogMeta> containersLogMeta;
try {
- containersLogMeta = LogToolUtils.getContainerLogMetaFromRemoteFS(
- conf, appId, containerIdStr, nodeId, appOwner);
+ containersLogMeta = getFileController(options.getAppId(),
+ options.getAppOwner()).readAggregatedLogsMeta(
+ options);
} catch (Exception ex) {
err.println(ex.getMessage());
return -1;
@@ -473,8 +218,26 @@ public class LogCLIHelpers implements Configurable {
PrintStream out, PrintStream err) throws IOException {
ApplicationId appId = options.getAppId();
String appOwner = options.getAppOwner();
- RemoteIterator<FileStatus> nodeFiles = getRemoteNodeFileDir(
- appId, appOwner);
+ LogAggregationFileController fileFormat = null;
+ try {
+ fileFormat = getFileController(appId, appOwner);
+ } catch (Exception ex) {
+ err.println(ex.getMessage());
+ return;
+ }
+ RemoteIterator<FileStatus> nodeFiles = null;
+ try {
+ nodeFiles = LogAggregationUtils.getRemoteNodeFileDir(conf, appId,
+ appOwner, fileFormat.getRemoteRootLogDir(),
+ fileFormat.getRemoteRootLogDirSuffix());
+ } catch (FileNotFoundException fnf) {
+ logDirNotExist(LogAggregationUtils.getRemoteAppLogDir(
+ conf, appId, appOwner).toString());
+ } catch (AccessControlException | AccessDeniedException ace) {
+ logDirNoAccessPermission(LogAggregationUtils.getRemoteAppLogDir(
+ conf, appId, appOwner).toString(), appOwner,
+ ace.getMessage());
+ }
if (nodeFiles == null) {
return;
}
@@ -497,44 +260,21 @@ public class LogCLIHelpers implements Configurable {
public void printContainersList(ContainerLogsRequest options,
PrintStream out, PrintStream err) throws IOException {
ApplicationId appId = options.getAppId();
- String appOwner = options.getAppOwner();
String nodeId = options.getNodeId();
- String nodeIdStr = (nodeId == null) ? null
- : LogAggregationUtils.getNodeString(nodeId);
- RemoteIterator<FileStatus> nodeFiles = getRemoteNodeFileDir(
- appId, appOwner);
- if (nodeFiles == null) {
- return;
- }
boolean foundAnyLogs = false;
- while (nodeFiles.hasNext()) {
- FileStatus thisNodeFile = nodeFiles.next();
- if (nodeIdStr != null) {
- if (!thisNodeFile.getPath().getName().contains(nodeIdStr)) {
- continue;
- }
- }
- if (!thisNodeFile.getPath().getName()
- .endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
- AggregatedLogFormat.LogReader reader =
- new AggregatedLogFormat.LogReader(getConf(),
- thisNodeFile.getPath());
- try {
- DataInputStream valueStream;
- LogKey key = new LogKey();
- valueStream = reader.next(key);
- while (valueStream != null) {
- out.println(String.format(CONTAINER_ON_NODE_PATTERN, key,
- thisNodeFile.getPath().getName()));
- foundAnyLogs = true;
- // Next container
- key = new LogKey();
- valueStream = reader.next(key);
- }
- } finally {
- reader.close();
- }
- }
+ List<ContainerLogMeta> containersLogMeta = new ArrayList<>();
+ try {
+ containersLogMeta = getFileController(options.getAppId(),
+ options.getAppOwner()).readAggregatedLogsMeta(
+ options);
+ } catch (Exception ex) {
+ err.println(ex.getMessage());
+ }
+ for(ContainerLogMeta logMeta : containersLogMeta) {
+ out.println(String.format(CONTAINER_ON_NODE_PATTERN,
+ logMeta.getContainerId(),
+ logMeta.getNodeId()));
+ foundAnyLogs = true;
}
if (!foundAnyLogs) {
if (nodeId != null) {
@@ -547,26 +287,6 @@ public class LogCLIHelpers implements Configurable {
}
}
- private RemoteIterator<FileStatus> getRemoteNodeFileDir(ApplicationId appId,
- String appOwner) throws IOException {
- RemoteIterator<FileStatus> nodeFiles = null;
- try {
- nodeFiles = LogAggregationUtils.getRemoteNodeFileDir(
- conf, appId, appOwner);
- } catch (FileNotFoundException fnf) {
- logDirNotExist(LogAggregationUtils.getRemoteAppLogDir(
- conf, appId, appOwner).toString());
- } catch (AccessControlException | AccessDeniedException ace) {
- logDirNoAccessPermission(LogAggregationUtils.getRemoteAppLogDir(
- conf, appId, appOwner).toString(), appOwner,
- ace.getMessage());
- } catch (IOException ioe) {
- logDirIOError(LogAggregationUtils.getRemoteAppLogDir(
- conf, appId, appOwner).toString(), ioe.getMessage());
- }
- return nodeFiles;
- }
-
@Override
public void setConf(Configuration conf) {
this.conf = conf;
@@ -600,11 +320,6 @@ public class LogCLIHelpers implements Configurable {
+ ". Error message found: " + errorMessage);
}
- private static void logDirIOError(String remoteAppLogDir, String errMsg) {
- System.err.println("Cannot access to " + remoteAppLogDir +
- ". Error message found: " + errMsg);
- }
-
@Private
public PrintStream createPrintStream(String localDir, String nodeId,
String containerId) throws IOException {
@@ -628,59 +343,29 @@ public class LogCLIHelpers implements Configurable {
@Private
public Set<String> listContainerLogs(ContainerLogsRequest options)
throws IOException {
+ List<ContainerLogMeta> containersLogMeta;
Set<String> logTypes = new HashSet<String>();
- ApplicationId appId = options.getAppId();
- String appOwner = options.getAppOwner();
- String nodeId = options.getNodeId();
- String containerIdStr = options.getContainerId();
- boolean getAllContainers = (containerIdStr == null);
- String nodeIdStr = (nodeId == null) ? null
- : LogAggregationUtils.getNodeString(nodeId);
- RemoteIterator<FileStatus> nodeFiles = getRemoteNodeFileDir(
- appId, appOwner);
- if (nodeFiles == null) {
+ try {
+ containersLogMeta = getFileController(options.getAppId(),
+ options.getAppOwner()).readAggregatedLogsMeta(
+ options);
+ } catch (Exception ex) {
+ System.err.println(ex.getMessage());
return logTypes;
}
- while (nodeFiles.hasNext()) {
- FileStatus thisNodeFile = nodeFiles.next();
- if (nodeIdStr != null) {
- if (!thisNodeFile.getPath().getName().contains(nodeIdStr)) {
- continue;
- }
- }
- if (!thisNodeFile.getPath().getName()
- .endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
- AggregatedLogFormat.LogReader reader =
- new AggregatedLogFormat.LogReader(getConf(),
- thisNodeFile.getPath());
- try {
- DataInputStream valueStream;
- LogKey key = new LogKey();
- valueStream = reader.next(key);
- while (valueStream != null) {
- if (getAllContainers || (key.toString().equals(containerIdStr))) {
- while (true) {
- try {
- String logFile = LogReader.readContainerMetaDataAndSkipData(
- valueStream).getFirst();
- logTypes.add(logFile);
- } catch (EOFException eof) {
- break;
- }
- }
- if (!getAllContainers) {
- break;
- }
- }
- // Next container
- key = new LogKey();
- valueStream = reader.next(key);
- }
- } finally {
- reader.close();
- }
+ for (ContainerLogMeta logMeta: containersLogMeta) {
+ for (PerContainerLogFileInfo fileInfo : logMeta.getContainerLogMeta()) {
+ logTypes.add(fileInfo.getFileName());
}
}
return logTypes;
}
+
+ private LogAggregationFileController getFileController(ApplicationId appId,
+ String appOwner) throws IOException {
+ if (factory == null) {
+ factory = new LogAggregationFileControllerFactory(conf);
+ }
+ return factory.getFileControllerForRead(appId, appOwner);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/91cc070d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java
index 74f694e..ddee445 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.yarn.logaggregation;
-import java.io.DataInputStream;
-import java.io.EOFException;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -27,19 +25,7 @@ import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.List;
import org.apache.commons.lang.StringUtils;
-import org.apache.commons.math3.util.Pair;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.HarFs;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
-import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
-import org.apache.hadoop.yarn.util.Times;
/**
* This class contains several utility function which could be used in different
@@ -54,81 +40,6 @@ public final class LogToolUtils {
"Container: %s on %s";
/**
- * Return a list of {@link ContainerLogMeta} for a container
- * from Remote FileSystem.
- *
- * @param conf the configuration
- * @param appId the applicationId
- * @param containerIdStr the containerId
- * @param nodeId the nodeId
- * @param appOwner the application owner
- * @return a list of {@link ContainerLogMeta}
- * @throws IOException if there is no available log file
- */
- public static List<ContainerLogMeta> getContainerLogMetaFromRemoteFS(
- Configuration conf, ApplicationId appId, String containerIdStr,
- String nodeId, String appOwner) throws IOException {
- List<ContainerLogMeta> containersLogMeta = new ArrayList<>();
- boolean getAllContainers = (containerIdStr == null);
- String nodeIdStr = (nodeId == null) ? null
- : LogAggregationUtils.getNodeString(nodeId);
- RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils
- .getRemoteNodeFileDir(conf, appId, appOwner);
- if (nodeFiles == null) {
- throw new IOException("There is no available log fils for "
- + "application:" + appId);
- }
- while (nodeFiles.hasNext()) {
- FileStatus thisNodeFile = nodeFiles.next();
- if (nodeIdStr != null) {
- if (!thisNodeFile.getPath().getName().contains(nodeIdStr)) {
- continue;
- }
- }
- if (!thisNodeFile.getPath().getName()
- .endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
- AggregatedLogFormat.LogReader reader =
- new AggregatedLogFormat.LogReader(conf,
- thisNodeFile.getPath());
- try {
- DataInputStream valueStream;
- LogKey key = new LogKey();
- valueStream = reader.next(key);
- while (valueStream != null) {
- if (getAllContainers || (key.toString().equals(containerIdStr))) {
- ContainerLogMeta containerLogMeta = new ContainerLogMeta(
- key.toString(), thisNodeFile.getPath().getName());
- while (true) {
- try {
- Pair<String, String> logMeta =
- LogReader.readContainerMetaDataAndSkipData(
- valueStream);
- containerLogMeta.addLogMeta(
- logMeta.getFirst(),
- logMeta.getSecond(),
- Times.format(thisNodeFile.getModificationTime()));
- } catch (EOFException eof) {
- break;
- }
- }
- containersLogMeta.add(containerLogMeta);
- if (!getAllContainers) {
- break;
- }
- }
- // Next container
- key = new LogKey();
- valueStream = reader.next(key);
- }
- } finally {
- reader.close();
- }
- }
- }
- return containersLogMeta;
- }
-
- /**
* Output container log.
* @param containerId the containerId
* @param nodeId the nodeId
@@ -247,82 +158,4 @@ public final class LogToolUtils {
}
}
- public static boolean outputAggregatedContainerLog(Configuration conf,
- ApplicationId appId, String appOwner,
- String containerId, String nodeId,
- String logFileName, long outputSize, OutputStream os,
- byte[] buf) throws IOException {
- boolean findLogs = false;
- RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils
- .getRemoteNodeFileDir(conf, appId, appOwner);
- while (nodeFiles != null && nodeFiles.hasNext()) {
- final FileStatus thisNodeFile = nodeFiles.next();
- String nodeName = thisNodeFile.getPath().getName();
- if (nodeName.equals(appId + ".har")) {
- Path p = new Path("har:///"
- + thisNodeFile.getPath().toUri().getRawPath());
- nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p);
- continue;
- }
- if ((nodeId == null || nodeName.contains(LogAggregationUtils
- .getNodeString(nodeId))) && !nodeName.endsWith(
- LogAggregationUtils.TMP_FILE_SUFFIX)) {
- AggregatedLogFormat.LogReader reader = null;
- try {
- reader = new AggregatedLogFormat.LogReader(conf,
- thisNodeFile.getPath());
- DataInputStream valueStream;
- LogKey key = new LogKey();
- valueStream = reader.next(key);
- while (valueStream != null && !key.toString()
- .equals(containerId)) {
- // Next container
- key = new LogKey();
- valueStream = reader.next(key);
- }
- if (valueStream == null) {
- continue;
- }
- while (true) {
- try {
- String fileType = valueStream.readUTF();
- String fileLengthStr = valueStream.readUTF();
- long fileLength = Long.parseLong(fileLengthStr);
- if (fileType.equalsIgnoreCase(logFileName)) {
- LogToolUtils.outputContainerLog(containerId,
- nodeId, fileType, fileLength, outputSize,
- Times.format(thisNodeFile.getModificationTime()),
- valueStream, os, buf,
- ContainerLogAggregationType.AGGREGATED);
- StringBuilder sb = new StringBuilder();
- String endOfFile = "End of LogType:" + fileType;
- sb.append("\n" + endOfFile + "\n");
- sb.append(StringUtils.repeat("*", endOfFile.length() + 50)
- + "\n\n");
- byte[] b = sb.toString().getBytes(Charset.forName("UTF-8"));
- os.write(b, 0, b.length);
- findLogs = true;
- } else {
- long totalSkipped = 0;
- long currSkipped = 0;
- while (currSkipped != -1 && totalSkipped < fileLength) {
- currSkipped = valueStream.skip(
- fileLength - totalSkipped);
- totalSkipped += currSkipped;
- }
- }
- } catch (EOFException eof) {
- break;
- }
- }
- } finally {
- if (reader != null) {
- reader.close();
- }
- }
- }
- }
- os.flush();
- return findLogs;
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/91cc070d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
index 5503f8f..87344a7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
@@ -24,6 +24,10 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
@@ -31,7 +35,9 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -42,13 +48,18 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.webapp.View.ViewContext;
+import org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
/**
* Base class to implement Log Aggregation File Controller.
@@ -167,6 +178,74 @@ public abstract class LogAggregationFileController {
public abstract void postWrite(LogAggregationFileControllerContext record)
throws Exception;
+ /**
+ * Pick the stream that container logs are written to.
+ * When {@code localDir} is null or empty this is {@code System.out};
+ * otherwise a UTF-8 file named after the container is opened under a
+ * per-node sub-directory of {@code localDir}, which is created if needed.
+ *
+ * @param localDir local output directory, may be null or empty
+ * @param nodeId node identifier the sub-directory name is derived from
+ * @param containerId used as the name of the output file
+ * @return {@code System.out} or a new file-backed stream
+ * @throws IOException if the directory or the file can not be created
+ */
+ protected PrintStream createPrintStream(String localDir, String nodeId,
+ String containerId) throws IOException {
+ PrintStream out = System.out;
+ if(localDir != null && !localDir.isEmpty()) {
+ Path nodePath = new Path(localDir, LogAggregationUtils
+ .getNodeString(nodeId));
+ Files.createDirectories(Paths.get(nodePath.toString()));
+ Path containerLogPath = new Path(nodePath, containerId);
+ out = new PrintStream(containerLogPath.toString(), "UTF-8");
+ }
+ return out;
+ }
+
+ /**
+ * Quietly close the given stream, unless it is {@code System.out},
+ * which is left open.
+ *
+ * @param out the stream to close
+ */
+ protected void closePrintStream(OutputStream out) {
+ if (out != System.out) {
+ IOUtils.closeQuietly(out);
+ }
+ }
+
+ /**
+ * Output container log.
+ * @param logRequest {@link ContainerLogsRequest}
+ * @param os the output stream
+ * @throws IOException if we can not access the log file.
+ */
+ public abstract boolean readAggregatedLogs(ContainerLogsRequest logRequest,
+ OutputStream os) throws IOException;
+
+ /**
+ * Return a list of {@link ContainerLogMeta} for an application
+ * from Remote FileSystem.
+ *
+ * @param logRequest {@link ContainerLogsRequest}
+ * @return a list of {@link ContainerLogMeta}
+ * @throws IOException if there is no available log file
+ */
+ public abstract List<ContainerLogMeta> readAggregatedLogsMeta(
+ ContainerLogsRequest logRequest) throws IOException;
+
+ /**
+ * Render Aggregated Logs block.
+ * @param html the html
+ * @param context the ViewContext
+ */
+ public abstract void renderAggregatedLogsBlock(Block html,
+ ViewContext context);
+
+ /**
+ * Returns the owner of the application.
+ *
+ * @param aggregatedLogPath the aggregatedLog path.
+ * @return the application owner.
+ * @throws IOException if the owner can not be read from the aggregated log.
+ */
+ public abstract String getApplicationOwner(Path aggregatedLogPath)
+ throws IOException;
+
+ /**
+ * Returns ACLs for the application. An empty map is returned if no ACLs are
+ * found.
+ *
+ * @param aggregatedLogPath the aggregatedLog path.
+ * @return a map of the Application ACLs.
+ * @throws IOException if the ACLs can not be read from the aggregated log.
+ */
+ public abstract Map<ApplicationAccessType, String> getApplicationAcls(
+ Path aggregatedLogPath) throws IOException;
+
/**
* Verify and create the remote log directory.
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/91cc070d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationHtmlBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationHtmlBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationHtmlBlock.java
new file mode 100644
index 0000000..c996623
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationHtmlBlock.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation.filecontroller;
+
+import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER;
+import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID;
+import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING;
+import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME;
+
+import com.google.inject.Inject;
+import java.util.Map;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.logaggregation.LogAggregationWebUtils;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
+
+/**
+ * Base class to implement Aggregated Logs Block.
+ */
+@InterfaceAudience.LimitedPrivate({"YARN", "MapReduce"})
+public abstract class LogAggregationHtmlBlock extends HtmlBlock {
+
+ @Inject
+ public LogAggregationHtmlBlock(ViewContext ctx) {
+ super(ctx);
+ }
+
+ /**
+ * Parse and validate the request parameters of an aggregated-logs page.
+ * Reads the container id, node name, application owner, the "start" and
+ * "end" offsets and the log entity. A non-numeric "start" or "end" is
+ * reported on the page as an h1 message.
+ *
+ * @param html the block the page is rendered into
+ * @return the parsed parameters, or null if the container id or node id
+ * is null, the application owner is null or empty, or either offset is
+ * not a number
+ */
+ protected BlockParameters verifyAndParseParameters(Block html) {
+ BlockParameters params = new BlockParameters();
+ ContainerId containerId = LogAggregationWebUtils
+ .verifyAndGetContainerId(html, $(CONTAINER_ID));
+ params.setContainerId(containerId);
+
+ NodeId nodeId = LogAggregationWebUtils
+ .verifyAndGetNodeId(html, $(NM_NODENAME));
+ params.setNodeId(nodeId);
+
+ String appOwner = LogAggregationWebUtils
+ .verifyAndGetAppOwner(html, $(APP_OWNER));
+ params.setAppOwner(appOwner);
+
+ boolean isValid = true;
+ // Placeholder only: kept solely when parsing fails, and then null is
+ // returned below.
+ long start = -4096;
+ try {
+ start = LogAggregationWebUtils.getLogStartIndex(
+ html, $("start"));
+ } catch (NumberFormatException ne) {
+ html.h1().__("Invalid log start value: " + $("start")).__();
+ isValid = false;
+ }
+ params.setStartIndex(start);
+
+ long end = Long.MAX_VALUE;
+ try {
+ end = LogAggregationWebUtils.getLogEndIndex(
+ html, $("end"));
+ } catch (NumberFormatException ne) {
+ // Report the parameter that actually failed to parse.
+ html.h1().__("Invalid log end value: " + $("end")).__();
+ isValid = false;
+ }
+ params.setEndIndex(end);
+
+ // All parameters are examined before bailing out, so every problem
+ // found above has already been reported.
+ if (containerId == null || nodeId == null || appOwner == null
+ || appOwner.isEmpty() || !isValid) {
+ return null;
+ }
+
+ ApplicationId appId = containerId.getApplicationAttemptId()
+ .getApplicationId();
+ params.setAppId(appId);
+
+ // The log entity falls back to the container id when not supplied.
+ String logEntity = $(ENTITY_STRING);
+ if (logEntity == null || logEntity.isEmpty()) {
+ logEntity = containerId.toString();
+ }
+ params.setLogEntity(logEntity);
+
+ return params;
+ }
+
+ /**
+ * Check whether the remote user may view the logs of an application.
+ * The ACLs are registered with a fresh {@link ApplicationACLsManager} and
+ * checked for {@link ApplicationAccessType#VIEW_APP}.
+ *
+ * @param conf configuration used to build the ACLs manager
+ * @param appId the application id
+ * @param owner the application owner
+ * @param appAcls the application ACLs
+ * @param remoteUser the requesting user; when null no check is made
+ * @return false only if remoteUser is non-null and is denied VIEW_APP
+ * access, true otherwise
+ */
+ protected boolean checkAcls(Configuration conf, ApplicationId appId,
+ String owner, Map<ApplicationAccessType, String> appAcls,
+ String remoteUser) {
+ ApplicationACLsManager aclsManager = new ApplicationACLsManager(
+ conf);
+ aclsManager.addApplication(appId, appAcls);
+
+ UserGroupInformation callerUGI = null;
+ if (remoteUser != null) {
+ callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
+ }
+ // A null callerUGI (no remote user) skips the check and is let through.
+ if (callerUGI != null && !aclsManager.checkAccess(callerUGI,
+ ApplicationAccessType.VIEW_APP, owner, appId)) {
+ return false;
+ }
+ return true;
+ }
+
+ protected static class BlockParameters {
+ private ApplicationId appId;
+ private ContainerId containerId;
+ private NodeId nodeId;
+ private String appOwner;
+ private long start;
+ private long end;
+ private String logEntity;
+
+ public ApplicationId getAppId() {
+ return appId;
+ }
+
+ public void setAppId(ApplicationId appId) {
+ this.appId = appId;
+ }
+
+ public ContainerId getContainerId() {
+ return containerId;
+ }
+
+ public void setContainerId(ContainerId containerId) {
+ this.containerId = containerId;
+ }
+
+ public NodeId getNodeId() {
+ return nodeId;
+ }
+
+ public void setNodeId(NodeId nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ public String getAppOwner() {
+ return appOwner;
+ }
+
+ public void setAppOwner(String appOwner) {
+ this.appOwner = appOwner;
+ }
+
+ public long getStartIndex() {
+ return start;
+ }
+
+ public void setStartIndex(long startIndex) {
+ this.start = startIndex;
+ }
+
+ public long getEndIndex() {
+ return end;
+ }
+
+ public void setEndIndex(long endIndex) {
+ this.end = endIndex;
+ }
+
+ public String getLogEntity() {
+ return logEntity;
+ }
+
+ public void setLogEntity(String logEntity) {
+ this.logEntity = logEntity;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/91cc070d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationTFileController.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationTFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationTFileController.java
deleted file mode 100644
index 9e0c66d..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationTFileController.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.logaggregation.filecontroller;
-
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
-import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
-import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
-import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
-import org.apache.hadoop.yarn.util.Times;
-
-/**
- * The TFile log aggregation file Controller implementation.
- */
-@Private
-@Unstable
-public class LogAggregationTFileController
- extends LogAggregationFileController {
-
- private static final Log LOG = LogFactory.getLog(
- LogAggregationTFileController.class);
-
- private LogWriter writer;
-
- public LogAggregationTFileController(){}
-
- @Override
- public void initInternal(Configuration conf) {
- this.remoteRootLogDir = new Path(
- conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
- YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
- this.remoteRootLogDirSuffix =
- conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
- YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
- }
-
- @Override
- public void initializeWriter(LogAggregationFileControllerContext context)
- throws IOException {
- this.writer = new LogWriter();
- writer.initialize(this.conf, context.getRemoteNodeTmpLogFileForApp(),
- context.getUserUgi());
- // Write ACLs once when the writer is created.
- writer.writeApplicationACLs(context.getAppAcls());
- writer.writeApplicationOwner(context.getUserUgi().getShortUserName());
- }
-
- @Override
- public void closeWriter() {
- this.writer.close();
- }
-
- @Override
- public void write(LogKey logKey, LogValue logValue) throws IOException {
- this.writer.append(logKey, logValue);
- }
-
- @Override
- public void postWrite(final LogAggregationFileControllerContext record)
- throws Exception {
- // Before upload logs, make sure the number of existing logs
- // is smaller than the configured NM log aggregation retention size.
- if (record.isUploadedLogsInThisCycle() &&
- record.isLogAggregationInRolling()) {
- cleanOldLogs(record.getRemoteNodeLogFileForApp(), record.getNodeId(),
- record.getUserUgi());
- record.increcleanupOldLogTimes();
- }
-
- final Path renamedPath = record.getRollingMonitorInterval() <= 0
- ? record.getRemoteNodeLogFileForApp() : new Path(
- record.getRemoteNodeLogFileForApp().getParent(),
- record.getRemoteNodeLogFileForApp().getName() + "_"
- + record.getLogUploadTimeStamp());
- final boolean rename = record.isUploadedLogsInThisCycle();
- try {
- record.getUserUgi().doAs(new PrivilegedExceptionAction<Object>() {
- @Override
- public Object run() throws Exception {
- FileSystem remoteFS = record.getRemoteNodeLogFileForApp()
- .getFileSystem(conf);
- if (rename) {
- remoteFS.rename(record.getRemoteNodeTmpLogFileForApp(),
- renamedPath);
- } else {
- remoteFS.delete(record.getRemoteNodeTmpLogFileForApp(), false);
- }
- return null;
- }
- });
- } catch (Exception e) {
- LOG.error(
- "Failed to move temporary log file to final location: ["
- + record.getRemoteNodeTmpLogFileForApp() + "] to ["
- + renamedPath + "]", e);
- throw new Exception("Log uploaded failed for Application: "
- + record.getAppId() + " in NodeManager: "
- + LogAggregationUtils.getNodeString(record.getNodeId()) + " at "
- + Times.format(record.getLogUploadTimeStamp()) + "\n");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/91cc070d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java
new file mode 100644
index 0000000..d2038e2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java
@@ -0,0 +1,375 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation.filecontroller.tfile;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.math3.util.Pair;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.HarFs;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
+import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.logaggregation.LogToolUtils;
+import org.apache.hadoop.yarn.util.Times;
+import org.apache.hadoop.yarn.webapp.View.ViewContext;
+import org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block;
+
+/**
+ * The TFile log aggregation file Controller implementation.
+ */
+@Private
+@Unstable
+public class LogAggregationTFileController
+ extends LogAggregationFileController {
+
+ private static final Log LOG = LogFactory.getLog(
+ LogAggregationTFileController.class);
+
+ private LogWriter writer;
+ private TFileLogReader tfReader = null;
+
+ public LogAggregationTFileController(){}
+
+ @Override
+ public void initInternal(Configuration conf) {
+ this.remoteRootLogDir = new Path(
+ conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+ YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
+ this.remoteRootLogDirSuffix =
+ conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
+ YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
+ }
+
+ @Override
+ public void initializeWriter(LogAggregationFileControllerContext context)
+ throws IOException {
+ this.writer = new LogWriter();
+ writer.initialize(this.conf, context.getRemoteNodeTmpLogFileForApp(),
+ context.getUserUgi());
+ // Write ACLs once when the writer is created.
+ writer.writeApplicationACLs(context.getAppAcls());
+ writer.writeApplicationOwner(context.getUserUgi().getShortUserName());
+ }
+
+ /**
+ * Close the current writer, if any, and drop the reference to it.
+ * Does nothing when no writer is open, so a repeated call, or a call made
+ * before {@code initializeWriter}, does not throw.
+ */
+ @Override
+ public void closeWriter() {
+ if (this.writer != null) {
+ this.writer.close();
+ this.writer = null;
+ }
+ }
+
+ @Override
+ public void write(LogKey logKey, LogValue logValue) throws IOException {
+ this.writer.append(logKey, logValue);
+ }
+
+ @Override
+ public void postWrite(final LogAggregationFileControllerContext record)
+ throws Exception {
+ // Before upload logs, make sure the number of existing logs
+ // is smaller than the configured NM log aggregation retention size.
+ if (record.isUploadedLogsInThisCycle() &&
+ record.isLogAggregationInRolling()) {
+ cleanOldLogs(record.getRemoteNodeLogFileForApp(), record.getNodeId(),
+ record.getUserUgi());
+ record.increcleanupOldLogTimes();
+ }
+
+ final Path renamedPath = record.getRollingMonitorInterval() <= 0
+ ? record.getRemoteNodeLogFileForApp() : new Path(
+ record.getRemoteNodeLogFileForApp().getParent(),
+ record.getRemoteNodeLogFileForApp().getName() + "_"
+ + record.getLogUploadTimeStamp());
+ final boolean rename = record.isUploadedLogsInThisCycle();
+ try {
+ record.getUserUgi().doAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws Exception {
+ FileSystem remoteFS = record.getRemoteNodeLogFileForApp()
+ .getFileSystem(conf);
+ if (rename) {
+ remoteFS.rename(record.getRemoteNodeTmpLogFileForApp(),
+ renamedPath);
+ } else {
+ remoteFS.delete(record.getRemoteNodeTmpLogFileForApp(), false);
+ }
+ return null;
+ }
+ });
+ } catch (Exception e) {
+ LOG.error(
+ "Failed to move temporary log file to final location: ["
+ + record.getRemoteNodeTmpLogFileForApp() + "] to ["
+ + renamedPath + "]", e);
+ throw new Exception("Log uploaded failed for Application: "
+ + record.getAppId() + " in NodeManager: "
+ + LogAggregationUtils.getNodeString(record.getNodeId()) + " at "
+ + Times.format(record.getLogUploadTimeStamp()) + "\n");
+ }
+ }
+
+ /**
+ * Write the aggregated logs selected by the request to a stream.
+ * Every aggregated file of the application is scanned, skipping temporary
+ * files and, when a node id is given, files whose name does not contain
+ * that node. When an entry named {@code appId + ".har"} is met, the rest
+ * of the listing is replaced by the contents of that archive.
+ *
+ * @param logRequest selects the application, owner, node, container, log
+ * types, byte count and local output directory
+ * @param os the stream to write to; when null, a stream is created per
+ * container with {@code createPrintStream} and closed afterwards
+ * @return true if at least one log file was written
+ * @throws IOException if we can not access the log file.
+ */
+ @Override
+ public boolean readAggregatedLogs(ContainerLogsRequest logRequest,
+ OutputStream os) throws IOException {
+ boolean findLogs = false;
+ boolean createPrintStream = (os == null);
+ ApplicationId appId = logRequest.getAppId();
+ String nodeId = logRequest.getNodeId();
+ List<String> logTypes = new ArrayList<>();
+ if (logRequest.getLogTypes() != null && !logRequest
+ .getLogTypes().isEmpty()) {
+ logTypes.addAll(logRequest.getLogTypes());
+ }
+ String containerIdStr = logRequest.getContainerId();
+ // A null or empty container id selects every container.
+ boolean getAllContainers = (containerIdStr == null
+ || containerIdStr.isEmpty());
+ long size = logRequest.getBytes();
+ RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils
+ .getRemoteNodeFileDir(conf, appId, logRequest.getAppOwner());
+ byte[] buf = new byte[65535];
+ while (nodeFiles != null && nodeFiles.hasNext()) {
+ final FileStatus thisNodeFile = nodeFiles.next();
+ // Diagnostic only: listing a file is not an error condition.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(thisNodeFile.getPath().toString());
+ }
+ String nodeName = thisNodeFile.getPath().getName();
+ if (nodeName.equals(appId + ".har")) {
+ // Continue the scan inside the archive instead of the directory.
+ Path p = new Path("har:///"
+ + thisNodeFile.getPath().toUri().getRawPath());
+ nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p);
+ continue;
+ }
+ if ((nodeId == null || nodeName.contains(LogAggregationUtils
+ .getNodeString(nodeId))) && !nodeName.endsWith(
+ LogAggregationUtils.TMP_FILE_SUFFIX)) {
+ AggregatedLogFormat.LogReader reader = null;
+ try {
+ reader = new AggregatedLogFormat.LogReader(conf,
+ thisNodeFile.getPath());
+ DataInputStream valueStream;
+ LogKey key = new LogKey();
+ valueStream = reader.next(key);
+ while (valueStream != null) {
+ if (getAllContainers || (key.toString().equals(containerIdStr))) {
+ if (createPrintStream) {
+ os = createPrintStream(
+ logRequest.getOutputLocalDir(),
+ thisNodeFile.getPath().getName(), key.toString());
+ }
+ try {
+ // Each entry is a type, a length and the data; EOF ends the
+ // container.
+ while (true) {
+ try {
+ String fileType = valueStream.readUTF();
+ String fileLengthStr = valueStream.readUTF();
+ long fileLength = Long.parseLong(fileLengthStr);
+ if (logTypes == null || logTypes.isEmpty() ||
+ logTypes.contains(fileType)) {
+ LogToolUtils.outputContainerLog(key.toString(),
+ nodeName, fileType, fileLength, size,
+ Times.format(thisNodeFile.getModificationTime()),
+ valueStream, os, buf,
+ ContainerLogAggregationType.AGGREGATED);
+ StringBuilder sb = new StringBuilder();
+ String endOfFile = "End of LogType:" + fileType;
+ sb.append("\n" + endOfFile + "\n");
+ sb.append(StringUtils.repeat("*", endOfFile.length() + 50)
+ + "\n\n");
+ byte[] b = sb.toString().getBytes(
+ Charset.forName("UTF-8"));
+ os.write(b, 0, b.length);
+ findLogs = true;
+ } else {
+ // Not requested: skip over the data of this log file.
+ long totalSkipped = 0;
+ long currSkipped = 0;
+ while (currSkipped != -1 && totalSkipped < fileLength) {
+ currSkipped = valueStream.skip(
+ fileLength - totalSkipped);
+ totalSkipped += currSkipped;
+ }
+ }
+ } catch (EOFException eof) {
+ break;
+ }
+ }
+ } finally {
+ os.flush();
+ if (createPrintStream) {
+ closePrintStream(os);
+ }
+ }
+ if (!getAllContainers) {
+ break;
+ }
+ }
+ // Next container
+ key = new LogKey();
+ valueStream = reader.next(key);
+ }
+ } finally {
+ if (reader != null) {
+ reader.close();
+ }
+ }
+ }
+ }
+ return findLogs;
+ }
+
+ /**
+ * Collect the log meta data of the containers selected by the request.
+ * Every aggregated file of the application is scanned, skipping temporary
+ * files and, when a node id is given, files whose name does not contain
+ * that node. For each selected container the meta data of all its log
+ * files is read and the log data itself is skipped.
+ *
+ * @param logRequest selects the application, owner, node and container
+ * @return one {@link ContainerLogMeta} per matching container found in a
+ * scanned file
+ * @throws IOException if there is no available log file
+ */
+ @Override
+ public List<ContainerLogMeta> readAggregatedLogsMeta(
+ ContainerLogsRequest logRequest) throws IOException {
+ List<ContainerLogMeta> containersLogMeta = new ArrayList<>();
+ String containerIdStr = logRequest.getContainerId();
+ String nodeId = logRequest.getNodeId();
+ ApplicationId appId = logRequest.getAppId();
+ String appOwner = logRequest.getAppOwner();
+ // NOTE(review): only null selects all containers here, whereas
+ // readAggregatedLogs also accepts an empty string -- confirm whether the
+ // two should agree.
+ boolean getAllContainers = (containerIdStr == null);
+ String nodeIdStr = (nodeId == null) ? null
+ : LogAggregationUtils.getNodeString(nodeId);
+ RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils
+ .getRemoteNodeFileDir(conf, appId, appOwner);
+ if (nodeFiles == null) {
+ throw new IOException("There is no available log fils for "
+ + "application:" + appId);
+ }
+ // NOTE(review): unlike readAggregatedLogs, ".har" entries are not
+ // expanded in this loop -- confirm that this is intended.
+ while (nodeFiles.hasNext()) {
+ FileStatus thisNodeFile = nodeFiles.next();
+ if (nodeIdStr != null) {
+ if (!thisNodeFile.getPath().getName().contains(nodeIdStr)) {
+ continue;
+ }
+ }
+ if (!thisNodeFile.getPath().getName()
+ .endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
+ AggregatedLogFormat.LogReader reader =
+ new AggregatedLogFormat.LogReader(conf,
+ thisNodeFile.getPath());
+ try {
+ DataInputStream valueStream;
+ LogKey key = new LogKey();
+ valueStream = reader.next(key);
+ while (valueStream != null) {
+ if (getAllContainers || (key.toString().equals(containerIdStr))) {
+ ContainerLogMeta containerLogMeta = new ContainerLogMeta(
+ key.toString(), thisNodeFile.getPath().getName());
+ // Read one meta data pair per log file until the container's
+ // stream is exhausted.
+ while (true) {
+ try {
+ Pair<String, String> logMeta =
+ LogReader.readContainerMetaDataAndSkipData(
+ valueStream);
+ containerLogMeta.addLogMeta(
+ logMeta.getFirst(),
+ logMeta.getSecond(),
+ Times.format(thisNodeFile.getModificationTime()));
+ } catch (EOFException eof) {
+ break;
+ }
+ }
+ containersLogMeta.add(containerLogMeta);
+ if (!getAllContainers) {
+ break;
+ }
+ }
+ // Next container
+ key = new LogKey();
+ valueStream = reader.next(key);
+ }
+ } finally {
+ reader.close();
+ }
+ }
+ }
+ return containersLogMeta;
+ }
+
+ @Override
+ public void renderAggregatedLogsBlock(Block html, ViewContext context) {
+ TFileAggregatedLogsBlock block = new TFileAggregatedLogsBlock(
+ context, conf);
+ block.render(html);
+ }
+
+ @Override
+ public String getApplicationOwner(Path aggregatedLog) throws IOException {
+ createTFileLogReader(aggregatedLog);
+ return this.tfReader.getLogReader().getApplicationOwner();
+ }
+
+ @Override
+ public Map<ApplicationAccessType, String> getApplicationAcls(
+ Path aggregatedLog) throws IOException {
+ createTFileLogReader(aggregatedLog);
+ return this.tfReader.getLogReader().getApplicationAcls();
+ }
+
+ /**
+ * Make {@code tfReader} refer to the given aggregated log, opening a new
+ * {@link LogReader} unless the cached one was opened for the same path.
+ *
+ * @param aggregatedLog path of the aggregated log file
+ * @throws IOException if the log reader can not be opened
+ */
+ private void createTFileLogReader(Path aggregatedLog) throws IOException {
+ if (this.tfReader == null || !this.tfReader.getAggregatedLogPath()
+ .equals(aggregatedLog)) {
+ // NOTE(review): the LogReader being replaced is not closed here, and
+ // nothing in this class closes the cached one either -- confirm whether
+ // it should be closed once no caller can still be using it.
+ LogReader logReader = new LogReader(conf, aggregatedLog);
+ this.tfReader = new TFileLogReader(logReader, aggregatedLog);
+ }
+ }
+
+ private static class TFileLogReader {
+ private LogReader logReader;
+ private Path aggregatedLogPath;
+
+ TFileLogReader(LogReader logReader, Path aggregatedLogPath) {
+ this.setLogReader(logReader);
+ this.setAggregatedLogPath(aggregatedLogPath);
+ }
+ public LogReader getLogReader() {
+ return logReader;
+ }
+ public void setLogReader(LogReader logReader) {
+ this.logReader = logReader;
+ }
+ public Path getAggregatedLogPath() {
+ return aggregatedLogPath;
+ }
+ public void setAggregatedLogPath(Path aggregatedLogPath) {
+ this.aggregatedLogPath = aggregatedLogPath;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/91cc070d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/TFileAggregatedLogsBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/TFileAggregatedLogsBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/TFileAggregatedLogsBlock.java
new file mode 100644
index 0000000..dde70e3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/TFileAggregatedLogsBlock.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation.filecontroller.tfile;
+
+import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER;
+import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID;
+import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_LOG_TYPE;
+import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING;
+import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME;
+
+import com.google.inject.Inject;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.HarFs;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
+import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationHtmlBlock;
+import org.apache.hadoop.yarn.util.Times;
+import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet;
+import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet.PRE;
+
+/**
+ * The Aggregated Logs Block implementation for TFile.
+ */
+@InterfaceAudience.LimitedPrivate({"YARN", "MapReduce"})
+public class TFileAggregatedLogsBlock extends LogAggregationHtmlBlock {
+
+ private final Configuration conf;
+
+ @Inject
+ public TFileAggregatedLogsBlock(ViewContext ctx, Configuration conf) {
+ super(ctx);
+ this.conf = conf;
+ }
+
+ @Override
+ protected void render(Block html) {
+
+ BlockParameters params = verifyAndParseParameters(html);
+ if (params == null) {
+ return;
+ }
+
+ RemoteIterator<FileStatus> nodeFiles;
+ try {
+ nodeFiles = LogAggregationUtils
+ .getRemoteNodeFileDir(conf, params.getAppId(),
+ params.getAppOwner());
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (Exception ex) {
+ html.h1("No logs available for container "
+ + params.getContainerId().toString());
+ return;
+ }
+
+ NodeId nodeId = params.getNodeId();
+ String logEntity = params.getLogEntity();
+ ApplicationId appId = params.getAppId();
+ ContainerId containerId = params.getContainerId();
+ long start = params.getStartIndex();
+ long end = params.getEndIndex();
+
+ boolean foundLog = false;
+ String desiredLogType = $(CONTAINER_LOG_TYPE);
+ try {
+ while (nodeFiles.hasNext()) {
+ AggregatedLogFormat.LogReader reader = null;
+ try {
+ FileStatus thisNodeFile = nodeFiles.next();
+ if (thisNodeFile.getPath().getName().equals(
+ params.getAppId() + ".har")) {
+ Path p = new Path("har:///"
+ + thisNodeFile.getPath().toUri().getRawPath());
+ nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p);
+ continue;
+ }
+ if (!thisNodeFile.getPath().getName()
+ .contains(LogAggregationUtils.getNodeString(nodeId))
+ || thisNodeFile.getPath().getName()
+ .endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
+ continue;
+ }
+ long logUploadedTime = thisNodeFile.getModificationTime();
+ reader = new AggregatedLogFormat.LogReader(
+ conf, thisNodeFile.getPath());
+
+ String owner = null;
+ Map<ApplicationAccessType, String> appAcls = null;
+ try {
+ owner = reader.getApplicationOwner();
+ appAcls = reader.getApplicationAcls();
+ } catch (IOException e) {
+ LOG.error("Error getting logs for " + logEntity, e);
+ continue;
+ }
+ String remoteUser = request().getRemoteUser();
+
+ if (!checkAcls(conf, appId, owner, appAcls, remoteUser)) {
+ html.h1().__("User [" + remoteUser
+ + "] is not authorized to view the logs for " + logEntity
+ + " in log file [" + thisNodeFile.getPath().getName() + "]")
+ .__();
+ LOG.error("User [" + remoteUser
+ + "] is not authorized to view the logs for " + logEntity);
+ continue;
+ }
+
+ AggregatedLogFormat.ContainerLogsReader logReader = reader
+ .getContainerLogsReader(containerId);
+ if (logReader == null) {
+ continue;
+ }
+
+ foundLog = readContainerLogs(html, logReader, start, end,
+ desiredLogType, logUploadedTime);
+ } catch (IOException ex) {
+ LOG.error("Error getting logs for " + logEntity, ex);
+ continue;
+ } finally {
+ if (reader != null) {
+ reader.close();
+ }
+ }
+ }
+ if (!foundLog) {
+ if (desiredLogType.isEmpty()) {
+ html.h1("No logs available for container "
+ + containerId.toString());
+ } else {
+ html.h1("Unable to locate '" + desiredLogType
+ + "' log for container " + containerId.toString());
+ }
+ }
+ } catch (IOException e) {
+ html.h1().__("Error getting logs for " + logEntity).__();
+ LOG.error("Error getting logs for " + logEntity, e);
+ }
+ }
+
+ private boolean readContainerLogs(Block html,
+ AggregatedLogFormat.ContainerLogsReader logReader, long startIndex,
+ long endIndex, String desiredLogType, long logUpLoadTime)
+ throws IOException {
+ int bufferSize = 65536;
+ char[] cbuf = new char[bufferSize];
+
+ boolean foundLog = false;
+ String logType = logReader.nextLog();
+ while (logType != null) {
+ if (desiredLogType == null || desiredLogType.isEmpty()
+ || desiredLogType.equals(logType)) {
+ long logLength = logReader.getCurrentLogLength();
+ if (foundLog) {
+ html.pre().__("\n\n").__();
+ }
+
+ html.p().__("Log Type: " + logType).__();
+ html.p().__("Log Upload Time: " + Times.format(logUpLoadTime)).__();
+ html.p().__("Log Length: " + Long.toString(logLength)).__();
+
+ long start = startIndex < 0
+ ? logLength + startIndex : startIndex;
+ start = start < 0 ? 0 : start;
+ start = start > logLength ? logLength : start;
+ long end = endIndex < 0
+ ? logLength + endIndex : endIndex;
+ end = end < 0 ? 0 : end;
+ end = end > logLength ? logLength : end;
+ end = end < start ? start : end;
+
+ long toRead = end - start;
+ if (toRead < logLength) {
+ html.p().__("Showing " + toRead + " bytes of " + logLength
+ + " total. Click ").a(url("logs", $(NM_NODENAME), $(CONTAINER_ID),
+ $(ENTITY_STRING), $(APP_OWNER),
+ logType, "?start=0"), "here").
+ __(" for the full log.").__();
+ }
+
+ long totalSkipped = 0;
+ while (totalSkipped < start) {
+ long ret = logReader.skip(start - totalSkipped);
+ if (ret == 0) {
+ //Read one byte
+ int nextByte = logReader.read();
+ // Check if we have reached EOF
+ if (nextByte == -1) {
+ throw new IOException("Premature EOF from container log");
+ }
+ ret = 1;
+ }
+ totalSkipped += ret;
+ }
+
+ int len = 0;
+ int currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
+ PRE<Hamlet> pre = html.pre();
+
+ while (toRead > 0
+ && (len = logReader.read(cbuf, 0, currentToRead)) > 0) {
+ pre.__(new String(cbuf, 0, len));
+ toRead = toRead - len;
+ currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
+ }
+
+ pre.__();
+ foundLog = true;
+ }
+
+ logType = logReader.nextLog();
+ }
+
+ return foundLog;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/91cc070d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/package-info.java
new file mode 100644
index 0000000..b2e91ab
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Public
+package org.apache.hadoop.yarn.logaggregation.filecontroller.tfile;
+import org.apache.hadoop.classification.InterfaceAudience;
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org