You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ju...@apache.org on 2017/09/08 22:13:30 UTC
[1/2] hadoop git commit: YARN-7072. Add a new log aggregation file
format controller. Contributed by Xuan Gong.
Repository: hadoop
Updated Branches:
refs/heads/trunk 8edc60531 -> 3fddabc2f
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fddabc2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java
index aaed538..989b326 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java
@@ -27,7 +27,6 @@ import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.math3.util.Pair;
@@ -192,7 +191,7 @@ public class LogAggregationTFileController
while (valueStream != null) {
if (getAllContainers || (key.toString().equals(containerIdStr))) {
if (createPrintStream) {
- os = createPrintStream(
+ os = LogToolUtils.createPrintStream(
logRequest.getOutputLocalDir(),
thisNodeFile.getPath().getName(), key.toString());
}
@@ -209,12 +208,7 @@ public class LogAggregationTFileController
Times.format(thisNodeFile.getModificationTime()),
valueStream, os, buf,
ContainerLogAggregationType.AGGREGATED);
- StringBuilder sb = new StringBuilder();
- String endOfFile = "End of LogType:" + fileType;
- sb.append("\n" + endOfFile + "\n");
- sb.append(StringUtils.repeat("*", endOfFile.length() + 50)
- + "\n\n");
- byte[] b = sb.toString().getBytes(
+ byte[] b = aggregatedLogSuffix(fileType).getBytes(
Charset.forName("UTF-8"));
os.write(b, 0, b.length);
findLogs = true;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fddabc2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexFileController.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexFileController.java
new file mode 100644
index 0000000..5f61710
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexFileController.java
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
+import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Functional test for {@link LogAggregationIndexedFileController}: writes
+ * container logs in the indexed aggregation format, then reads back both the
+ * log content and the log metadata, with and without a checksum file present.
+ */
+public class TestLogAggregationIndexFileController {
+
+  private final String rootLocalLogDir = "target/LocalLogs";
+  private final Path rootLocalLogDirPath = new Path(rootLocalLogDir);
+  private final String remoteLogDir = "target/remote-app";
+  private static final FsPermission LOG_FILE_UMASK = FsPermission
+      .createImmutable((short) (0777));
+  private static final UserGroupInformation USER_UGI = UserGroupInformation
+      .createRemoteUser("testUser");
+  private FileSystem fs;
+  private Configuration conf;
+  private ApplicationId appId;
+  private ContainerId containerId;
+  private NodeId nodeId;
+
+  // System.out/System.err are redirected into these buffers so the test can
+  // assert on what the controller prints; the real streams are restored in
+  // teardown so later tests in the same JVM are not affected.
+  private ByteArrayOutputStream sysOutStream;
+  private PrintStream sysOut;
+  private ByteArrayOutputStream sysErrStream;
+  private PrintStream sysErr;
+  private PrintStream originalOut;
+  private PrintStream originalErr;
+
+  @Before
+  public void setUp() throws IOException {
+    appId = ApplicationId.newInstance(123456, 1);
+    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
+        appId, 1);
+    containerId = ContainerId.newContainerId(attemptId, 1);
+    nodeId = NodeId.newInstance("localhost", 9999);
+    conf = new Configuration();
+    // Point the "Indexed" controller at a test-local remote dir.
+    conf.set("yarn.log-aggregation.Indexed.remote-app-log-dir",
+        remoteLogDir);
+    conf.set("yarn.log-aggregation.Indexed.remote-app-log-dir-suffix",
+        "logs");
+    conf.set(YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE, "gz");
+    fs = FileSystem.get(conf);
+    originalOut = System.out;
+    originalErr = System.err;
+    sysOutStream = new ByteArrayOutputStream();
+    sysOut = new PrintStream(sysOutStream);
+    System.setOut(sysOut);
+
+    sysErrStream = new ByteArrayOutputStream();
+    sysErr = new PrintStream(sysErrStream);
+    System.setErr(sysErr);
+  }
+
+  @After
+  public void teardown() throws Exception {
+    // Restore the real streams before cleaning up the test directories.
+    System.setOut(originalOut);
+    System.setErr(originalErr);
+    fs.delete(rootLocalLogDirPath, true);
+    fs.delete(new Path(remoteLogDir), true);
+  }
+
+  @Test(timeout = 15000)
+  public void testLogAggregationIndexFileFormat() throws Exception {
+    if (fs.exists(rootLocalLogDirPath)) {
+      fs.delete(rootLocalLogDirPath, true);
+    }
+    assertTrue(fs.mkdirs(rootLocalLogDirPath));
+
+    Path appLogsDir = new Path(rootLocalLogDirPath, appId.toString());
+    if (fs.exists(appLogsDir)) {
+      fs.delete(appLogsDir, true);
+    }
+    assertTrue(fs.mkdirs(appLogsDir));
+
+    List<String> logTypes = new ArrayList<String>();
+    logTypes.add("syslog");
+    logTypes.add("stdout");
+    logTypes.add("stderr");
+
+    Set<File> files = new HashSet<>();
+
+    LogKey key1 = new LogKey(containerId.toString());
+
+    for(String logType : logTypes) {
+      File file = createAndWriteLocalLogFile(containerId, appLogsDir,
+          logType);
+      files.add(file);
+    }
+    LogValue value = mock(LogValue.class);
+    when(value.getPendingLogFilesToUploadForThisContainer()).thenReturn(files);
+
+    LogAggregationIndexedFileController fileFormat
+        = new LogAggregationIndexedFileController();
+    fileFormat.initialize(conf, "Indexed");
+
+    Map<ApplicationAccessType, String> appAcls = new HashMap<>();
+    Path appDir = fileFormat.getRemoteAppLogDir(appId,
+        USER_UGI.getShortUserName());
+    if (fs.exists(appDir)) {
+      fs.delete(appDir, true);
+    }
+    assertTrue(fs.mkdirs(appDir));
+
+    Path logPath = fileFormat.getRemoteNodeLogFileForApp(
+        appId, USER_UGI.getShortUserName(), nodeId);
+    LogAggregationFileControllerContext context =
+        new LogAggregationFileControllerContext(
+            logPath, logPath, true, 1000, appId, appAcls, nodeId, USER_UGI);
+    // initialize the writer
+    fileFormat.initializeWriter(context);
+
+    fileFormat.write(key1, value);
+    LogAggregationFileControllerContext record = mock(
+        LogAggregationFileControllerContext.class);
+    fileFormat.postWrite(record);
+    fileFormat.closeWriter();
+
+    ContainerLogsRequest logRequest = new ContainerLogsRequest();
+    logRequest.setAppId(appId);
+    logRequest.setNodeId(nodeId.toString());
+    logRequest.setAppOwner(USER_UGI.getShortUserName());
+    logRequest.setContainerId(containerId.toString());
+    logRequest.setBytes(Long.MAX_VALUE);
+    List<ContainerLogMeta> meta = fileFormat.readAggregatedLogsMeta(
+        logRequest);
+    Assert.assertEquals(1, meta.size());
+    List<String> fileNames = new ArrayList<>();
+    for (ContainerLogMeta log : meta) {
+      Assert.assertEquals(containerId.toString(), log.getContainerId());
+      Assert.assertEquals(nodeId.toString(), log.getNodeId());
+      Assert.assertEquals(3, log.getContainerLogMeta().size());
+      for (ContainerLogFileInfo file : log.getContainerLogMeta()) {
+        fileNames.add(file.getFileName());
+      }
+    }
+    fileNames.removeAll(logTypes);
+    Assert.assertTrue(fileNames.isEmpty());
+
+    boolean foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out);
+    Assert.assertTrue(foundLogs);
+    for (String logType : logTypes) {
+      Assert.assertTrue(sysOutStream.toString().contains(logMessage(
+          containerId, logType)));
+    }
+    sysOutStream.reset();
+
+    // Create a dangling checksum file; while it exists, the previously
+    // aggregated logs must be hidden from both meta and content readers.
+    Path checksumFile = new Path(fileFormat.getRemoteAppLogDir(
+        appId, USER_UGI.getShortUserName()),
+        LogAggregationUtils.getNodeString(nodeId)
+            + LogAggregationIndexedFileController.CHECK_SUM_FILE_SUFFIX);
+    FSDataOutputStream checksumOut = null;
+    try {
+      checksumOut = FileSystem.create(fs, checksumFile, LOG_FILE_UMASK);
+      checksumOut.writeLong(0);
+    } finally {
+      IOUtils.closeQuietly(checksumOut);
+    }
+    meta = fileFormat.readAggregatedLogsMeta(
+        logRequest);
+    Assert.assertEquals(0, meta.size());
+    foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out);
+    Assert.assertFalse(foundLogs);
+    sysOutStream.reset();
+    fs.delete(checksumFile, false);
+    Assert.assertFalse(fs.exists(checksumFile));
+
+    List<String> newLogTypes = new ArrayList<>(logTypes);
+    files.clear();
+    newLogTypes.add("test1");
+    files.add(createAndWriteLocalLogFile(containerId, appLogsDir,
+        "test1"));
+    newLogTypes.add("test2");
+    files.add(createAndWriteLocalLogFile(containerId, appLogsDir,
+        "test2"));
+    LogValue value2 = mock(LogValue.class);
+    when(value2.getPendingLogFilesToUploadForThisContainer())
+        .thenReturn(files);
+
+    // initialize the writer
+    fileFormat.initializeWriter(context);
+    fileFormat.write(key1, value2);
+    fileFormat.closeWriter();
+
+    // We did not call postWrite, so the checksum file is kept and we can
+    // only get the logs/log metadata from the first write.
+    meta = fileFormat.readAggregatedLogsMeta(
+        logRequest);
+    Assert.assertEquals(1, meta.size());
+    for (ContainerLogMeta log : meta) {
+      Assert.assertEquals(containerId.toString(), log.getContainerId());
+      Assert.assertEquals(nodeId.toString(), log.getNodeId());
+      Assert.assertEquals(3, log.getContainerLogMeta().size());
+      for (ContainerLogFileInfo file : log.getContainerLogMeta()) {
+        fileNames.add(file.getFileName());
+      }
+    }
+    fileNames.removeAll(logTypes);
+    Assert.assertTrue(fileNames.isEmpty());
+    foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out);
+    Assert.assertTrue(foundLogs);
+    for (String logType : logTypes) {
+      Assert.assertTrue(sysOutStream.toString().contains(logMessage(
+          containerId, logType)));
+    }
+    Assert.assertFalse(sysOutStream.toString().contains(logMessage(
+        containerId, "test1")));
+    Assert.assertFalse(sysOutStream.toString().contains(logMessage(
+        containerId, "test2")));
+    sysOutStream.reset();
+
+    // Call postWrite and we should get all logs/logmetas for both
+    // the first write and the second write.
+    fileFormat.initializeWriter(context);
+    fileFormat.write(key1, value2);
+    fileFormat.postWrite(record);
+    fileFormat.closeWriter();
+    meta = fileFormat.readAggregatedLogsMeta(
+        logRequest);
+    Assert.assertEquals(2, meta.size());
+    for (ContainerLogMeta log : meta) {
+      Assert.assertEquals(containerId.toString(), log.getContainerId());
+      Assert.assertEquals(nodeId.toString(), log.getNodeId());
+      for (ContainerLogFileInfo file : log.getContainerLogMeta()) {
+        fileNames.add(file.getFileName());
+      }
+    }
+    fileNames.removeAll(newLogTypes);
+    Assert.assertTrue(fileNames.isEmpty());
+    foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out);
+    Assert.assertTrue(foundLogs);
+    for (String logType : newLogTypes) {
+      Assert.assertTrue(sysOutStream.toString().contains(logMessage(
+          containerId, logType)));
+    }
+    sysOutStream.reset();
+  }
+
+  /**
+   * Creates a local log file named {@code logType} under {@code localLogDir}
+   * containing the deterministic message from {@link #logMessage}.
+   * NOTE(review): FileWriter uses the platform default charset; fine for the
+   * ASCII content written here.
+   */
+  private File createAndWriteLocalLogFile(ContainerId containerId,
+      Path localLogDir, String logType) throws IOException {
+    File file = new File(localLogDir.toString(), logType);
+    if (file.exists()) {
+      file.delete();
+    }
+    file.createNewFile();
+    Writer writer = null;
+    try {
+      writer = new FileWriter(file);
+      writer.write(logMessage(containerId, logType));
+      writer.close();
+      return file;
+    } finally {
+      IOUtils.closeQuietly(writer);
+    }
+  }
+
+  /** Deterministic per-container, per-log-type message used for matching. */
+  private String logMessage(ContainerId containerId, String logType) {
+    return "Hello " + containerId + " in " + logType + "!";
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fddabc2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java
index dc692a5..bca5d5c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java
@@ -53,7 +53,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
-import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo;
import org.apache.hadoop.yarn.logaggregation.TestContainerLogsUtils;
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryClientService;
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryManagerOnTimelineStore;
@@ -851,7 +851,7 @@ public class TestAHSWebServices extends JerseyTestBase {
for (ContainerLogsInfo logInfo : responseText) {
if(logInfo.getLogType().equals(
ContainerLogAggregationType.AGGREGATED.toString())) {
- List<PerContainerLogFileInfo> logMeta = logInfo
+ List<ContainerLogFileInfo> logMeta = logInfo
.getContainerLogsInfo();
assertTrue(logMeta.size() == 1);
assertEquals(logMeta.get(0).getFileName(), fileName);
@@ -879,7 +879,7 @@ public class TestAHSWebServices extends JerseyTestBase {
for (ContainerLogsInfo logInfo : responseText) {
if(logInfo.getLogType().equals(
ContainerLogAggregationType.AGGREGATED.toString())) {
- List<PerContainerLogFileInfo> logMeta = logInfo
+ List<ContainerLogFileInfo> logMeta = logInfo
.getContainerLogsInfo();
assertTrue(logMeta.size() == 1);
assertEquals(logMeta.get(0).getFileName(), fileName);
@@ -917,7 +917,7 @@ public class TestAHSWebServices extends JerseyTestBase {
assertTrue(responseText.size() == 1);
assertEquals(responseText.get(0).getLogType(),
ContainerLogAggregationType.AGGREGATED.toString());
- List<PerContainerLogFileInfo> logMeta = responseText.get(0)
+ List<ContainerLogFileInfo> logMeta = responseText.get(0)
.getContainerLogsInfo();
assertTrue(logMeta.size() == 1);
assertEquals(logMeta.get(0).getFileName(), fileName);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fddabc2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/dao/ContainerLogsInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/dao/ContainerLogsInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/dao/ContainerLogsInfo.java
index bc3ab39..1bb0408 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/dao/ContainerLogsInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/dao/ContainerLogsInfo.java
@@ -27,14 +27,14 @@ import javax.xml.bind.annotation.XmlRootElement;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
-import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo;
/**
* {@code ContainerLogsInfo} includes the log meta-data of containers.
* <p>
* The container log meta-data includes details such as:
* <ul>
- * <li>A list of {@link PerContainerLogFileInfo}.</li>
+ * <li>A list of {@link ContainerLogFileInfo}.</li>
* <li>The container Id.</li>
* <li>The NodeManager Id.</li>
* <li>The logType: could be local or aggregated</li>
@@ -46,7 +46,7 @@ import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo;
public class ContainerLogsInfo {
@XmlElement(name = "containerLogInfo")
- protected List<PerContainerLogFileInfo> containerLogsInfo;
+ protected List<ContainerLogFileInfo> containerLogsInfo;
@XmlElement(name = "logAggregationType")
protected String logType;
@@ -62,14 +62,14 @@ public class ContainerLogsInfo {
public ContainerLogsInfo(ContainerLogMeta logMeta,
ContainerLogAggregationType logType) throws YarnException {
- this.containerLogsInfo = new ArrayList<PerContainerLogFileInfo>(
+ this.containerLogsInfo = new ArrayList<ContainerLogFileInfo>(
logMeta.getContainerLogMeta());
this.logType = logType.toString();
this.containerId = logMeta.getContainerId();
this.nodeId = logMeta.getNodeId();
}
- public List<PerContainerLogFileInfo> getContainerLogsInfo() {
+ public List<ContainerLogFileInfo> getContainerLogsInfo() {
return this.containerLogsInfo;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fddabc2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/NMContainerLogsInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/NMContainerLogsInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/NMContainerLogsInfo.java
index 5415e04..193ec62 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/NMContainerLogsInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/NMContainerLogsInfo.java
@@ -27,7 +27,7 @@ import javax.xml.bind.annotation.XmlRootElement;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
-import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.webapp.ContainerLogsUtils;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;
@@ -55,10 +55,10 @@ public class NMContainerLogsInfo extends ContainerLogsInfo {
containerId, remoteUser, nmContext);
}
- private static List<PerContainerLogFileInfo> getContainerLogsInfo(
+ private static List<ContainerLogFileInfo> getContainerLogsInfo(
ContainerId id, String remoteUser, Context nmContext)
throws YarnException {
- List<PerContainerLogFileInfo> logFiles = new ArrayList<>();
+ List<ContainerLogFileInfo> logFiles = new ArrayList<>();
List<File> logDirs = ContainerLogsUtils.getContainerLogDirs(
id, remoteUser, nmContext);
for (File containerLogsDir : logDirs) {
@@ -66,7 +66,7 @@ public class NMContainerLogsInfo extends ContainerLogsInfo {
if (logs != null) {
for (File log : logs) {
if (log.isFile()) {
- PerContainerLogFileInfo logMeta = new PerContainerLogFileInfo(
+ ContainerLogFileInfo logMeta = new ContainerLogFileInfo(
log.getName(), Long.toString(log.length()),
Times.format(log.lastModified()));
logFiles.add(logMeta);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fddabc2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
index 1641171..fbab34a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
@@ -51,7 +51,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
-import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo;
import org.apache.hadoop.yarn.logaggregation.TestContainerLogsUtils;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
@@ -537,7 +537,7 @@ public class TestNMWebServices extends JerseyTestBase {
assertTrue(responseList.size() == 1);
assertEquals(responseList.get(0).getLogType(),
ContainerLogAggregationType.LOCAL.toString());
- List<PerContainerLogFileInfo> logMeta = responseList.get(0)
+ List<ContainerLogFileInfo> logMeta = responseList.get(0)
.getContainerLogsInfo();
assertTrue(logMeta.size() == 1);
assertEquals(logMeta.get(0).getFileName(), filename);
@@ -564,13 +564,13 @@ public class TestNMWebServices extends JerseyTestBase {
for (ContainerLogsInfo logInfo : responseList) {
if(logInfo.getLogType().equals(
ContainerLogAggregationType.AGGREGATED.toString())) {
- List<PerContainerLogFileInfo> meta = logInfo.getContainerLogsInfo();
+ List<ContainerLogFileInfo> meta = logInfo.getContainerLogsInfo();
assertTrue(meta.size() == 1);
assertEquals(meta.get(0).getFileName(), aggregatedLogFile);
} else {
assertEquals(logInfo.getLogType(),
ContainerLogAggregationType.LOCAL.toString());
- List<PerContainerLogFileInfo> meta = logInfo.getContainerLogsInfo();
+ List<ContainerLogFileInfo> meta = logInfo.getContainerLogsInfo();
assertTrue(meta.size() == 1);
assertEquals(meta.get(0).getFileName(), filename);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[2/2] hadoop git commit: YARN-7072. Add a new log aggregation file
format controller. Contributed by Xuan Gong.
Posted by ju...@apache.org.
YARN-7072. Add a new log aggregation file format controller. Contributed by Xuan Gong.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3fddabc2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3fddabc2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3fddabc2
Branch: refs/heads/trunk
Commit: 3fddabc2fe4fbdb8ef3f9ce7558955c4f0794dcc
Parents: 8edc605
Author: Junping Du <ju...@apache.org>
Authored: Fri Sep 8 15:16:19 2017 -0700
Committer: Junping Du <ju...@apache.org>
Committed: Fri Sep 8 15:16:19 2017 -0700
----------------------------------------------------------------------
.../file/tfile/BoundedRangeFileInputStream.java | 2 +-
.../hadoop/io/file/tfile/Compression.java | 6 +-
.../file/tfile/SimpleBufferedOutputStream.java | 2 +-
.../apache/hadoop/yarn/client/cli/LogsCLI.java | 25 +-
.../logaggregation/ContainerLogFileInfo.java | 93 ++
.../yarn/logaggregation/ContainerLogMeta.java | 8 +-
.../logaggregation/LogAggregationUtils.java | 27 +
.../yarn/logaggregation/LogCLIHelpers.java | 20 +-
.../yarn/logaggregation/LogToolUtils.java | 26 +
.../logaggregation/PerContainerLogFileInfo.java | 93 --
.../LogAggregationFileController.java | 45 +-
.../ifile/IndexedFileAggregatedLogsBlock.java | 275 +++++
.../LogAggregationIndexedFileController.java | 1056 ++++++++++++++++++
.../filecontroller/ifile/package-info.java | 21 +
.../tfile/LogAggregationTFileController.java | 10 +-
.../TestLogAggregationIndexFileController.java | 316 ++++++
.../webapp/TestAHSWebServices.java | 8 +-
.../server/webapp/dao/ContainerLogsInfo.java | 10 +-
.../webapp/dao/NMContainerLogsInfo.java | 8 +-
.../nodemanager/webapp/TestNMWebServices.java | 8 +-
20 files changed, 1886 insertions(+), 173 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fddabc2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/BoundedRangeFileInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/BoundedRangeFileInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/BoundedRangeFileInputStream.java
index e7f4c83..050c15b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/BoundedRangeFileInputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/BoundedRangeFileInputStream.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
* BoundedRangeFileInputStream on top of the same FSDataInputStream and they
* would not interfere with each other.
*/
-class BoundedRangeFileInputStream extends InputStream {
+public class BoundedRangeFileInputStream extends InputStream {
private FSDataInputStream in;
private long pos;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fddabc2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/Compression.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/Compression.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/Compression.java
index f82f4df..fa85ed7 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/Compression.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/Compression.java
@@ -43,7 +43,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_
/**
* Compression related stuff.
*/
-final class Compression {
+public final class Compression {
static final Logger LOG = LoggerFactory.getLogger(Compression.class);
/**
@@ -75,7 +75,7 @@ final class Compression {
/**
* Compression algorithms.
*/
- enum Algorithm {
+ public enum Algorithm {
LZO(TFile.COMPRESSION_LZO) {
private transient boolean checked = false;
private static final String defaultClazz =
@@ -348,7 +348,7 @@ final class Compression {
}
}
- static Algorithm getCompressionAlgorithmByName(String compressName) {
+ public static Algorithm getCompressionAlgorithmByName(String compressName) {
Algorithm[] algos = Algorithm.class.getEnumConstants();
for (Algorithm a : algos) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fddabc2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/SimpleBufferedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/SimpleBufferedOutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/SimpleBufferedOutputStream.java
index a26a02d..0a194a3 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/SimpleBufferedOutputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/SimpleBufferedOutputStream.java
@@ -25,7 +25,7 @@ import java.io.OutputStream;
* A simplified BufferedOutputStream with borrowed buffer, and allow users to
* see how much data have been buffered.
*/
-class SimpleBufferedOutputStream extends FilterOutputStream {
+public class SimpleBufferedOutputStream extends FilterOutputStream {
protected byte buf[]; // the borrowed buffer
protected int count = 0; // bytes used in buffer.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fddabc2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java
index 5528412..1a3db26 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java
@@ -62,9 +62,10 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers;
-import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo;
+import org.apache.hadoop.yarn.logaggregation.LogToolUtils;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.apache.hadoop.yarn.webapp.util.YarnWebServiceUtils;
import org.codehaus.jettison.json.JSONArray;
@@ -411,10 +412,10 @@ public class LogsCLI extends Configured implements Tool {
return false;
}
- private List<Pair<PerContainerLogFileInfo, String>> getContainerLogFiles(
+ private List<Pair<ContainerLogFileInfo, String>> getContainerLogFiles(
Configuration conf, String containerIdStr, String nodeHttpAddress)
throws IOException {
- List<Pair<PerContainerLogFileInfo, String>> logFileInfos
+ List<Pair<ContainerLogFileInfo, String>> logFileInfos
= new ArrayList<>();
Client webServiceClient = Client.create();
try {
@@ -453,12 +454,12 @@ public class LogsCLI extends Configured implements Tool {
if (ob instanceof JSONArray) {
JSONArray obArray = (JSONArray)ob;
for (int j = 0; j < obArray.length(); j++) {
- logFileInfos.add(new Pair<PerContainerLogFileInfo, String>(
+ logFileInfos.add(new Pair<ContainerLogFileInfo, String>(
generatePerContainerLogFileInfoFromJSON(
obArray.getJSONObject(j)), aggregateType));
}
} else if (ob instanceof JSONObject) {
- logFileInfos.add(new Pair<PerContainerLogFileInfo, String>(
+ logFileInfos.add(new Pair<ContainerLogFileInfo, String>(
generatePerContainerLogFileInfoFromJSON(
(JSONObject)ob), aggregateType));
}
@@ -477,7 +478,7 @@ public class LogsCLI extends Configured implements Tool {
return logFileInfos;
}
- private PerContainerLogFileInfo generatePerContainerLogFileInfoFromJSON(
+ private ContainerLogFileInfo generatePerContainerLogFileInfoFromJSON(
JSONObject meta) throws JSONException {
String fileName = meta.has("fileName") ?
meta.getString("fileName") : "N/A";
@@ -485,7 +486,7 @@ public class LogsCLI extends Configured implements Tool {
meta.getString("fileSize") : "N/A";
String lastModificationTime = meta.has("lastModifiedTime") ?
meta.getString("lastModifiedTime") : "N/A";
- return new PerContainerLogFileInfo(fileName, fileSize,
+ return new ContainerLogFileInfo(fileName, fileSize,
lastModificationTime);
}
@@ -506,7 +507,7 @@ public class LogsCLI extends Configured implements Tool {
return -1;
}
String nodeId = request.getNodeId();
- PrintStream out = logCliHelper.createPrintStream(localDir, nodeId,
+ PrintStream out = LogToolUtils.createPrintStream(localDir, nodeId,
containerIdStr);
try {
Set<String> matchedFiles = getMatchedContainerLogFiles(request,
@@ -1235,9 +1236,9 @@ public class LogsCLI extends Configured implements Tool {
outStream.printf(LogCLIHelpers.PER_LOG_FILE_INFO_PATTERN,
"LogFile", "LogLength", "LastModificationTime", "LogAggregationType");
outStream.println(StringUtils.repeat("=", containerString.length() * 2));
- List<Pair<PerContainerLogFileInfo, String>> infos = getContainerLogFiles(
+ List<Pair<ContainerLogFileInfo, String>> infos = getContainerLogFiles(
getConf(), containerId, nodeHttpAddress);
- for (Pair<PerContainerLogFileInfo, String> info : infos) {
+ for (Pair<ContainerLogFileInfo, String> info : infos) {
outStream.printf(LogCLIHelpers.PER_LOG_FILE_INFO_PATTERN,
info.getKey().getFileName(), info.getKey().getFileSize(),
info.getKey().getLastModifiedTime(), info.getValue());
@@ -1249,11 +1250,11 @@ public class LogsCLI extends Configured implements Tool {
boolean useRegex) throws IOException {
// fetch all the log files for the container
// filter the log files based on the given -log_files pattern
- List<Pair<PerContainerLogFileInfo, String>> allLogFileInfos=
+ List<Pair<ContainerLogFileInfo, String>> allLogFileInfos=
getContainerLogFiles(getConf(), request.getContainerId(),
request.getNodeHttpAddress());
List<String> fileNames = new ArrayList<String>();
- for (Pair<PerContainerLogFileInfo, String> fileInfo : allLogFileInfos) {
+ for (Pair<ContainerLogFileInfo, String> fileInfo : allLogFileInfos) {
fileNames.add(fileInfo.getKey().getFileName());
}
return getMatchedLogFiles(request, fileNames,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fddabc2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogFileInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogFileInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogFileInfo.java
new file mode 100644
index 0000000..b461ebb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogFileInfo.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation;
+
+/**
+ * ContainerLogFileInfo represents the meta data for a container log file,
+ * which includes:
+ * <ul>
+ * <li>The filename of the container log.</li>
+ * <li>The size of the container log.</li>
+ * <li>The last modification time of the container log.</li>
+ * </ul>
+ *
+ */
+public class ContainerLogFileInfo {
+ private String fileName;
+ private String fileSize;
+ private String lastModifiedTime;
+
+ //JAXB needs this
+ public ContainerLogFileInfo() {}
+
+ public ContainerLogFileInfo(String fileName, String fileSize,
+ String lastModifiedTime) {
+ this.setFileName(fileName);
+ this.setFileSize(fileSize);
+ this.setLastModifiedTime(lastModifiedTime);
+ }
+
+ public String getFileName() {
+ return fileName;
+ }
+
+ public void setFileName(String fileName) {
+ this.fileName = fileName;
+ }
+
+ public String getFileSize() {
+ return fileSize;
+ }
+
+ public void setFileSize(String fileSize) {
+ this.fileSize = fileSize;
+ }
+
+ public String getLastModifiedTime() {
+ return lastModifiedTime;
+ }
+
+ public void setLastModifiedTime(String lastModifiedTime) {
+ this.lastModifiedTime = lastModifiedTime;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((fileName == null) ? 0 : fileName.hashCode());
+ result = prime * result + ((fileSize == null) ? 0 : fileSize.hashCode());
+ result = prime * result + ((lastModifiedTime == null) ?
+ 0 : lastModifiedTime.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object otherObj) {
+ if (otherObj == this) {
+ return true;
+ }
+ if (!(otherObj instanceof ContainerLogFileInfo)) {
+ return false;
+ }
+ ContainerLogFileInfo other = (ContainerLogFileInfo)otherObj;
+ return other.fileName.equals(fileName) && other.fileSize.equals(fileSize)
+ && other.lastModifiedTime.equals(lastModifiedTime);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fddabc2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogMeta.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogMeta.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogMeta.java
index 26a620e..4c6b0de 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogMeta.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogMeta.java
@@ -26,14 +26,14 @@ import java.util.List;
* <ul>
* <li>The Container Id.</li>
* <li>The NodeManager Id.</li>
- * <li>A list of {@link PerContainerLogFileInfo}.</li>
+ * <li>A list of {@link ContainerLogFileInfo}.</li>
* </ul>
*
*/
public class ContainerLogMeta {
private String containerId;
private String nodeId;
- private List<PerContainerLogFileInfo> logMeta;
+ private List<ContainerLogFileInfo> logMeta;
public ContainerLogMeta(String containerId, String nodeId) {
this.containerId = containerId;
@@ -51,11 +51,11 @@ public class ContainerLogMeta {
public void addLogMeta(String fileName, String fileSize,
String lastModificationTime) {
- logMeta.add(new PerContainerLogFileInfo(fileName, fileSize,
+ logMeta.add(new ContainerLogFileInfo(fileName, fileSize,
lastModificationTime));
}
- public List<PerContainerLogFileInfo> getContainerLogMeta() {
+ public List<ContainerLogFileInfo> getContainerLogMeta() {
return this.logMeta;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fddabc2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
index 6d04c29..edf2cf3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
@@ -30,6 +30,9 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
@Private
public class LogAggregationUtils {
@@ -200,6 +203,30 @@ public class LogAggregationUtils {
* @param conf the configuration
* @param appId the applicationId
* @param appOwner the application owner
+ * @param remoteRootLogDir the remote root log directory
+ * @param suffix the log directory suffix
+ * @return the list of available log files
+ * @throws IOException if there is no log file available
+ */
+ public static List<FileStatus> getRemoteNodeFileList(
+ Configuration conf, ApplicationId appId, String appOwner,
+ org.apache.hadoop.fs.Path remoteRootLogDir, String suffix)
+ throws IOException {
+ Path remoteAppLogDir = getRemoteAppLogDir(conf, appId, appOwner,
+ remoteRootLogDir, suffix);
+ List<FileStatus> nodeFiles = new ArrayList<>();
+ Path qualifiedLogDir =
+ FileContext.getFileContext(conf).makeQualified(remoteAppLogDir);
+ nodeFiles.addAll(Arrays.asList(FileContext.getFileContext(
+ qualifiedLogDir.toUri(), conf).util().listStatus(remoteAppLogDir)));
+ return nodeFiles;
+ }
+
+ /**
+ * Get all available log files under remote app log directory.
+ * @param conf the configuration
+ * @param appId the applicationId
+ * @param appOwner the application owner
* @return the iterator of available log files
* @throws IOException if there is no log file available
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fddabc2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java
index 0068eae..97b78ec 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java
@@ -22,8 +22,6 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.file.AccessDeniedException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -229,7 +227,7 @@ public class LogCLIHelpers implements Configurable {
out.printf(PER_LOG_FILE_INFO_PATTERN, "LogFile", "LogLength",
"LastModificationTime", "LogAggregationType");
out.println(StringUtils.repeat("=", containerString.length() * 2));
- for (PerContainerLogFileInfo logMeta : containerLogMeta
+ for (ContainerLogFileInfo logMeta : containerLogMeta
.getContainerLogMeta()) {
out.printf(PER_LOG_FILE_INFO_PATTERN, logMeta.getFileName(),
logMeta.getFileSize(), logMeta.getLastModifiedTime(), "AGGREGATED");
@@ -345,20 +343,6 @@ public class LogCLIHelpers implements Configurable {
+ ". Error message found: " + errorMessage);
}
- @Private
- public PrintStream createPrintStream(String localDir, String nodeId,
- String containerId) throws IOException {
- PrintStream out = System.out;
- if(localDir != null && !localDir.isEmpty()) {
- Path nodePath = new Path(localDir, LogAggregationUtils
- .getNodeString(nodeId));
- Files.createDirectories(Paths.get(nodePath.toString()));
- Path containerLogPath = new Path(nodePath, containerId);
- out = new PrintStream(containerLogPath.toString(), "UTF-8");
- }
- return out;
- }
-
public void closePrintStream(PrintStream out) {
if (out != System.out) {
IOUtils.closeQuietly(out);
@@ -379,7 +363,7 @@ public class LogCLIHelpers implements Configurable {
return logTypes;
}
for (ContainerLogMeta logMeta: containersLogMeta) {
- for (PerContainerLogFileInfo fileInfo : logMeta.getContainerLogMeta()) {
+ for (ContainerLogFileInfo fileInfo : logMeta.getContainerLogMeta()) {
logTypes.add(fileInfo.getFileName());
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fddabc2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java
index ddee445..90faa19 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java
@@ -21,11 +21,15 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.io.PrintStream;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.Path;
/**
* This class contains several utility function which could be used in different
@@ -158,4 +162,26 @@ public final class LogToolUtils {
}
}
+
+ /**
+ * Create the container log file under given (local directory/nodeId) and
+ * return the PrintStream object.
+ * @param localDir the Local Dir
+ * @param nodeId the NodeId
+ * @param containerId the ContainerId
+ * @return the printStream object
+ * @throws IOException if an I/O error occurs
+ */
+ public static PrintStream createPrintStream(String localDir, String nodeId,
+ String containerId) throws IOException {
+ PrintStream out = System.out;
+ if(localDir != null && !localDir.isEmpty()) {
+ Path nodePath = new Path(localDir, LogAggregationUtils
+ .getNodeString(nodeId));
+ Files.createDirectories(Paths.get(nodePath.toString()));
+ Path containerLogPath = new Path(nodePath, containerId);
+ out = new PrintStream(containerLogPath.toString(), "UTF-8");
+ }
+ return out;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fddabc2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/PerContainerLogFileInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/PerContainerLogFileInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/PerContainerLogFileInfo.java
deleted file mode 100644
index 867815f..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/PerContainerLogFileInfo.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.logaggregation;
-
-/**
- * PerContainerLogFileInfo represents the meta data for a container log file,
- * which includes:
- * <ul>
- * <li>The filename of the container log.</li>
- * <li>The size of the container log.</li>
- * <li>The last modification time of the container log.</li>
- * </ul>
- *
- */
-public class PerContainerLogFileInfo {
- private String fileName;
- private String fileSize;
- private String lastModifiedTime;
-
- //JAXB needs this
- public PerContainerLogFileInfo() {}
-
- public PerContainerLogFileInfo(String fileName, String fileSize,
- String lastModifiedTime) {
- this.setFileName(fileName);
- this.setFileSize(fileSize);
- this.setLastModifiedTime(lastModifiedTime);
- }
-
- public String getFileName() {
- return fileName;
- }
-
- public void setFileName(String fileName) {
- this.fileName = fileName;
- }
-
- public String getFileSize() {
- return fileSize;
- }
-
- public void setFileSize(String fileSize) {
- this.fileSize = fileSize;
- }
-
- public String getLastModifiedTime() {
- return lastModifiedTime;
- }
-
- public void setLastModifiedTime(String lastModifiedTime) {
- this.lastModifiedTime = lastModifiedTime;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((fileName == null) ? 0 : fileName.hashCode());
- result = prime * result + ((fileSize == null) ? 0 : fileSize.hashCode());
- result = prime * result + ((lastModifiedTime == null) ?
- 0 : lastModifiedTime.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object otherObj) {
- if (otherObj == this) {
- return true;
- }
- if (!(otherObj instanceof PerContainerLogFileInfo)) {
- return false;
- }
- PerContainerLogFileInfo other = (PerContainerLogFileInfo)otherObj;
- return other.fileName.equals(fileName) && other.fileSize.equals(fileSize)
- && other.lastModifiedTime.equals(lastModifiedTime);
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fddabc2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
index 39f3dc3..5df900b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
@@ -25,9 +25,6 @@ import com.google.common.collect.Sets;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
-import java.io.PrintStream;
-import java.nio.file.Files;
-import java.nio.file.Paths;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
@@ -37,6 +34,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
@@ -91,6 +89,12 @@ public abstract class LogAggregationFileController {
protected static final FsPermission APP_DIR_PERMISSIONS = FsPermission
.createImmutable((short) 0770);
+ /**
+ * Umask for the log file.
+ */
+ protected static final FsPermission APP_LOG_FILE_UMASK = FsPermission
+ .createImmutable((short) (0640 ^ 0777));
+
// This is temporary solution. The configuration will be deleted once
// we find a more scalable method to only write a single log file per LRS.
private static final String NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP
@@ -98,6 +102,11 @@ public abstract class LogAggregationFileController {
private static final int
DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP = 30;
+ // This is temporary solution. The configuration will be deleted once we have
+ // the FileSystem API to check whether append operation is supported or not.
+ public static final String LOG_AGGREGATION_FS_SUPPORT_APPEND
+ = YarnConfiguration.YARN_PREFIX+ "log-aggregation.fs-support-append";
+
protected Configuration conf;
protected Path remoteRootLogDir;
protected String remoteRootLogDirSuffix;
@@ -178,19 +187,6 @@ public abstract class LogAggregationFileController {
public abstract void postWrite(LogAggregationFileControllerContext record)
throws Exception;
- protected PrintStream createPrintStream(String localDir, String nodeId,
- String containerId) throws IOException {
- PrintStream out = System.out;
- if(localDir != null && !localDir.isEmpty()) {
- Path nodePath = new Path(localDir, LogAggregationUtils
- .getNodeString(nodeId));
- Files.createDirectories(Paths.get(nodePath.toString()));
- Path containerLogPath = new Path(nodePath, containerId);
- out = new PrintStream(containerLogPath.toString(), "UTF-8");
- }
- return out;
- }
-
protected void closePrintStream(OutputStream out) {
if (out != System.out) {
IOUtils.cleanupWithLogger(LOG, out);
@@ -481,4 +477,21 @@ public abstract class LogAggregationFileController {
LOG.error("Failed to clean old logs", e);
}
}
+
+ /**
+ * Create the aggregated log suffix. The LogAggregationFileController
+ * should call this to get the suffix and append the suffix to the end
+ * of each log. This would keep the aggregated log format consistent.
+ *
+ * @param fileName the File Name
+ * @return the aggregated log suffix String
+ */
+ protected String aggregatedLogSuffix(String fileName) {
+ StringBuilder sb = new StringBuilder();
+ String endOfFile = "End of LogType:" + fileName;
+ sb.append("\n" + endOfFile + "\n");
+ sb.append(StringUtils.repeat("*", endOfFile.length() + 50)
+ + "\n\n");
+ return sb.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fddabc2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/IndexedFileAggregatedLogsBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/IndexedFileAggregatedLogsBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/IndexedFileAggregatedLogsBlock.java
new file mode 100644
index 0000000..c4cbfda
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/IndexedFileAggregatedLogsBlock.java
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile;
+
+import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER;
+import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID;
+import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_LOG_TYPE;
+import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING;
+import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME;
+
+import com.google.inject.Inject;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.file.tfile.BoundedRangeFileInputStream;
+import org.apache.hadoop.io.file.tfile.Compression;
+import org.apache.hadoop.io.file.tfile.Compression.Algorithm;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationHtmlBlock;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.ifile.LogAggregationIndexedFileController.IndexedFileLogMeta;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.ifile.LogAggregationIndexedFileController.IndexedLogsMeta;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.ifile.LogAggregationIndexedFileController.IndexedPerAggregationLogMeta;
+import org.apache.hadoop.yarn.util.Times;
+import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet;
+import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet.PRE;
+
+/**
+ * The Aggregated Logs Block implementation for Indexed File.
+ */
+@InterfaceAudience.LimitedPrivate({"YARN", "MapReduce"})
+public class IndexedFileAggregatedLogsBlock extends LogAggregationHtmlBlock {
+
  // Controller used below in render() to list, filter, and parse the
  // indexed aggregated log files.
  private final LogAggregationIndexedFileController fileController;
  // YARN configuration; used to resolve the remote log dir and open files.
  private final Configuration conf;

  /**
   * Constructs the block. Injected by the web framework (note
   * {@code @Inject}); not intended to be instantiated directly.
   *
   * @param ctx the webapp view context
   * @param conf the YARN configuration
   * @param fileController the indexed-file log aggregation controller
   */
  @Inject
  public IndexedFileAggregatedLogsBlock(ViewContext ctx,
      Configuration conf,
      LogAggregationIndexedFileController fileController) {
    super(ctx);
    this.conf = conf;
    this.fileController = fileController;
  }
+
+ @Override
+ protected void render(Block html) {
+ BlockParameters params = verifyAndParseParameters(html);
+ if (params == null) {
+ return;
+ }
+
+ ApplicationId appId = params.getAppId();
+ ContainerId containerId = params.getContainerId();
+ NodeId nodeId = params.getNodeId();
+ String appOwner = params.getAppOwner();
+ String logEntity = params.getLogEntity();
+ long start = params.getStartIndex();
+ long end = params.getEndIndex();
+
+ List<FileStatus> nodeFiles = null;
+ try {
+ nodeFiles = LogAggregationUtils
+ .getRemoteNodeFileList(conf, appId, appOwner,
+ this.fileController.getRemoteRootLogDir(),
+ this.fileController.getRemoteRootLogDirSuffix());
+ } catch(Exception ex) {
+ html.h1("Unable to locate any logs for container "
+ + containerId.toString());
+ LOG.error(ex.getMessage());
+ return;
+ }
+
+ Map<String, FileStatus> checkSumFiles;
+ try {
+ checkSumFiles = fileController.filterFiles(nodeFiles,
+ LogAggregationIndexedFileController.CHECK_SUM_FILE_SUFFIX);
+ } catch (IOException ex) {
+ LOG.error("Error getting logs for " + logEntity, ex);
+ html.h1("Error getting logs for " + logEntity);
+ return;
+ }
+
+ List<FileStatus> fileToRead;
+ try {
+ fileToRead = fileController.getNodeLogFileToRead(nodeFiles,
+ nodeId.toString(), appId);
+ } catch (IOException ex) {
+ LOG.error("Error getting logs for " + logEntity, ex);
+ html.h1("Error getting logs for " + logEntity);
+ return;
+ }
+
+ boolean foundLog = false;
+ String desiredLogType = $(CONTAINER_LOG_TYPE);
+ try {
+ for (FileStatus thisNodeFile : fileToRead) {
+ FileStatus checkSum = fileController.getAllChecksumFiles(
+ checkSumFiles, thisNodeFile.getPath().getName());
+ long endIndex = -1;
+ if (checkSum != null) {
+ endIndex = fileController.loadIndexedLogsCheckSum(
+ checkSum.getPath());
+ }
+ IndexedLogsMeta indexedLogsMeta = null;
+ try {
+ indexedLogsMeta = fileController.loadIndexedLogsMeta(
+ thisNodeFile.getPath(), endIndex);
+ } catch (Exception ex) {
+ // DO NOTHING
+ LOG.warn("Can not load log meta from the log file:"
+ + thisNodeFile.getPath());
+ continue;
+ }
+ if (indexedLogsMeta == null) {
+ continue;
+ }
+ Map<ApplicationAccessType, String> appAcls = indexedLogsMeta.getAcls();
+ String user = indexedLogsMeta.getUser();
+ String remoteUser = request().getRemoteUser();
+ if (!checkAcls(conf, appId, user, appAcls, remoteUser)) {
+ html.h1().__("User [" + remoteUser
+ + "] is not authorized to view the logs for " + logEntity
+ + " in log file [" + thisNodeFile.getPath().getName() + "]")
+ .__();
+ LOG.error("User [" + remoteUser
+ + "] is not authorized to view the logs for " + logEntity);
+ continue;
+ }
+ String compressAlgo = indexedLogsMeta.getCompressName();
+ List<IndexedFileLogMeta> candidates = new ArrayList<>();
+ for (IndexedPerAggregationLogMeta logMeta
+ : indexedLogsMeta.getLogMetas()) {
+ for (Entry<String, List<IndexedFileLogMeta>> meta
+ : logMeta.getLogMetas().entrySet()) {
+ for (IndexedFileLogMeta log : meta.getValue()) {
+ if (!log.getContainerId().equals(containerId.toString())) {
+ continue;
+ }
+ if (desiredLogType != null && !desiredLogType.isEmpty()
+ && !desiredLogType.equals(log.getFileName())) {
+ continue;
+ }
+ candidates.add(log);
+ }
+ }
+ }
+ if (candidates.isEmpty()) {
+ continue;
+ }
+
+ Algorithm compressName = Compression.getCompressionAlgorithmByName(
+ compressAlgo);
+ Decompressor decompressor = compressName.getDecompressor();
+ FileContext fileContext = FileContext.getFileContext(
+ thisNodeFile.getPath().toUri(), conf);
+ FSDataInputStream fsin = fileContext.open(thisNodeFile.getPath());
+ int bufferSize = 65536;
+ for (IndexedFileLogMeta candidate : candidates) {
+ byte[] cbuf = new byte[bufferSize];
+ InputStream in = null;
+ try {
+ in = compressName.createDecompressionStream(
+ new BoundedRangeFileInputStream(fsin,
+ candidate.getStartIndex(),
+ candidate.getFileCompressedSize()),
+ decompressor,
+ LogAggregationIndexedFileController.getFSInputBufferSize(
+ conf));
+ long logLength = candidate.getFileSize();
+ html.pre().__("\n\n").__();
+ html.p().__("Log Type: " + candidate.getFileName()).__();
+ html.p().__("Log Upload Time: " + Times.format(
+ candidate.getLastModificatedTime())).__();
+ html.p().__("Log Length: " + Long.toString(
+ logLength)).__();
+ long startIndex = start < 0
+ ? logLength + start : start;
+ startIndex = startIndex < 0 ? 0 : startIndex;
+ startIndex = startIndex > logLength ? logLength : startIndex;
+ long endLogIndex = end < 0
+ ? logLength + end : end;
+ endLogIndex = endLogIndex < 0 ? 0 : endLogIndex;
+ endLogIndex = endLogIndex > logLength ? logLength : endLogIndex;
+ endLogIndex = endLogIndex < startIndex ?
+ startIndex : endLogIndex;
+ long toRead = endLogIndex - startIndex;
+ if (toRead < logLength) {
+ html.p().__("Showing " + toRead + " bytes of " + logLength
+ + " total. Click ").a(url("logs", $(NM_NODENAME),
+ $(CONTAINER_ID), $(ENTITY_STRING), $(APP_OWNER),
+ candidate.getFileName(), "?start=0"), "here").
+ __(" for the full log.").__();
+ }
+ long totalSkipped = 0;
+ while (totalSkipped < start) {
+ long ret = in.skip(start - totalSkipped);
+ if (ret == 0) {
+ //Read one byte
+ int nextByte = in.read();
+ // Check if we have reached EOF
+ if (nextByte == -1) {
+ throw new IOException("Premature EOF from container log");
+ }
+ ret = 1;
+ }
+ totalSkipped += ret;
+ }
+ int len = 0;
+ int currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
+ PRE<Hamlet> pre = html.pre();
+
+ while (toRead > 0
+ && (len = in.read(cbuf, 0, currentToRead)) > 0) {
+ pre.__(new String(cbuf, 0, len, Charset.forName("UTF-8")));
+ toRead = toRead - len;
+ currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
+ }
+
+ pre.__();
+ foundLog = true;
+ } catch (Exception ex) {
+ LOG.error("Error getting logs for " + logEntity, ex);
+ continue;
+ } finally {
+ IOUtils.closeQuietly(in);
+ }
+ }
+ }
+ if (!foundLog) {
+ if (desiredLogType.isEmpty()) {
+ html.h1("No logs available for container " + containerId.toString());
+ } else {
+ html.h1("Unable to locate '" + desiredLogType
+ + "' log for container " + containerId.toString());
+ }
+ }
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (Exception ex) {
+ html.h1().__("Error getting logs for " + logEntity).__();
+ LOG.error("Error getting logs for " + logEntity, ex);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fddabc2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java
new file mode 100644
index 0000000..6cb2062
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java
@@ -0,0 +1,1056 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.HarFs;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SecureIOUtils;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.file.tfile.BoundedRangeFileInputStream;
+import org.apache.hadoop.io.file.tfile.Compression;
+import org.apache.hadoop.io.file.tfile.SimpleBufferedOutputStream;
+import org.apache.hadoop.io.file.tfile.Compression.Algorithm;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
+import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.logaggregation.LogToolUtils;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
+import org.apache.hadoop.yarn.util.Times;
+import org.apache.hadoop.yarn.webapp.View.ViewContext;
+import org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The Indexed Log Aggregation File Format implementation.
+ *
+ */
+@Private
+@Unstable
+public class LogAggregationIndexedFileController
+ extends LogAggregationFileController {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ LogAggregationIndexedFileController.class);
+ // Per-format tunables, read from the Configuration in initInternal().
+ private static final String FS_OUTPUT_BUF_SIZE_ATTR =
+ "indexedFile.fs.output.buffer.size";
+ private static final String FS_INPUT_BUF_SIZE_ATTR =
+ "indexedFile.fs.input.buffer.size";
+ private static final String FS_NUM_RETRIES_ATTR =
+ "indexedFile.fs.op.num-retries";
+ private static final String FS_RETRY_INTERVAL_MS_ATTR =
+ "indexedFile.fs.retry-interval-ms";
+ // Length of UUID.randomUUID().toString(); the UUID is written at the head
+ // of each aggregated log file and reused as a record separator.
+ private static final int UUID_LENGTH = 36;
+
+ @VisibleForTesting
+ public static final String CHECK_SUM_FILE_SUFFIX = "-checksum";
+
+ private int fsNumRetries = 3;
+ private long fsRetryInterval = 1000L;
+ private static final int VERSION = 1;
+ // Meta information accumulated across aggregation cycles; re-serialized
+ // and appended to the log file at the end of every cycle (postWrite).
+ private IndexedLogsMeta indexedLogsMeta = null;
+ private IndexedPerAggregationLogMeta logsMetaInThisCycle;
+ private long logAggregationTimeInThisCycle;
+ private FSDataOutputStream fsDataOStream;
+ private Algorithm compressAlgo;
+ private CachedIndexedLogsMeta cachedIndexedLogsMeta = null;
+ private boolean logAggregationSuccessfullyInThisCyCle = false;
+ // Offset of the start of the current cycle's data within the remote file.
+ private long currentOffSet = 0;
+ private Path remoteLogCheckSumFile;
+ private FileContext fc;
+ private UserGroupInformation ugi;
+ private String uuid = null;
+
+ public LogAggregationIndexedFileController() {}
+
+ // Validates that the target FileSystem supports append (required by this
+ // format), then resolves the remote root log dir, its "-ifile" suffix and
+ // the compression algorithm, with per-controller config keys taking
+ // precedence over the generic NM settings.
+ @Override
+ public void initInternal(Configuration conf) {
+ // Currently, we need the underlying File System to support append
+ // operation. Will remove this check after we finish
+ // LogAggregationIndexedFileController for non-append mode.
+ boolean append = conf.getBoolean(LOG_AGGREGATION_FS_SUPPORT_APPEND, true);
+ if (!append) {
+ throw new YarnRuntimeException("The configuration:"
+ + LOG_AGGREGATION_FS_SUPPORT_APPEND + " is set as False. We can only"
+ + " use LogAggregationIndexedFileController when the FileSystem "
+ + "support append operations.");
+ }
+ // Controller-specific remote dir wins; fall back to the NM-wide setting.
+ String remoteDirStr = String.format(
+ YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT,
+ this.fileControllerName);
+ String remoteDir = conf.get(remoteDirStr);
+ if (remoteDir == null || remoteDir.isEmpty()) {
+ remoteDir = conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+ YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR);
+ }
+ this.remoteRootLogDir = new Path(remoteDir);
+ String suffix = String.format(
+ YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT,
+ this.fileControllerName);
+ this.remoteRootLogDirSuffix = conf.get(suffix);
+ if (this.remoteRootLogDirSuffix == null
+ || this.remoteRootLogDirSuffix.isEmpty()) {
+ // Default suffix is the TFile suffix plus "-ifile" so the two formats
+ // never share a directory.
+ this.remoteRootLogDirSuffix = conf.get(
+ YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
+ YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX)
+ + "-ifile";
+ }
+ String compressName = conf.get(
+ YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE,
+ YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE);
+ this.compressAlgo = Compression.getCompressionAlgorithmByName(
+ compressName);
+ this.fsNumRetries = conf.getInt(FS_NUM_RETRIES_ATTR, 3);
+ this.fsRetryInterval = conf.getLong(FS_RETRY_INTERVAL_MS_ATTR, 1000L);
+ }
+
+ // Opens (as the application user) the remote aggregated log file for this
+ // cycle. In rolling mode an existing file is appended to and its leading
+ // UUID, previous meta and checksum file are recovered; otherwise the file
+ // is (re)created with a fresh UUID header. Also computes currentOffSet,
+ // the base offset for this cycle's compressed blocks.
+ @Override
+ public void initializeWriter(
+ final LogAggregationFileControllerContext context)
+ throws IOException {
+ final UserGroupInformation userUgi = context.getUserUgi();
+ final Map<ApplicationAccessType, String> appAcls = context.getAppAcls();
+ final String nodeId = context.getNodeId().toString();
+ final Path remoteLogFile = context.getRemoteNodeLogFileForApp();
+ this.ugi = userUgi;
+ logAggregationSuccessfullyInThisCyCle = false;
+ logsMetaInThisCycle = new IndexedPerAggregationLogMeta();
+ logAggregationTimeInThisCycle = System.currentTimeMillis();
+ logsMetaInThisCycle.setUploadTimeStamp(logAggregationTimeInThisCycle);
+ logsMetaInThisCycle.setRemoteNodeFile(remoteLogFile.getName());
+ try {
+ // All remote FS operations run with the application user's privileges.
+ userUgi.doAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws Exception {
+ fc = FileContext.getFileContext(
+ remoteRootLogDir.toUri(), conf);
+ fc.setUMask(APP_LOG_FILE_UMASK);
+ boolean fileExist = fc.util().exists(remoteLogFile);
+ if (fileExist && context.isLogAggregationInRolling()) {
+ fsDataOStream = fc.create(remoteLogFile,
+ EnumSet.of(CreateFlag.APPEND),
+ new Options.CreateOpts[] {});
+ if (uuid == null) {
+ // Recover the UUID from the head of the existing file.
+ FSDataInputStream fsDataInputStream = null;
+ try {
+ fsDataInputStream = fc.open(remoteLogFile);
+ byte[] b = new byte[UUID_LENGTH];
+ int actual = fsDataInputStream.read(b);
+ if (actual != UUID_LENGTH) {
+ // Get an error when parse the UUID from existed log file.
+ // Simply OverWrite the existed log file and re-create the
+ // UUID.
+ fsDataOStream = fc.create(remoteLogFile,
+ EnumSet.of(CreateFlag.OVERWRITE),
+ new Options.CreateOpts[] {});
+ uuid = UUID.randomUUID().toString();
+ fsDataOStream.write(uuid.getBytes(Charset.forName("UTF-8")));
+ fsDataOStream.flush();
+ } else {
+ uuid = new String(b, Charset.forName("UTF-8"));
+ }
+ } finally {
+ IOUtils.cleanupWithLogger(LOG, fsDataInputStream);
+ }
+ }
+ // if the remote log file exists, but we do not have any
+ // indexedLogsMeta. We need to re-load indexedLogsMeta from
+ // the existing remote log file. If the re-load fails, we simply
+ // re-create a new indexedLogsMeta object. And will re-load
+ // the indexedLogsMeta from checksum file later.
+ if (indexedLogsMeta == null) {
+ try {
+ indexedLogsMeta = loadIndexedLogsMeta(remoteLogFile);
+ } catch (IOException ex) {
+ // DO NOTHING
+ }
+ }
+ } else {
+ // Fresh file: write the UUID header first.
+ fsDataOStream = fc.create(remoteLogFile,
+ EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
+ new Options.CreateOpts[] {});
+ if (uuid == null) {
+ uuid = UUID.randomUUID().toString();
+ }
+ byte[] b = uuid.getBytes(Charset.forName("UTF-8"));
+ fsDataOStream.write(b);
+ fsDataOStream.flush();
+ }
+ if (indexedLogsMeta == null) {
+ indexedLogsMeta = new IndexedLogsMeta();
+ indexedLogsMeta.setVersion(VERSION);
+ indexedLogsMeta.setUser(userUgi.getShortUserName());
+ indexedLogsMeta.setAcls(appAcls);
+ indexedLogsMeta.setNodeId(nodeId);
+ String compressName = conf.get(
+ YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE,
+ YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE);
+ indexedLogsMeta.setCompressName(compressName);
+ }
+ final long currentAggregatedLogFileLength = fc
+ .getFileStatus(remoteLogFile).getLen();
+ // only check the check-sum file when we are in append mode
+ if (context.isLogAggregationInRolling()) {
+ // check whether the checksum file exists to figure out
+ // whether the previous log aggregation process is successful
+ // and the aggregated log file is corrupted or not.
+ remoteLogCheckSumFile = new Path(remoteLogFile.getParent(),
+ (remoteLogFile.getName() + CHECK_SUM_FILE_SUFFIX));
+ boolean exist = fc.util().exists(remoteLogCheckSumFile);
+ if (!exist) {
+ // Record the pre-cycle length so a crashed cycle can be rolled
+ // back to this point on the next recovery.
+ FSDataOutputStream checksumFileOutputStream = null;
+ try {
+ checksumFileOutputStream = fc.create(remoteLogCheckSumFile,
+ EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
+ new Options.CreateOpts[] {});
+ checksumFileOutputStream.writeLong(
+ currentAggregatedLogFileLength);
+ } finally {
+ IOUtils.cleanupWithLogger(LOG, checksumFileOutputStream);
+ }
+ } else {
+ // Checksum left behind by an unfinished cycle: reload the meta
+ // that was valid at the recorded end index.
+ FSDataInputStream checksumFileInputStream = null;
+ try {
+ checksumFileInputStream = fc.open(remoteLogCheckSumFile);
+ long endIndex = checksumFileInputStream.readLong();
+ IndexedLogsMeta recoveredLogsMeta = loadIndexedLogsMeta(
+ remoteLogFile, endIndex);
+ if (recoveredLogsMeta == null) {
+ indexedLogsMeta.getLogMetas().clear();
+ } else {
+ indexedLogsMeta = recoveredLogsMeta;
+ }
+ } finally {
+ IOUtils.cleanupWithLogger(LOG, checksumFileInputStream);
+ }
+ }
+ }
+ // append a simple character("\n") to move the writer cursor, so
+ // we could get the correct position when we call
+ // fsOutputStream.getStartPos()
+ final byte[] dummyBytes = "\n".getBytes(Charset.forName("UTF-8"));
+ fsDataOStream.write(dummyBytes);
+ fsDataOStream.flush();
+
+ if (fsDataOStream.getPos() >= (currentAggregatedLogFileLength
+ + dummyBytes.length)) {
+ currentOffSet = 0;
+ } else {
+ currentOffSet = currentAggregatedLogFileLength;
+ }
+ return null;
+ }
+ });
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ // Closes the remote output stream for this aggregation cycle, logging
+ // (rather than propagating) any close failure.
+ @Override
+ public void closeWriter() {
+ IOUtils.cleanupWithLogger(LOG, this.fsDataOStream);
+ }
+
+ // Compresses and appends each pending local log file of one container to
+ // the remote stream, recording an IndexedFileLogMeta (start offset,
+ // compressed size, original size, mtime) per file into this cycle's meta.
+ // Files that grow while being read are truncated at their initial length.
+ @Override
+ public void write(LogKey logKey, LogValue logValue) throws IOException {
+ String containerId = logKey.toString();
+ Set<File> pendingUploadFiles = logValue
+ .getPendingLogFilesToUploadForThisContainer();
+ List<IndexedFileLogMeta> metas = new ArrayList<>();
+ for (File logFile : pendingUploadFiles) {
+ FileInputStream in = null;
+ try {
+ // Open as the owning user to avoid reading spoofed log files.
+ in = SecureIOUtils.openForRead(logFile, logValue.getUser(), null);
+ } catch (IOException e) {
+ logErrorMessage(logFile, e);
+ IOUtils.cleanupWithLogger(LOG, in);
+ continue;
+ }
+ final long fileLength = logFile.length();
+ IndexedFileOutputStreamState outputStreamState = null;
+ try {
+ outputStreamState = new IndexedFileOutputStreamState(
+ this.compressAlgo, this.fsDataOStream, conf, this.currentOffSet);
+ byte[] buf = new byte[65535];
+ int len = 0;
+ long bytesLeft = fileLength;
+ while ((len = in.read(buf)) != -1) {
+ //If buffer contents within fileLength, write
+ if (len < bytesLeft) {
+ outputStreamState.getOutputStream().write(buf, 0, len);
+ bytesLeft-=len;
+ } else {
+ //else only write contents within fileLength, then exit early
+ outputStreamState.getOutputStream().write(buf, 0,
+ (int)bytesLeft);
+ break;
+ }
+ }
+ long newLength = logFile.length();
+ if(fileLength < newLength) {
+ LOG.warn("Aggregated logs truncated by approximately "+
+ (newLength-fileLength) +" bytes.");
+ }
+ logAggregationSuccessfullyInThisCyCle = true;
+ } catch (IOException e) {
+ // Record the failure inline in the aggregated stream so readers see
+ // why this file's content is missing.
+ String message = logErrorMessage(logFile, e);
+ if (outputStreamState != null &&
+ outputStreamState.getOutputStream() != null) {
+ outputStreamState.getOutputStream().write(
+ message.getBytes(Charset.forName("UTF-8")));
+ }
+ } finally {
+ IOUtils.cleanupWithLogger(LOG, in);
+ }
+
+ // Meta is recorded even on failure (with no offsets) so the file's
+ // existence is still visible; finish() flushes the compressed block.
+ IndexedFileLogMeta meta = new IndexedFileLogMeta();
+ meta.setContainerId(containerId.toString());
+ meta.setFileName(logFile.getName());
+ if (outputStreamState != null) {
+ outputStreamState.finish();
+ meta.setFileCompressedSize(outputStreamState.getCompressedSize());
+ meta.setStartIndex(outputStreamState.getStartPos());
+ meta.setFileSize(fileLength);
+ }
+ meta.setLastModificatedTime(logFile.lastModified());
+ metas.add(meta);
+ }
+ logsMetaInThisCycle.addContainerLogMeta(containerId, metas);
+ }
+
+ // Appends the accumulated meta to the end of the log file in the layout
+ // the readers expect: serialized meta, then its int length, then the UUID
+ // separator. On a successful cycle the checksum (rollback) file is
+ // deleted, marking the file as consistent.
+ @Override
+ public void postWrite(LogAggregationFileControllerContext record)
+ throws Exception {
+ // always aggregate the previous logsMeta, and append them together
+ // at the end of the file
+ indexedLogsMeta.addLogMeta(logsMetaInThisCycle);
+ byte[] b = SerializationUtils.serialize(indexedLogsMeta);
+ this.fsDataOStream.write(b);
+ int length = b.length;
+ this.fsDataOStream.writeInt(length);
+ byte[] separator = this.uuid.getBytes(Charset.forName("UTF-8"));
+ this.fsDataOStream.write(separator);
+ if (logAggregationSuccessfullyInThisCyCle) {
+ deleteFileWithRetries(fc, ugi, remoteLogCheckSumFile);
+ }
+ }
+
+ // Deletes the given path as the given user, retrying via FSAction
+ // (fsNumRetries attempts, fsRetryInterval ms apart).
+ private void deleteFileWithRetries(final FileContext fileContext,
+ final UserGroupInformation userUgi,
+ final Path deletePath) throws Exception {
+ new FSAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ deleteFileWithPrivilege(fileContext, userUgi, deletePath);
+ return null;
+ }
+ }.runWithRetries();
+ }
+
+ // Deletes fileToDelete (non-recursively) as userUgi; a no-op if the file
+ // does not exist. Always returns null.
+ private Object deleteFileWithPrivilege(final FileContext fileContext,
+ final UserGroupInformation userUgi, final Path fileToDelete)
+ throws Exception {
+ return userUgi.doAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws Exception {
+ if (fileContext.util().exists(fileToDelete)) {
+ fileContext.delete(fileToDelete, false);
+ }
+ return null;
+ }
+ });
+ }
+
+ /**
+ * Streams the aggregated logs matching the request to {@code os}, or to
+ * per-container print streams created on demand when {@code os} is null.
+ * Skips node files whose meta cannot be loaded, and honors the checksum
+ * file's end index so partially-written cycles are ignored.
+ * Returns true iff at least one matching log was written.
+ */
+ @Override
+ public boolean readAggregatedLogs(ContainerLogsRequest logRequest,
+ OutputStream os) throws IOException {
+ boolean findLogs = false;
+ boolean createPrintStream = (os == null);
+ ApplicationId appId = logRequest.getAppId();
+ String nodeId = logRequest.getNodeId();
+ String nodeIdStr = (nodeId == null || nodeId.isEmpty()) ? null
+ : LogAggregationUtils.getNodeString(nodeId);
+ List<String> logTypes = new ArrayList<>();
+ if (logRequest.getLogTypes() != null && !logRequest
+ .getLogTypes().isEmpty()) {
+ logTypes.addAll(logRequest.getLogTypes());
+ }
+ String containerIdStr = logRequest.getContainerId();
+ boolean getAllContainers = (containerIdStr == null
+ || containerIdStr.isEmpty());
+ long size = logRequest.getBytes();
+ List<FileStatus> nodeFiles = LogAggregationUtils
+ .getRemoteNodeFileList(conf, appId, logRequest.getAppOwner(),
+ this.remoteRootLogDir, this.remoteRootLogDirSuffix);
+ if (nodeFiles.isEmpty()) {
+ throw new IOException("There is no available log file for "
+ + "application:" + appId);
+ }
+ Map<String, FileStatus> checkSumFiles = filterFiles(
+ nodeFiles, CHECK_SUM_FILE_SUFFIX);
+ List<FileStatus> fileToRead = getNodeLogFileToRead(
+ nodeFiles, nodeIdStr, appId);
+ byte[] buf = new byte[65535];
+ for (FileStatus thisNodeFile : fileToRead) {
+ String nodeName = thisNodeFile.getPath().getName();
+ FileStatus checkSum = getAllChecksumFiles(checkSumFiles,
+ thisNodeFile.getPath().getName());
+ long endIndex = -1;
+ if (checkSum != null) {
+ endIndex = loadIndexedLogsCheckSum(checkSum.getPath());
+ }
+ IndexedLogsMeta indexedLogsMeta = null;
+ try {
+ indexedLogsMeta = loadIndexedLogsMeta(thisNodeFile.getPath(),
+ endIndex);
+ } catch (Exception ex) {
+ // Corrupt or truncated meta: skip this node file, keep going.
+ LOG.warn("Can not load log meta from the log file:"
+ + thisNodeFile.getPath());
+ continue;
+ }
+ if (indexedLogsMeta == null) {
+ continue;
+ }
+ String compressAlgo = indexedLogsMeta.getCompressName();
+ // Collect the per-file metas matching the requested container/types.
+ List<IndexedFileLogMeta> candidates = new ArrayList<>();
+ for (IndexedPerAggregationLogMeta logMeta
+ : indexedLogsMeta.getLogMetas()) {
+ for (Entry<String, List<IndexedFileLogMeta>> meta
+ : logMeta.getLogMetas().entrySet()) {
+ for (IndexedFileLogMeta log : meta.getValue()) {
+ if (!getAllContainers && !log.getContainerId()
+ .equals(containerIdStr)) {
+ continue;
+ }
+ if (logTypes != null && !logTypes.isEmpty() &&
+ !logTypes.contains(log.getFileName())) {
+ continue;
+ }
+ candidates.add(log);
+ }
+ }
+ }
+ if (candidates.isEmpty()) {
+ continue;
+ }
+
+ Algorithm compressName = Compression.getCompressionAlgorithmByName(
+ compressAlgo);
+ Decompressor decompressor = compressName.getDecompressor();
+ FileContext fileContext = FileContext.getFileContext(
+ thisNodeFile.getPath().toUri(), conf);
+ FSDataInputStream fsin = fileContext.open(thisNodeFile.getPath());
+ String currentContainer = "";
+ for (IndexedFileLogMeta candidate : candidates) {
+ if (!candidate.getContainerId().equals(currentContainer)) {
+ if (createPrintStream) {
+ closePrintStream(os);
+ os = LogToolUtils.createPrintStream(
+ logRequest.getOutputLocalDir(),
+ thisNodeFile.getPath().getName(),
+ candidate.getContainerId());
+ currentContainer = candidate.getContainerId();
+ }
+ }
+ InputStream in = null;
+ try {
+ in = compressName.createDecompressionStream(
+ new BoundedRangeFileInputStream(fsin,
+ candidate.getStartIndex(),
+ candidate.getFileCompressedSize()),
+ decompressor, getFSInputBufferSize(conf));
+ LogToolUtils.outputContainerLog(candidate.getContainerId(),
+ nodeName, candidate.getFileName(), candidate.getFileSize(), size,
+ Times.format(candidate.getLastModificatedTime()),
+ in, os, buf, ContainerLogAggregationType.AGGREGATED);
+ byte[] b = aggregatedLogSuffix(candidate.getFileName())
+ .getBytes(Charset.forName("UTF-8"));
+ os.write(b, 0, b.length);
+ findLogs = true;
+ } catch (IOException e) {
+ System.err.println(e.getMessage());
+ // NOTE(review): the decompressor is returned to the pool here but
+ // may be reused by the next candidate iteration — confirm whether
+ // the pool tolerates return-then-reuse.
+ compressName.returnDecompressor(decompressor);
+ continue;
+ } finally {
+ os.flush();
+ IOUtils.cleanupWithLogger(LOG, in);
+ }
+ }
+ // BoundedRangeFileInputStream does not close the underlying stream,
+ // so close the per-file FSDataInputStream explicitly to avoid leaking
+ // one open stream per node file.
+ IOUtils.cleanupWithLogger(LOG, fsin);
+ }
+ return findLogs;
+ }
+
+ // TODO: fix me if the remote file system does not support append operation.
+ /**
+ * Collects per-container log meta (file name, size, upload time) for the
+ * requested application, optionally narrowed to one node and/or container.
+ * Node files with unreadable meta are skipped with a warning. The result
+ * is sorted by container id.
+ */
+ @Override
+ public List<ContainerLogMeta> readAggregatedLogsMeta(
+ ContainerLogsRequest logRequest) throws IOException {
+ List<IndexedLogsMeta> listOfLogsMeta = new ArrayList<>();
+ List<ContainerLogMeta> containersLogMeta = new ArrayList<>();
+ String containerIdStr = logRequest.getContainerId();
+ String nodeId = logRequest.getNodeId();
+ ApplicationId appId = logRequest.getAppId();
+ String appOwner = logRequest.getAppOwner();
+ boolean getAllContainers = (containerIdStr == null ||
+ containerIdStr.isEmpty());
+ String nodeIdStr = (nodeId == null || nodeId.isEmpty()) ? null
+ : LogAggregationUtils.getNodeString(nodeId);
+ List<FileStatus> nodeFiles = LogAggregationUtils
+ .getRemoteNodeFileList(conf, appId, appOwner, this.remoteRootLogDir,
+ this.remoteRootLogDirSuffix);
+ if (nodeFiles.isEmpty()) {
+ throw new IOException("There is no available log file for "
+ + "application:" + appId);
+ }
+ Map<String, FileStatus> checkSumFiles = filterFiles(
+ nodeFiles, CHECK_SUM_FILE_SUFFIX);
+ List<FileStatus> fileToRead = getNodeLogFileToRead(
+ nodeFiles, nodeIdStr, appId);
+ for(FileStatus thisNodeFile : fileToRead) {
+ try {
+ FileStatus checkSum = getAllChecksumFiles(checkSumFiles,
+ thisNodeFile.getPath().getName());
+ // A checksum file marks an unfinished cycle; only read meta up to
+ // the recorded end index.
+ long endIndex = -1;
+ if (checkSum != null) {
+ endIndex = loadIndexedLogsCheckSum(checkSum.getPath());
+ }
+ IndexedLogsMeta current = loadIndexedLogsMeta(
+ thisNodeFile.getPath(), endIndex);
+ if (current != null) {
+ listOfLogsMeta.add(current);
+ }
+ } catch (IOException ex) {
+ // Unreadable meta: skip this node file, keep going.
+ LOG.warn("Can not get log meta from the log file:"
+ + thisNodeFile.getPath());
+ }
+ }
+ for (IndexedLogsMeta indexedLogMeta : listOfLogsMeta) {
+ String curNodeId = indexedLogMeta.getNodeId();
+ for (IndexedPerAggregationLogMeta logMeta :
+ indexedLogMeta.getLogMetas()) {
+ if (getAllContainers) {
+ for (Entry<String, List<IndexedFileLogMeta>> log : logMeta
+ .getLogMetas().entrySet()) {
+ ContainerLogMeta meta = new ContainerLogMeta(
+ log.getKey(), curNodeId);
+ for (IndexedFileLogMeta aMeta : log.getValue()) {
+ meta.addLogMeta(aMeta.getFileName(), Long.toString(
+ aMeta.getFileSize()),
+ Times.format(aMeta.getLastModificatedTime()));
+ }
+ containersLogMeta.add(meta);
+ }
+ } else if (logMeta.getContainerLogMeta(containerIdStr) != null) {
+ ContainerLogMeta meta = new ContainerLogMeta(containerIdStr,
+ curNodeId);
+ for (IndexedFileLogMeta log :
+ logMeta.getContainerLogMeta(containerIdStr)) {
+ meta.addLogMeta(log.getFileName(), Long.toString(
+ log.getFileSize()),
+ Times.format(log.getLastModificatedTime()));
+ }
+ containersLogMeta.add(meta);
+ }
+ }
+ }
+ Collections.sort(containersLogMeta, new Comparator<ContainerLogMeta>() {
+ @Override
+ public int compare(ContainerLogMeta o1, ContainerLogMeta o2) {
+ return o1.getContainerId().compareTo(o2.getContainerId());
+ }
+ });
+ return containersLogMeta;
+ }
+
+ // Returns the subset of fileList whose names end with the given suffix,
+ // keyed by file name (used to pair log files with their "-checksum"
+ // companions).
+ @Private
+ public Map<String, FileStatus> filterFiles(
+ List<FileStatus> fileList, final String suffix) throws IOException {
+ Map<String, FileStatus> checkSumFiles = new HashMap<>();
+ Set<FileStatus> status = new HashSet<FileStatus>(fileList);
+ Iterable<FileStatus> mask =
+ Iterables.filter(status, new Predicate<FileStatus>() {
+ @Override
+ public boolean apply(FileStatus next) {
+ return next.getPath().getName().endsWith(
+ suffix);
+ }
+ });
+ status = Sets.newHashSet(mask);
+ for (FileStatus file : status) {
+ checkSumFiles.put(file.getPath().getName(), file);
+ }
+ return checkSumFiles;
+ }
+
+ // Selects the aggregated log files to read: matches the node (when given),
+ // excludes in-progress (.tmp) and checksum files, and expands an
+ // application .har archive into the files it contains.
+ @Private
+ public List<FileStatus> getNodeLogFileToRead(
+ List<FileStatus> nodeFiles, String nodeId, ApplicationId appId)
+ throws IOException {
+ List<FileStatus> listOfFiles = new ArrayList<>();
+ List<FileStatus> files = new ArrayList<>(nodeFiles);
+ for (FileStatus file : files) {
+ String nodeName = file.getPath().getName();
+ if ((nodeId == null || nodeId.isEmpty()
+ || nodeName.contains(LogAggregationUtils
+ .getNodeString(nodeId))) && !nodeName.endsWith(
+ LogAggregationUtils.TMP_FILE_SUFFIX) &&
+ !nodeName.endsWith(CHECK_SUM_FILE_SUFFIX)) {
+ if (nodeName.equals(appId + ".har")) {
+ // Replace the iteration source with the archive's contents.
+ // NOTE(review): this reassigns 'files' inside a for-each over
+ // 'files'; confirm the intended semantics of continuing the
+ // original iteration here.
+ Path p = new Path("har:///" + file.getPath().toUri().getRawPath());
+ files = Arrays.asList(HarFs.get(p.toUri(), conf).listStatus(p));
+ continue;
+ }
+ listOfFiles.add(file);
+ }
+ }
+ return listOfFiles;
+ }
+
+ // Returns the checksum FileStatus whose name starts with the given log
+ // file name (and ends with the checksum suffix), or null if none exists.
+ @Private
+ public FileStatus getAllChecksumFiles(Map<String, FileStatus> fileMap,
+ String fileName) {
+ for (Entry<String, FileStatus> file : fileMap.entrySet()) {
+ if (file.getKey().startsWith(fileName) && file.getKey()
+ .endsWith(CHECK_SUM_FILE_SUFFIX)) {
+ return file.getValue();
+ }
+ }
+ return null;
+ }
+
+ // Renders aggregated logs in the web UI via the format-specific block.
+ @Override
+ public void renderAggregatedLogsBlock(Block html, ViewContext context) {
+ IndexedFileAggregatedLogsBlock block = new IndexedFileAggregatedLogsBlock(
+ context, this.conf, this);
+ block.render(html);
+ }
+
+ // Returns the owner recorded in the aggregated log's meta, loading and
+ // caching the meta keyed by path (single-entry cache).
+ @Override
+ public String getApplicationOwner(Path aggregatedLogPath)
+ throws IOException {
+ if (this.cachedIndexedLogsMeta == null
+ || !this.cachedIndexedLogsMeta.getRemoteLogPath()
+ .equals(aggregatedLogPath)) {
+ this.cachedIndexedLogsMeta = new CachedIndexedLogsMeta(
+ loadIndexedLogsMeta(aggregatedLogPath), aggregatedLogPath);
+ }
+ return this.cachedIndexedLogsMeta.getCachedIndexedLogsMeta().getUser();
+ }
+
+ // Returns the application ACLs recorded in the aggregated log's meta,
+ // using the same single-entry meta cache as getApplicationOwner().
+ @Override
+ public Map<ApplicationAccessType, String> getApplicationAcls(
+ Path aggregatedLogPath) throws IOException {
+ if (this.cachedIndexedLogsMeta == null
+ || !this.cachedIndexedLogsMeta.getRemoteLogPath()
+ .equals(aggregatedLogPath)) {
+ this.cachedIndexedLogsMeta = new CachedIndexedLogsMeta(
+ loadIndexedLogsMeta(aggregatedLogPath), aggregatedLogPath);
+ }
+ return this.cachedIndexedLogsMeta.getCachedIndexedLogsMeta().getAcls();
+ }
+
+ // Resolves the remote per-application log directory for this format's
+ // root dir and suffix.
+ @Override
+ public Path getRemoteAppLogDir(ApplicationId appId, String user)
+ throws IOException {
+ return LogAggregationUtils.getRemoteAppLogDir(conf, appId, user,
+ this.remoteRootLogDir, this.remoteRootLogDirSuffix);
+ }
+
+ // Deserializes the IndexedLogsMeta appended at the tail of the log file.
+ // The tail layout (written by postWrite) is: serialized meta, 4-byte meta
+ // length, UUID separator. 'end' bounds the read: <0 means use the file
+ // length, 0 means no valid meta yet (returns null), otherwise it is the
+ // consistent end offset recovered from the checksum file.
+ @Private
+ public IndexedLogsMeta loadIndexedLogsMeta(Path remoteLogPath, long end)
+ throws IOException {
+ FileContext fileContext =
+ FileContext.getFileContext(remoteLogPath.toUri(), conf);
+ FSDataInputStream fsDataIStream = null;
+ try {
+ fsDataIStream = fileContext.open(remoteLogPath);
+ if (end == 0) {
+ return null;
+ }
+ long fileLength = end < 0 ? fileContext.getFileStatus(
+ remoteLogPath).getLen() : end;
+ // Seek past the UUID separator to the int that stores meta length.
+ fsDataIStream.seek(fileLength - Integer.SIZE/ Byte.SIZE - UUID_LENGTH);
+ int offset = fsDataIStream.readInt();
+ byte[] array = new byte[offset];
+ fsDataIStream.seek(
+ fileLength - offset - Integer.SIZE/ Byte.SIZE - UUID_LENGTH);
+ int actual = fsDataIStream.read(array);
+ if (actual != offset) {
+ throw new IOException("Error on loading log meta from "
+ + remoteLogPath);
+ }
+ return (IndexedLogsMeta)SerializationUtils
+ .deserialize(array);
+ } finally {
+ IOUtils.cleanupWithLogger(LOG, fsDataIStream);
+ }
+ }
+
+ // Convenience overload: load meta using the full file length as the end.
+ private IndexedLogsMeta loadIndexedLogsMeta(Path remoteLogPath)
+ throws IOException {
+ return loadIndexedLogsMeta(remoteLogPath, -1);
+ }
+
+ @Private
+ public long loadIndexedLogsCheckSum(Path remoteLogCheckSumPath)
+ throws IOException {
+ FileContext fileContext =
+ FileContext.getFileContext(remoteLogCheckSumPath.toUri(), conf);
+ FSDataInputStream fsDataIStream = null;
+ try {
+ fsDataIStream = fileContext.open(remoteLogCheckSumPath);
+ return fsDataIStream.readLong();
+ } finally {
+ IOUtils.cleanupWithLogger(LOG, fsDataIStream);
+ }
+ }
+
+ /**
+ * This IndexedLogsMeta includes all the meta information
+ * for the aggregated log file.
+ */
+ @Private
+ @VisibleForTesting
+ public static class IndexedLogsMeta implements Serializable {
+
+ private static final long serialVersionUID = 5439875373L;
+ private int version;
+ private String user;
+ private String compressName;
+ private Map<ApplicationAccessType, String> acls;
+ private String nodeId;
+ private List<IndexedPerAggregationLogMeta> logMetas = new ArrayList<>();
+
+ public int getVersion() {
+ return this.version;
+ }
+
+ public void setVersion(int version) {
+ this.version = version;
+ }
+
+ public String getUser() {
+ return this.user;
+ }
+
+ public void setUser(String user) {
+ this.user = user;
+ }
+
+ public Map<ApplicationAccessType, String> getAcls() {
+ return this.acls;
+ }
+
+ public void setAcls(Map<ApplicationAccessType, String> acls) {
+ this.acls = acls;
+ }
+
+ public String getCompressName() {
+ return compressName;
+ }
+
+ public void setCompressName(String compressName) {
+ this.compressName = compressName;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public void setNodeId(String nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ public void addLogMeta(IndexedPerAggregationLogMeta logMeta) {
+ logMetas.add(logMeta);
+ }
+
+ public List<IndexedPerAggregationLogMeta> getLogMetas() {
+ return logMetas;
+ }
+ }
+
+ /**
+ * This IndexedPerAggregationLogMeta includes the meta information
+ * for all files which would be aggregated in one
+ * Log aggregation cycle.
+ */
+ public static class IndexedPerAggregationLogMeta implements Serializable {
+ private static final long serialVersionUID = 3929298383L;
+ private String remoteNodeLogFileName;
+ private Map<String, List<IndexedFileLogMeta>> logMetas = new HashMap<>();
+ private long uploadTimeStamp;
+
+ public String getRemoteNodeFile() {
+ return remoteNodeLogFileName;
+ }
+ public void setRemoteNodeFile(String remoteNodeLogFileName) {
+ this.remoteNodeLogFileName = remoteNodeLogFileName;
+ }
+
+ public void addContainerLogMeta(String containerId,
+ List<IndexedFileLogMeta> logMeta) {
+ logMetas.put(containerId, logMeta);
+ }
+
+ public List<IndexedFileLogMeta> getContainerLogMeta(String containerId) {
+ return logMetas.get(containerId);
+ }
+
+ public Map<String, List<IndexedFileLogMeta>> getLogMetas() {
+ return logMetas;
+ }
+
+ public long getUploadTimeStamp() {
+ return uploadTimeStamp;
+ }
+
+ public void setUploadTimeStamp(long uploadTimeStamp) {
+ this.uploadTimeStamp = uploadTimeStamp;
+ }
+ }
+
+ /**
+ * This IndexedFileLogMeta includes the meta information
+ * for a single file which would be aggregated in one
+ * Log aggregation cycle.
+ *
+ */
+ @Private
+ @VisibleForTesting
+ public static class IndexedFileLogMeta implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private String containerId;
+ private String fileName;
+ private long fileSize;
+ private long fileCompressedSize;
+ private long lastModificatedTime;
+ private long startIndex;
+
+ public String getFileName() {
+ return fileName;
+ }
+ public void setFileName(String fileName) {
+ this.fileName = fileName;
+ }
+
+ public long getFileSize() {
+ return fileSize;
+ }
+ public void setFileSize(long fileSize) {
+ this.fileSize = fileSize;
+ }
+
+ public long getFileCompressedSize() {
+ return fileCompressedSize;
+ }
+ public void setFileCompressedSize(long fileCompressedSize) {
+ this.fileCompressedSize = fileCompressedSize;
+ }
+
+ public long getLastModificatedTime() {
+ return lastModificatedTime;
+ }
+ public void setLastModificatedTime(long lastModificatedTime) {
+ this.lastModificatedTime = lastModificatedTime;
+ }
+
+ public long getStartIndex() {
+ return startIndex;
+ }
+ public void setStartIndex(long startIndex) {
+ this.startIndex = startIndex;
+ }
+
+ public String getContainerId() {
+ return containerId;
+ }
+ public void setContainerId(String containerId) {
+ this.containerId = containerId;
+ }
+ }
+
+ private static String logErrorMessage(File logFile, Exception e) {
+ String message = "Error aggregating log file. Log file : "
+ + logFile.getAbsolutePath() + ". " + e.getMessage();
+ LOG.error(message, e);
+ return message;
+ }
+
+ private static class IndexedFileOutputStreamState {
+ private final Algorithm compressAlgo;
+ private Compressor compressor;
+ private final FSDataOutputStream fsOut;
+ private long posStart;
+ private final SimpleBufferedOutputStream fsBufferedOutput;
+ private OutputStream out;
+ private long offset;
+
+ IndexedFileOutputStreamState(Algorithm compressionName,
+ FSDataOutputStream fsOut, Configuration conf, long offset)
+ throws IOException {
+ this.compressAlgo = compressionName;
+ this.fsOut = fsOut;
+ this.offset = offset;
+ this.posStart = fsOut.getPos();
+
+ BytesWritable fsOutputBuffer = new BytesWritable();
+ fsOutputBuffer.setCapacity(LogAggregationIndexedFileController
+ .getFSOutputBufferSize(conf));
+
+ this.fsBufferedOutput = new SimpleBufferedOutputStream(this.fsOut,
+ fsOutputBuffer.getBytes());
+
+ this.compressor = compressAlgo.getCompressor();
+
+ try {
+ this.out = compressAlgo.createCompressionStream(
+ fsBufferedOutput, compressor, 0);
+ } catch (IOException e) {
+ compressAlgo.returnCompressor(compressor);
+ throw e;
+ }
+ }
+
+ OutputStream getOutputStream() {
+ return out;
+ }
+
+ long getCurrentPos() throws IOException {
+ return fsOut.getPos() + fsBufferedOutput.size();
+ }
+
+ long getStartPos() {
+ return posStart + offset;
+ }
+
+ long getCompressedSize() throws IOException {
+ long ret = getCurrentPos() - posStart;
+ return ret;
+ }
+
+ void finish() throws IOException {
+ try {
+ if (out != null) {
+ out.flush();
+ out = null;
+ }
+ } finally {
+ compressAlgo.returnCompressor(compressor);
+ compressor = null;
+ }
+ }
+ }
+
+ private static class CachedIndexedLogsMeta {
+ private final Path remoteLogPath;
+ private final IndexedLogsMeta indexedLogsMeta;
+ CachedIndexedLogsMeta(IndexedLogsMeta indexedLogsMeta,
+ Path remoteLogPath) {
+ this.indexedLogsMeta = indexedLogsMeta;
+ this.remoteLogPath = remoteLogPath;
+ }
+
+ public Path getRemoteLogPath() {
+ return this.remoteLogPath;
+ }
+
+ public IndexedLogsMeta getCachedIndexedLogsMeta() {
+ return this.indexedLogsMeta;
+ }
+ }
+
+ @Private
+ public static int getFSOutputBufferSize(Configuration conf) {
+ return conf.getInt(FS_OUTPUT_BUF_SIZE_ATTR, 256 * 1024);
+ }
+
+ @Private
+ public static int getFSInputBufferSize(Configuration conf) {
+ return conf.getInt(FS_INPUT_BUF_SIZE_ATTR, 256 * 1024);
+ }
+
+ private abstract class FSAction<T> {
+ abstract T run() throws Exception;
+
+ T runWithRetries() throws Exception {
+ int retry = 0;
+ while (true) {
+ try {
+ return run();
+ } catch (IOException e) {
+ LOG.info("Exception while executing an FS operation.", e);
+ if (++retry > fsNumRetries) {
+ LOG.info("Maxed out FS retries. Giving up!");
+ throw e;
+ }
+ LOG.info("Retrying operation on FS. Retry no. " + retry);
+ Thread.sleep(fsRetryInterval);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fddabc2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/package-info.java
new file mode 100644
index 0000000..08ddece
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Public
+package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile;
+import org.apache.hadoop.classification.InterfaceAudience;
+
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org