You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ad...@apache.org on 2020/12/12 08:43:09 UTC
[hadoop] branch trunk updated: YARN-10031. Create a general purpose
log request with additional query parameters. Contributed by Andras Gyori
This is an automated email from the ASF dual-hosted git repository.
adamantal pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3234e5e YARN-10031. Create a general purpose log request with additional query parameters. Contributed by Andras Gyori
3234e5e is described below
commit 3234e5eaf36aae1839b2ab5cc43517afe7087a45
Author: adamantal <ad...@banzaicloud.com>
AuthorDate: Sat Dec 12 09:42:22 2020 +0100
YARN-10031. Create a general purpose log request with additional query parameters. Contributed by Andras Gyori
---
.../mapreduce/v2/hs/webapp/HsWebServices.java | 27 ++
.../logaggregation/ExtendedLogMetaRequest.java | 291 +++++++++++++++
.../LogAggregationMetaCollector.java | 143 ++++++++
.../yarn/logaggregation/LogAggregationUtils.java | 107 +++++-
.../LogAggregationFileController.java | 46 +++
.../ifile/LogAggregationIndexedFileController.java | 76 ++++
.../tfile/LogAggregationTFileController.java | 55 +++
.../TestLogAggregationMetaCollector.java | 391 +++++++++++++++++++++
.../FakeLogAggregationFileController.java | 96 +++++
.../TestLogAggregationIndexedFileController.java | 106 ++++++
.../hadoop/yarn/server/webapp/LogServlet.java | 34 +-
.../yarn/server/webapp/YarnWebServiceParams.java | 2 +
12 files changed, 1360 insertions(+), 14 deletions(-)
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java
index 6e17500..4ee7636 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.mapreduce.v2.hs.webapp;
import java.io.IOException;
+import java.util.Set;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
@@ -69,6 +70,7 @@ import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.logaggregation.ExtendedLogMetaRequest;
import org.apache.hadoop.yarn.server.webapp.WrappedLogMetaRequest;
import org.apache.hadoop.yarn.server.webapp.YarnWebServiceParams;
import org.apache.hadoop.yarn.server.webapp.LogServlet;
@@ -442,6 +444,31 @@ public class HsWebServices extends WebServices {
}
+  /**
+   * Answers an extended aggregated-log metadata query.
+   *
+   * @param hsr the servlet request
+   * @param fileName regex filter on log file names
+   * @param fileSize comparison expressions on file sizes
+   * @param modificationTime comparison expressions on modification times
+   * @param appIdStr exact application id filter
+   * @param containerIdStr exact container id filter
+   * @param nmId regex filter on node ids
+   * @return the response produced by the log servlet
+   * @throws IOException if the log servlet fails
+   */
@GET
+  @Path("/extended-log-query")
+  @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  @InterfaceAudience.Public
+  @InterfaceStability.Unstable
+  public Response getAggregatedLogsMeta(@Context HttpServletRequest hsr,
+      @QueryParam(YarnWebServiceParams.CONTAINER_LOG_FILE_NAME) String fileName,
+      @QueryParam(YarnWebServiceParams.FILESIZE) Set<String> fileSize,
+      @QueryParam(YarnWebServiceParams.MODIFICATION_TIME) Set<String>
+          modificationTime,
+      @QueryParam(YarnWebServiceParams.APP_ID) String appIdStr,
+      @QueryParam(YarnWebServiceParams.CONTAINER_ID) String containerIdStr,
+      @QueryParam(YarnWebServiceParams.NM_ID) String nmId) throws IOException {
+    init();
+    // The setters run in a fixed order. Several of them may reject their
+    // input with an IllegalArgumentException.
+    ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder requestBuilder =
+        new ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder()
+            .setAppId(appIdStr)
+            .setFileName(fileName)
+            .setContainerId(containerIdStr)
+            .setFileSize(fileSize)
+            .setModificationTime(modificationTime)
+            .setNodeId(nmId);
+    return logServlet.getContainerLogsInfo(hsr, requestBuilder);
+  }
+
+ @GET
@Path("/aggregatedlogs")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@InterfaceAudience.Public
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ExtendedLogMetaRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ExtendedLogMetaRequest.java
new file mode 100644
index 0000000..0815e03
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ExtendedLogMetaRequest.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+import java.util.stream.Collectors;
+
+/**
+ * Represents a query of log metadata with extended filtering capabilities.
+ * <p>
+ * Instances are normally created through {@link ExtendedLogMetaRequestBuilder}.
+ * Every filter is optional: an unset filter accepts any value.
+ */
+public class ExtendedLogMetaRequest {
+  private final String user;
+  private final String appId;
+  private final String containerId;
+  private final MatchExpression nodeId;
+  private final MatchExpression fileName;
+  private final ComparisonCollection fileSize;
+  private final ComparisonCollection modificationTime;
+
+  /**
+   * Creates a request.
+   * <p>
+   * A {@code null} matcher argument is replaced by an empty matcher that
+   * accepts every value, so the matcher getters never return {@code null}.
+   *
+   * @param user owner of the logs, may be {@code null}
+   * @param appId exact application id filter, or {@code null} for any
+   * @param containerId exact container id filter, or {@code null} for any
+   * @param nodeId regex filter on node ids
+   * @param fileName regex filter on log file names
+   * @param fileSize comparison filters on file sizes
+   * @param modificationTime comparison filters on modification times
+   */
+  public ExtendedLogMetaRequest(
+      String user, String appId, String containerId, MatchExpression nodeId,
+      MatchExpression fileName, ComparisonCollection fileSize,
+      ComparisonCollection modificationTime) {
+    this.user = user;
+    this.appId = appId;
+    this.containerId = containerId;
+    this.nodeId = nodeId != null ? nodeId : new MatchExpression(null);
+    this.fileName = fileName != null ? fileName : new MatchExpression(null);
+    this.fileSize =
+        fileSize != null ? fileSize : new ComparisonCollection(null);
+    this.modificationTime = modificationTime != null
+        ? modificationTime : new ComparisonCollection(null);
+  }
+
+  public String getUser() {
+    return user;
+  }
+
+  public String getAppId() {
+    return appId;
+  }
+
+  public String getContainerId() {
+    return containerId;
+  }
+
+  public MatchExpression getNodeId() {
+    return nodeId;
+  }
+
+  public MatchExpression getFileName() {
+    return fileName;
+  }
+
+  public ComparisonCollection getFileSize() {
+    return fileSize;
+  }
+
+  public ComparisonCollection getModificationTime() {
+    return modificationTime;
+  }
+
+  /**
+   * Fluent builder of {@link ExtendedLogMetaRequest}. All filters default
+   * to "match everything".
+   */
+  public static class ExtendedLogMetaRequestBuilder {
+    private String user;
+    private String appId;
+    private String containerId;
+    private MatchExpression nodeId = new MatchExpression(null);
+    private MatchExpression fileName = new MatchExpression(null);
+    private ComparisonCollection fileSize = new ComparisonCollection(null);
+    private ComparisonCollection modificationTime =
+        new ComparisonCollection(null);
+
+    public ExtendedLogMetaRequestBuilder setUser(String userName) {
+      this.user = userName;
+      return this;
+    }
+
+    public ExtendedLogMetaRequestBuilder setAppId(String applicationId) {
+      this.appId = applicationId;
+      return this;
+    }
+
+    public ExtendedLogMetaRequestBuilder setContainerId(String container) {
+      this.containerId = container;
+      return this;
+    }
+
+    /**
+     * @param node regex on node ids, or {@code null} for any
+     * @return this builder
+     * @throws IllegalArgumentException if the regex is invalid
+     */
+    public ExtendedLogMetaRequestBuilder setNodeId(String node) {
+      try {
+        this.nodeId = new MatchExpression(node);
+      } catch (PatternSyntaxException e) {
+        throw new IllegalArgumentException("Node Id expression is invalid", e);
+      }
+      return this;
+    }
+
+    /**
+     * @param file regex on log file names, or {@code null} for any
+     * @return this builder
+     * @throws IllegalArgumentException if the regex is invalid
+     */
+    public ExtendedLogMetaRequestBuilder setFileName(String file) {
+      try {
+        this.fileName = new MatchExpression(file);
+      } catch (PatternSyntaxException e) {
+        throw new IllegalArgumentException("Filename expression is invalid", e);
+      }
+      return this;
+    }
+
+    /**
+     * @param fileSizes comparison expressions on file sizes
+     * @return this builder
+     * @throws IllegalArgumentException on malformed or conflicting expressions
+     */
+    public ExtendedLogMetaRequestBuilder setFileSize(Set<String> fileSizes) {
+      this.fileSize = new ComparisonCollection(fileSizes);
+      return this;
+    }
+
+    /**
+     * @param modificationTimes comparison expressions on modification times
+     * @return this builder
+     * @throws IllegalArgumentException on malformed or conflicting expressions
+     */
+    public ExtendedLogMetaRequestBuilder setModificationTime(
+        Set<String> modificationTimes) {
+      this.modificationTime = new ComparisonCollection(modificationTimes);
+      return this;
+    }
+
+    public boolean isUserSet() {
+      return user != null;
+    }
+
+    public ExtendedLogMetaRequest build() {
+      return new ExtendedLogMetaRequest(user, appId, containerId, nodeId,
+          fileName, fileSize, modificationTime);
+    }
+  }
+
+  /**
+   * A collection of {@code ComparisonExpression}, combined with AND logic.
+   */
+  public static class ComparisonCollection {
+    private final List<ComparisonExpression> comparisonExpressions;
+
+    /**
+     * @param expressions stringified comparison expressions, or {@code null}
+     *     for an empty collection that matches every value
+     * @throws IllegalArgumentException if more than one exact-match
+     *     expression is given or an expression is not a valid number
+     */
+    public ComparisonCollection(Set<String> expressions) {
+      if (expressions == null) {
+        this.comparisonExpressions = Collections.emptyList();
+      } else {
+        List<String> equalExpressions = expressions.stream().filter(
+            e -> !e.startsWith(ComparisonExpression.GREATER_OPERATOR) &&
+                !e.startsWith(ComparisonExpression.LESSER_OPERATOR))
+            .collect(Collectors.toList());
+        // Two different exact values can never both match, so reject them.
+        if (equalExpressions.size() > 1) {
+          throw new IllegalArgumentException(
+              "Can not process more, than one exact match. Matches: "
+                  + String.join(" ", equalExpressions));
+        }
+
+        this.comparisonExpressions = Collections.unmodifiableList(
+            expressions.stream()
+                .map(ComparisonExpression::new)
+                .collect(Collectors.toList()));
+      }
+    }
+
+    public boolean match(Long value) {
+      return match(value, true);
+    }
+
+    /**
+     * @param value numeric string to test; {@code null} is treated as
+     *     unknown and always matches
+     * @return whether the value satisfies every expression
+     * @throws NumberFormatException if {@code value} is not a number
+     */
+    public boolean match(String value) {
+      if (value == null) {
+        return true;
+      }
+
+      return match(Long.valueOf(value), true);
+    }
+
+    /**
+     * Checks if the given value matches all the {@code ComparisonExpression}.
+     * This implies an AND logic between the expressions.
+     * @param value given value to match against; {@code null} is treated as
+     *     unknown, consistent with {@link #match(String)}
+     * @param defaultValue value to return when no expression is defined or
+     *     the value is {@code null}
+     * @return whether all expressions were matched
+     */
+    public boolean match(Long value, boolean defaultValue) {
+      if (comparisonExpressions.isEmpty() || value == null) {
+        return defaultValue;
+      }
+
+      return comparisonExpressions.stream()
+          .allMatch(expr -> expr.match(value));
+    }
+
+  }
+
+  /**
+   * Wraps a comparison logic based on a stringified expression.
+   * The format of the expression is:
+   * <ul>
+   *   <li>{@code >value}: is greater than value</li>
+   *   <li>{@code <value}: is lower than value</li>
+   *   <li>{@code value}: is equal to value</li>
+   * </ul>
+   */
+  public static class ComparisonExpression {
+    public static final String GREATER_OPERATOR = ">";
+    public static final String LESSER_OPERATOR = "<";
+
+    private final String expression;
+    private final Predicate<Long> comparisonFn;
+    private final Long convertedValue;
+
+    /**
+     * @param expression stringified expression, or {@code null} for an
+     *     expression that always yields the default value
+     * @throws NumberFormatException if the operand is not a valid long
+     */
+    public ComparisonExpression(String expression) {
+      final Long bound;
+      if (expression == null) {
+        bound = null;
+        this.comparisonFn = null;
+      } else if (expression.startsWith(GREATER_OPERATOR)) {
+        bound = Long.parseLong(expression.substring(1));
+        this.comparisonFn = a -> a > bound;
+      } else if (expression.startsWith(LESSER_OPERATOR)) {
+        bound = Long.parseLong(expression.substring(1));
+        this.comparisonFn = a -> a < bound;
+      } else {
+        bound = Long.parseLong(expression);
+        this.comparisonFn = a -> a.equals(bound);
+      }
+
+      this.convertedValue = bound;
+      this.expression = expression;
+    }
+
+    /**
+     * @param value numeric string; {@code null} always matches
+     * @return comparison result
+     */
+    public boolean match(String value) {
+      return value == null || match(Long.valueOf(value), true);
+    }
+
+    public boolean match(Long value) {
+      return match(value, true);
+    }
+
+    /**
+     * Tests the given value with the comparison function defined by the
+     * stringified expression.
+     * @param value value to test with
+     * @param defaultValue value to return when no expression was defined or
+     *     the value is {@code null}
+     * @return comparison test result or the given default value
+     */
+    public boolean match(Long value, boolean defaultValue) {
+      if (expression == null || value == null) {
+        return defaultValue;
+      }
+      return comparisonFn.test(value);
+    }
+
+    @Override
+    public String toString() {
+      return convertedValue != null ? String.valueOf(convertedValue) : "";
+    }
+  }
+
+  /**
+   * Wraps a regex matcher.
+   */
+  public static class MatchExpression {
+    private final Pattern expression;
+
+    /**
+     * @param expression regular expression, or {@code null} to match any value
+     * @throws PatternSyntaxException if the expression is not a valid regex
+     */
+    public MatchExpression(String expression) {
+      this.expression = expression != null ? Pattern.compile(expression) : null;
+    }
+
+    /**
+     * Matches the value against the expression.
+     * @param value value to be matched against
+     * @return true if no expression was defined; otherwise whether a
+     *     non-null value fully matches the expression
+     */
+    public boolean match(String value) {
+      if (expression == null) {
+        return true;
+      }
+      return value != null && expression.matcher(value).matches();
+    }
+
+    @Override
+    public String toString() {
+      return expression != null ? expression.pattern() : "";
+    }
+  }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationMetaCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationMetaCollector.java
new file mode 100644
index 0000000..9c6e5b3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationMetaCollector.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.HarFs;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Facilitates an extended query of aggregated log file metadata with
+ * the help of file controllers.
+ */
+public class LogAggregationMetaCollector {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ LogAggregationMetaCollector.class);
+
+ private final ExtendedLogMetaRequest logsRequest;
+ private final Configuration conf;
+
+ public LogAggregationMetaCollector(
+ ExtendedLogMetaRequest logsRequest, Configuration conf) {
+ this.logsRequest = logsRequest;
+ this.conf = conf;
+ }
+
+ /**
+ * Collects all log file metadata based on the complex query defined in
+ * {@code UserLogsRequest}.
+ * @param fileController log aggregation file format controller
+ * @return collection of log file metadata grouped by containers
+ * @throws IOException if node file is not reachable
+ */
+ public List<ContainerLogMeta> collect(
+ LogAggregationFileController fileController) throws IOException {
+ List<ContainerLogMeta> containersLogMeta = new ArrayList<>();
+ RemoteIterator<FileStatus> appDirs = fileController.
+ getApplicationDirectoriesOfUser(logsRequest.getUser());
+
+ while (appDirs.hasNext()) {
+ FileStatus currentAppDir = appDirs.next();
+ if (logsRequest.getAppId() == null ||
+ logsRequest.getAppId().equals(currentAppDir.getPath().getName())) {
+ ApplicationId appId = ApplicationId.fromString(
+ currentAppDir.getPath().getName());
+ RemoteIterator<FileStatus> nodeFiles = fileController
+ .getNodeFilesOfApplicationDirectory(currentAppDir);
+
+ while (nodeFiles.hasNext()) {
+ FileStatus currentNodeFile = nodeFiles.next();
+ if (!logsRequest.getNodeId().match(currentNodeFile.getPath()
+ .getName())) {
+ continue;
+ }
+
+ if (currentNodeFile.getPath().getName().equals(
+ logsRequest.getAppId() + ".har")) {
+ Path p = new Path("har:///"
+ + currentNodeFile.getPath().toUri().getRawPath());
+ nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p);
+ continue;
+ }
+
+ try {
+ Map<String, List<ContainerLogFileInfo>> metaFiles = fileController
+ .getLogMetaFilesOfNode(logsRequest, currentNodeFile, appId);
+ if (metaFiles == null) {
+ continue;
+ }
+
+ metaFiles.entrySet().removeIf(entry ->
+ !(logsRequest.getContainerId() == null ||
+ logsRequest.getContainerId().equals(entry.getKey())));
+
+ containersLogMeta.addAll(createContainerLogMetas(
+ currentNodeFile.getPath().getName(), metaFiles));
+ } catch (IOException ioe) {
+ LOG.warn("Can not get log meta from the log file:"
+ + currentNodeFile.getPath() + "\n" + ioe.getMessage());
+ }
+
+ }
+ }
+
+ }
+ return containersLogMeta;
+ }
+
+ private List<ContainerLogMeta> createContainerLogMetas(
+ String nodeId, Map<String, List<ContainerLogFileInfo>> metaFiles) {
+ List<ContainerLogMeta> containerLogMetas = new ArrayList<>();
+ for (Map.Entry<String, List<ContainerLogFileInfo>> containerLogs
+ : metaFiles.entrySet()) {
+ ContainerLogMeta containerLogMeta = new ContainerLogMeta(
+ containerLogs.getKey(), nodeId);
+ for (ContainerLogFileInfo file : containerLogs.getValue()) {
+ boolean isFileNameMatches = logsRequest.getFileName()
+ .match(file.getFileName());
+ boolean fileSizeComparison = logsRequest.getFileSize()
+ .match(file.getFileSize());
+ boolean modificationTimeComparison = logsRequest.getModificationTime()
+ .match(file.getLastModifiedTime());
+
+ if (!isFileNameMatches || !fileSizeComparison ||
+ !modificationTimeComparison) {
+ continue;
+ }
+ containerLogMeta.getContainerLogMeta().add(file);
+ }
+ if (!containerLogMeta.getContainerLogMeta().isEmpty()) {
+ containerLogMetas.add(containerLogMeta);
+ }
+ }
+ return containerLogMetas;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
index 5f9466f..ec3d3f8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
@@ -29,10 +29,13 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.NoSuchElementException;
@Private
public class LogAggregationUtils {
@@ -295,19 +298,8 @@ public class LogAggregationUtils {
// Return both new and old node files combined
RemoteIterator<FileStatus> curDir = nodeFilesCur;
RemoteIterator<FileStatus> prevDir = nodeFilesPrev;
- RemoteIterator<FileStatus> nodeFilesCombined = new
- RemoteIterator<FileStatus>() {
- @Override
- public boolean hasNext() throws IOException {
- return prevDir.hasNext() || curDir.hasNext();
- }
-
- @Override
- public FileStatus next() throws IOException {
- return prevDir.hasNext() ? prevDir.next() : curDir.next();
- }
- };
- return nodeFilesCombined;
+
+ return combineIterators(prevDir, curDir);
}
}
@@ -368,4 +360,93 @@ public class LogAggregationUtils {
return nodeFiles;
}
+  /**
+   * Lists the direct children of a remote directory.
+   *
+   * @param conf configuration used to resolve the file context
+   * @param appPath directory to list
+   * @return a lazy iterator over the children of {@code appPath}
+   * @throws IOException if the directory cannot be listed
+   */
+  public static RemoteIterator<FileStatus> getRemoteFiles(
+      Configuration conf, Path appPath) throws IOException {
+    FileContext defaultContext = FileContext.getFileContext(conf);
+    Path qualifiedPath = defaultContext.makeQualified(appPath);
+    // Resolve the context from the qualified URI so the path's own scheme
+    // and authority are honoured.
+    FileContext remoteContext =
+        FileContext.getFileContext(qualifiedPath.toUri(), conf);
+    return remoteContext.listStatus(appPath);
+  }
+
+  /**
+   * Lists all application directories of a user.
+   * <p>
+   * The bucketed layout (user dir, then bucket dirs, then app dirs) is
+   * flattened. When the older path layout is enabled and exists, its
+   * entries are returned first.
+   *
+   * @param conf configuration
+   * @param user owner of the logs
+   * @param remoteRootLogDir remote root log directory
+   * @param remoteRootLogDirSuffix remote root log directory suffix
+   * @return a lazy iterator of application directories
+   * @throws IOException if the user directory cannot be listed
+   */
+  public static RemoteIterator<FileStatus> getUserRemoteLogDir(
+      Configuration conf, String user, Path remoteRootLogDir,
+      String remoteRootLogDirSuffix) throws IOException {
+    Path userPath = LogAggregationUtils.getRemoteLogSuffixedDir(
+        remoteRootLogDir, user, remoteRootLogDirSuffix);
+    final RemoteIterator<FileStatus> userRootDirFiles =
+        getRemoteFiles(conf, userPath);
+
+    RemoteIterator<FileStatus> newDirs = new RemoteIterator<FileStatus>() {
+      private RemoteIterator<FileStatus> currentBucketDir =
+          LogAggregationUtils.getSubDir(conf, userRootDirFiles);
+
+      @Override
+      public boolean hasNext() throws IOException {
+        // Skip empty bucket directories, so that a true result guarantees
+        // next() can return an element.
+        while (currentBucketDir != null && !currentBucketDir.hasNext()) {
+          currentBucketDir =
+              LogAggregationUtils.getSubDir(conf, userRootDirFiles);
+        }
+        return currentBucketDir != null;
+      }
+
+      @Override
+      public FileStatus next() throws IOException {
+        if (!hasNext()) {
+          throw new NoSuchElementException();
+        }
+        return currentBucketDir.next();
+      }
+    };
+
+    if (!LogAggregationUtils.isOlderPathEnabled(conf)) {
+      return newDirs;
+    }
+
+    try {
+      Path oldPath = LogAggregationUtils.getOlderRemoteLogSuffixedDir(
+          remoteRootLogDir, user, remoteRootLogDirSuffix);
+      final RemoteIterator<FileStatus> oldUserRootDirFiles =
+          getRemoteFiles(conf, oldPath);
+      return combineIterators(oldUserRootDirFiles, newDirs);
+    } catch (FileNotFoundException e) {
+      // The older layout is optional; fall back to the new one only.
+      return newDirs;
+    }
+  }
+
+  /**
+   * Consumes the next entry of {@code rootDir} and lists its children.
+   *
+   * @param conf configuration used to resolve the file context
+   * @param rootDir iterator of directories to descend into
+   * @return iterator over the children of the next entry, or {@code null}
+   *     if {@code rootDir} is exhausted
+   * @throws IOException if listing fails
+   */
+  private static RemoteIterator<FileStatus> getSubDir(
+      Configuration conf, RemoteIterator<FileStatus> rootDir)
+      throws IOException {
+    if (!rootDir.hasNext()) {
+      return null;
+    }
+    Path subDirPath = rootDir.next().getPath();
+    return getRemoteFiles(conf, subDirPath);
+  }
+
+  /**
+   * Concatenates two iterators: every element of {@code first}, then every
+   * element of {@code second}.
+   *
+   * @param first iterator drained first
+   * @param second iterator drained once {@code first} is exhausted
+   * @return a lazy concatenating iterator
+   */
+  private static RemoteIterator<FileStatus> combineIterators(
+      RemoteIterator<FileStatus> first, RemoteIterator<FileStatus> second) {
+    return new RemoteIterator<FileStatus>() {
+      @Override
+      public boolean hasNext() throws IOException {
+        if (first.hasNext()) {
+          return true;
+        }
+        return second.hasNext();
+      }
+
+      @Override
+      public FileStatus next() throws IOException {
+        if (first.hasNext()) {
+          return first.next();
+        }
+        return second.next();
+      }
+    };
+  }
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
index 40ba555..c6e34ef 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
@@ -53,7 +54,9 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.logaggregation.ExtendedLogMetaRequest;
import org.apache.hadoop.yarn.webapp.View.ViewContext;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block;
import org.slf4j.Logger;
@@ -225,6 +228,49 @@ public abstract class LogAggregationFileController {
ContainerLogsRequest logRequest) throws IOException;
/**
+ * Returns log file metadata for a node grouped by containers.
+ *
+ * @param logRequest extended query information holder
+ * @param currentNodeFile file status of a node in an application directory
+ * @param appId id of the application, which is the same as in node path
+ * @return log file metadata
+ * @throws IOException if there is no node file
+ */
+ public Map<String, List<ContainerLogFileInfo>> getLogMetaFilesOfNode(
+ ExtendedLogMetaRequest logRequest, FileStatus currentNodeFile,
+ ApplicationId appId) throws IOException {
+ LOG.info("User aggregated complex log queries " +
+ "are not implemented for this file controller");
+ return Collections.emptyMap();
+ }
+
+ /**
+ * Gets all application directories of a user.
+ *
+ * @param user name of the user
+ * @return a lazy iterator of directories
+ * @throws IOException if user directory does not exist
+ */
+ public RemoteIterator<FileStatus> getApplicationDirectoriesOfUser(
+ String user) throws IOException {
+ return LogAggregationUtils.getUserRemoteLogDir(
+ conf, user, getRemoteRootLogDir(), getRemoteRootLogDirSuffix());
+ }
+
+ /**
+ * Gets all node files in an application directory.
+ *
+ * @param appDir application directory
+ * @return a lazy iterator of files
+ * @throws IOException if file context is not reachable
+ */
+ public RemoteIterator<FileStatus> getNodeFilesOfApplicationDirectory(
+ FileStatus appDir) throws IOException {
+ return LogAggregationUtils
+ .getRemoteFiles(conf, appDir.getPath());
+ }
+
+ /**
* Render Aggregated Logs block.
* @param html the html
* @param context the ViewContext
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java
index 8047f4a..b02466b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java
@@ -27,6 +27,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.security.PrivilegedExceptionAction;
@@ -74,12 +75,14 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo;
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.logaggregation.LogToolUtils;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
+import org.apache.hadoop.yarn.logaggregation.ExtendedLogMetaRequest;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
import org.apache.hadoop.yarn.util.Clock;
@@ -611,6 +614,45 @@ public class LogAggregationIndexedFileController
}
@Override
+  public Map<String, List<ContainerLogFileInfo>> getLogMetaFilesOfNode(
+      ExtendedLogMetaRequest logRequest, FileStatus currentNodeFile,
+      ApplicationId appId) throws IOException {
+    Map<String, List<ContainerLogFileInfo>> logMetaFiles = new HashMap<>();
+
+    // If this is a checksum file, its stored index is passed as the end
+    // index to loadIndexedLogsMeta; otherwise -1 is used.
+    Long checkSumIndex = parseChecksum(currentNodeFile);
+    long endIndex = -1;
+    if (checkSumIndex != null) {
+      endIndex = checkSumIndex;
+    }
+    IndexedLogsMeta current = loadIndexedLogsMeta(
+        currentNodeFile.getPath(), endIndex, appId);
+    if (current == null) {
+      return logMetaFiles;
+    }
+
+    String requestedContainerId = logRequest.getContainerId();
+    for (IndexedPerAggregationLogMeta logMeta : current.getLogMetas()) {
+      for (Entry<String, List<IndexedFileLogMeta>> log : logMeta
+          .getLogMetas().entrySet()) {
+        String currentContainerId = log.getKey();
+        if (requestedContainerId != null
+            && !requestedContainerId.equals(currentContainerId)) {
+          continue;
+        }
+        // The same container can appear in several per-aggregation entries.
+        // Accumulate its files instead of replacing earlier ones.
+        List<ContainerLogFileInfo> containerFiles = logMetaFiles
+            .computeIfAbsent(currentContainerId, k -> new ArrayList<>());
+        for (IndexedFileLogMeta aMeta : log.getValue()) {
+          ContainerLogFileInfo file = new ContainerLogFileInfo();
+          file.setFileName(aMeta.getFileName());
+          file.setFileSize(Long.toString(aMeta.getFileSize()));
+          file.setLastModifiedTime(
+              Long.toString(aMeta.getLastModifiedTime()));
+          containerFiles.add(file);
+        }
+      }
+    }
+
+    return logMetaFiles;
+  }
+
+ @Override
public List<ContainerLogMeta> readAggregatedLogsMeta(
ContainerLogsRequest logRequest) throws IOException {
List<IndexedLogsMeta> listOfLogsMeta = new ArrayList<>();
@@ -743,6 +785,40 @@ public class LogAggregationIndexedFileController
return checkSumFiles;
}
+  /**
+   * Reads the index stored in a checksum file.
+   * <p>
+   * The file is expected to hold an int name length, the node name bytes
+   * (UTF-8) and a long index.
+   *
+   * @param file candidate file; only names ending with
+   *     {@code CHECK_SUM_FILE_SUFFIX} are read
+   * @return the stored index, or {@code null} if the file is not a checksum
+   *     file, cannot be read, is truncated or malformed, or holds an empty
+   *     node name
+   */
+  private Long parseChecksum(FileStatus file) {
+    if (!file.getPath().getName().endsWith(CHECK_SUM_FILE_SUFFIX)) {
+      return null;
+    }
+
+    FSDataInputStream checksumFileInputStream = null;
+    try {
+      FileContext fileContext = FileContext
+          .getFileContext(file.getPath().toUri(), conf);
+      checksumFileInputStream = fileContext.open(file.getPath());
+      int nameLength = checksumFileInputStream.readInt();
+      if (nameLength < 0) {
+        LOG.warn("Invalid node name length {} in checksum file {}",
+            nameLength, file.getPath());
+        return null;
+      }
+      byte[] b = new byte[nameLength];
+      // read(byte[]) may return fewer bytes than requested. readFully either
+      // fills the buffer or throws EOFException on a truncated file.
+      checksumFileInputStream.readFully(b);
+      String nodeName = new String(b, StandardCharsets.UTF_8);
+      long index = checksumFileInputStream.readLong();
+      if (nodeName.isEmpty()) {
+        return null;
+      }
+      return index;
+    } catch (IOException ex) {
+      LOG.warn(ex.getMessage());
+      return null;
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, checksumFileInputStream);
+    }
+  }
+
@Private
public List<FileStatus> getNodeLogFileToRead(
List<FileStatus> nodeFiles, String nodeId, ApplicationId appId)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java
index 2355d30..b365424 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java
@@ -25,10 +25,13 @@ import java.io.OutputStream;
import java.nio.charset.Charset;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo;
+import org.apache.hadoop.yarn.logaggregation.ExtendedLogMetaRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.math3.util.Pair;
@@ -259,6 +262,58 @@ public class LogAggregationTFileController
}
@Override
+  /**
+   * Lists the log file metadata stored in a single TFile-format node file.
+   *
+   * @param logRequest request whose container id, when non-null, restricts
+   *     the result to that single container
+   * @param currentNodeFile aggregated log file written for one node
+   * @param appId application the node file belongs to (not consulted here)
+   * @return map from container id string to the log files recorded for it
+   * @throws IOException if the node file cannot be opened or read
+   */
+  public Map<String, List<ContainerLogFileInfo>> getLogMetaFilesOfNode(
+      ExtendedLogMetaRequest logRequest, FileStatus currentNodeFile,
+      ApplicationId appId) throws IOException {
+    Map<String, List<ContainerLogFileInfo>> containerFiles = new HashMap<>();
+    Path nodePath = currentNodeFile.getPath();
+
+    LogReader reader = new LogReader(conf, nodePath);
+    try {
+      while (true) {
+        LogKey key = new LogKey();
+        DataInputStream valueStream = reader.next(key);
+        if (valueStream == null) {
+          // No more container entries in this node file.
+          break;
+        }
+        String containerId = key.toString();
+        String requested = logRequest.getContainerId();
+        if (requested != null && !requested.equals(containerId)) {
+          continue;
+        }
+        List<ContainerLogFileInfo> files = new ArrayList<>();
+        containerFiles.put(containerId, files);
+        fillMetaFiles(currentNodeFile, valueStream, files);
+      }
+    } finally {
+      reader.close();
+    }
+    return containerFiles;
+  }
+
+  /**
+   * Appends one {@link ContainerLogFileInfo} per log file found in the value
+   * stream of a single container entry.
+   *
+   * <p>Every entry is stamped with the modification time of the node file
+   * itself; reading stops when the stream reports end-of-file.
+   *
+   * @param currentNodeFile node file whose modification time is recorded
+   * @param valueStream stream of the container entry being read
+   * @param logMetaFiles list the discovered entries are appended to
+   * @throws IOException on read errors other than end-of-file
+   */
+  private void fillMetaFiles(
+      FileStatus currentNodeFile, DataInputStream valueStream,
+      List<ContainerLogFileInfo> logMetaFiles)
+      throws IOException {
+    String lastModified =
+        Long.toString(currentNodeFile.getModificationTime());
+    while (true) {
+      Pair<String, String> logMeta;
+      try {
+        logMeta = LogReader.readContainerMetaDataAndSkipData(valueStream);
+      } catch (EOFException eof) {
+        // EOF marks the end of this container's log files.
+        return;
+      }
+      ContainerLogFileInfo info = new ContainerLogFileInfo();
+      info.setLastModifiedTime(lastModified);
+      info.setFileName(logMeta.getFirst());
+      info.setFileSize(logMeta.getSecond());
+      logMetaFiles.add(info);
+    }
+  }
+
+ @Override
public List<ContainerLogMeta> readAggregatedLogsMeta(
ContainerLogsRequest logRequest) throws IOException {
List<ContainerLogMeta> containersLogMeta = new ArrayList<>();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestLogAggregationMetaCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestLogAggregationMetaCollector.java
new file mode 100644
index 0000000..c60635b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestLogAggregationMetaCollector.java
@@ -0,0 +1,391 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.FakeLogAggregationFileController;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for {@link LogAggregationMetaCollector} against an in-memory fake
+ * controller serving two applications, each with one container on each of
+ * two nodes, where every container owns one big and one small log file
+ * (8 log files in total).
+ */
+public class TestLogAggregationMetaCollector {
+  private static final String TEST_NODE = "TEST_NODE_1";
+  private static final String TEST_NODE_2 = "TEST_NODE_2";
+  private static final String BIG_FILE_NAME = "TEST_BIG";
+  private static final String SMALL_FILE_NAME = "TEST_SMALL";
+
+  private static ApplicationId app = ApplicationId.newInstance(
+      Clock.systemDefaultZone().millis(), 1);
+  private static ApplicationId app2 = ApplicationId.newInstance(
+      Clock.systemDefaultZone().millis(), 2);
+
+  private static ApplicationAttemptId appAttempt =
+      ApplicationAttemptId.newInstance(app, 1);
+  private static ApplicationAttemptId app2Attempt =
+      ApplicationAttemptId.newInstance(app2, 1);
+
+  private static ContainerId attemptContainer =
+      ContainerId.newContainerId(appAttempt, 1);
+  private static ContainerId attemptContainer2 =
+      ContainerId.newContainerId(appAttempt, 2);
+
+  private static ContainerId attempt2Container =
+      ContainerId.newContainerId(app2Attempt, 1);
+  private static ContainerId attempt2Container2 =
+      ContainerId.newContainerId(app2Attempt, 2);
+
+  private FakeNodeFileController fileController;
+
+  /**
+   * Fake controller serving fixed application directories, node files and
+   * per-(application id, node file name) log metadata from memory.
+   */
+  private static class FakeNodeFileController
+      extends FakeLogAggregationFileController {
+    private final Map<ImmutablePair<String, String>,
+        Map<String, List<ContainerLogFileInfo>>> logFiles;
+    private final List<FileStatus> appDirs;
+    private final List<FileStatus> nodeFiles;
+
+    FakeNodeFileController(
+        Map<ImmutablePair<String, String>, Map<String,
+            List<ContainerLogFileInfo>>> logFiles, List<FileStatus> appDirs,
+        List<FileStatus> nodeFiles) {
+      this.logFiles = logFiles;
+      this.appDirs = appDirs;
+      this.nodeFiles = nodeFiles;
+    }
+
+    /** Adapts a plain list to a Hadoop {@link RemoteIterator}. */
+    private static RemoteIterator<FileStatus> toRemoteIterator(
+        List<FileStatus> statuses) {
+      final Iterator<FileStatus> iter = statuses.iterator();
+      return new RemoteIterator<FileStatus>() {
+        @Override
+        public boolean hasNext() {
+          return iter.hasNext();
+        }
+
+        @Override
+        public FileStatus next() {
+          return iter.next();
+        }
+      };
+    }
+
+    @Override
+    public RemoteIterator<FileStatus> getApplicationDirectoriesOfUser(
+        String user) throws IOException {
+      return toRemoteIterator(appDirs);
+    }
+
+    @Override
+    public RemoteIterator<FileStatus> getNodeFilesOfApplicationDirectory(
+        FileStatus appDir) throws IOException {
+      // Every application directory is served the same node files.
+      return toRemoteIterator(nodeFiles);
+    }
+
+    @Override
+    public Map<String, List<ContainerLogFileInfo>> getLogMetaFilesOfNode(
+        ExtendedLogMetaRequest logRequest, FileStatus currentNodeFile,
+        ApplicationId appId) throws IOException {
+      return logFiles.get(new ImmutablePair<>(appId.toString(),
+          currentNodeFile.getPath().getName()));
+    }
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    fileController = createFileController();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  @Test
+  public void testAllNull() throws IOException {
+    List<ContainerLogFileInfo> allFile =
+        collectLogFiles(newUnfilteredRequest());
+    assertEquals(8, allFile.size());
+  }
+
+  @Test
+  public void testAllSet() throws IOException {
+    ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder request =
+        new ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder();
+    Set<String> fileSizeExpressions = new HashSet<>();
+    fileSizeExpressions.add("<51");
+    Set<String> modificationTimeExpressions = new HashSet<>();
+    modificationTimeExpressions.add("<1000");
+    request.setAppId(app.toString());
+    request.setContainerId(attemptContainer.toString());
+    request.setFileName(String.format("%s.*", SMALL_FILE_NAME));
+    request.setFileSize(fileSizeExpressions);
+    request.setModificationTime(modificationTimeExpressions);
+    request.setNodeId(TEST_NODE);
+    request.setUser("TEST");
+
+    List<ContainerLogFileInfo> allFile = collectLogFiles(request);
+    assertEquals(1, allFile.size());
+  }
+
+  @Test
+  public void testSingleNodeRequest() throws IOException {
+    ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder request =
+        newUnfilteredRequest();
+    request.setNodeId(TEST_NODE);
+
+    List<ContainerLogFileInfo> allFile = collectLogFiles(request);
+    // Check the total as well: counting only the matching files would not
+    // notice files leaking in from TEST_NODE_2.
+    assertEquals(4, allFile.size());
+    assertTrue(allFile.stream().allMatch(
+        f -> f.getFileName().contains(TEST_NODE)));
+  }
+
+  @Test
+  public void testMultipleNodeRegexRequest() throws IOException {
+    ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder request =
+        newUnfilteredRequest();
+    request.setNodeId("TEST_NODE_.*");
+
+    List<ContainerLogFileInfo> allFile = collectLogFiles(request);
+    assertEquals(8, allFile.size());
+  }
+
+  @Test
+  public void testMultipleFileRegex() throws IOException {
+    ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder request =
+        newUnfilteredRequest();
+    request.setFileName(String.format("%s.*", BIG_FILE_NAME));
+
+    List<ContainerLogFileInfo> allFile = collectLogFiles(request);
+    assertEquals(4, allFile.size());
+    assertTrue(allFile.stream().allMatch(
+        f -> f.getFileName().contains(BIG_FILE_NAME)));
+  }
+
+  @Test
+  public void testContainerIdExactMatch() throws IOException {
+    ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder request =
+        newUnfilteredRequest();
+    request.setContainerId(attemptContainer.toString());
+
+    List<ContainerLogFileInfo> allFile = collectLogFiles(request);
+    assertEquals(2, allFile.size());
+    assertTrue(allFile.stream().allMatch(
+        f -> f.getFileName().contains(attemptContainer.toString())));
+  }
+
+  @Test
+  public void testMultipleFileBetweenSize() throws IOException {
+    Set<String> fileSizeExpressions = new HashSet<>();
+    fileSizeExpressions.add(">50");
+    fileSizeExpressions.add("<101");
+    ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder request =
+        newUnfilteredRequest();
+    request.setFileSize(fileSizeExpressions);
+
+    List<ContainerLogFileInfo> allFile = collectLogFiles(request);
+    assertEquals(4, allFile.size());
+    assertTrue(allFile.stream().allMatch(
+        f -> f.getFileSize().equals("100")));
+  }
+
+  @Test
+  public void testInvalidQueryStrings() throws IOException {
+    ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder request =
+        new ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder();
+    Set<String> fileSizeExpressions = new HashSet<>();
+    fileSizeExpressions.add("50");
+    fileSizeExpressions.add("101");
+    try {
+      request.setFileName("*");
+      fail("An error should be thrown due to an invalid regex");
+    } catch (IllegalArgumentException ignored) {
+      // expected: "*" is not a valid regular expression
+    }
+
+    try {
+      request.setFileSize(fileSizeExpressions);
+      fail("An error should be thrown due to multiple exact match expression");
+    } catch (IllegalArgumentException ignored) {
+      // expected: two exact-match size expressions cannot both hold
+    }
+  }
+
+  /**
+   * Creates a builder with every filter explicitly set to {@code null}.
+   */
+  private static ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder
+      newUnfilteredRequest() {
+    ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder request =
+        new ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder();
+    request.setAppId(null);
+    request.setContainerId(null);
+    request.setFileName(null);
+    request.setFileSize(null);
+    request.setModificationTime(null);
+    request.setNodeId(null);
+    request.setUser(null);
+    return request;
+  }
+
+  /**
+   * Runs a collector built from {@code request} against the fake controller
+   * and flattens the per-container results into a single list of files.
+   */
+  private List<ContainerLogFileInfo> collectLogFiles(
+      ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder request)
+      throws IOException {
+    LogAggregationMetaCollector collector = new LogAggregationMetaCollector(
+        request.build(), new YarnConfiguration());
+    List<ContainerLogMeta> res = collector.collect(fileController);
+    return res.stream()
+        .flatMap(m -> m.getContainerLogMeta().stream())
+        .collect(Collectors.toList());
+  }
+
+  private FakeNodeFileController createFileController() {
+    FileStatus appDir = new FileStatus();
+    appDir.setPath(new Path(String.format("test/%s", app.toString())));
+    FileStatus appDir2 = new FileStatus();
+    appDir2.setPath(new Path(String.format("test/%s", app2.toString())));
+    List<FileStatus> appDirs = new ArrayList<>();
+    appDirs.add(appDir);
+    appDirs.add(appDir2);
+
+    FileStatus nodeFile = new FileStatus();
+    nodeFile.setPath(new Path(String.format("test/%s", TEST_NODE)));
+    FileStatus nodeFile2 = new FileStatus();
+    nodeFile2.setPath(new Path(String.format("test/%s", TEST_NODE_2)));
+    List<FileStatus> nodeFiles = new ArrayList<>();
+    nodeFiles.add(nodeFile);
+    nodeFiles.add(nodeFile2);
+
+    Map<ImmutablePair<String, String>, Map<String,
+        List<ContainerLogFileInfo>>> internal = new HashMap<>();
+    internal.put(new ImmutablePair<>(app.toString(), TEST_NODE),
+        createLogFiles(TEST_NODE, attemptContainer));
+    internal.put(new ImmutablePair<>(app.toString(), TEST_NODE_2),
+        createLogFiles(TEST_NODE_2, attemptContainer2));
+    internal.put(new ImmutablePair<>(app2.toString(), TEST_NODE),
+        createLogFiles(TEST_NODE, attempt2Container));
+    internal.put(new ImmutablePair<>(app2.toString(), TEST_NODE_2),
+        createLogFiles(TEST_NODE_2, attempt2Container2));
+    return new FakeNodeFileController(internal, appDirs, nodeFiles);
+  }
+
+  /**
+   * Builds, for each container, a 100-byte "big" file modified at 1000 and
+   * a 50-byte "small" file modified at 100.
+   */
+  private Map<String, List<ContainerLogFileInfo>> createLogFiles(
+      String nodeId, ContainerId... containerIds) {
+    Map<String, List<ContainerLogFileInfo>> logFiles = new HashMap<>();
+    for (ContainerId c : containerIds) {
+      List<ContainerLogFileInfo> files = new ArrayList<>();
+      ContainerLogFileInfo bigFile = new ContainerLogFileInfo();
+      bigFile.setFileName(generateFileName(
+          BIG_FILE_NAME, nodeId, c.toString()));
+      bigFile.setFileSize("100");
+      bigFile.setLastModifiedTime("1000");
+      ContainerLogFileInfo smallFile = new ContainerLogFileInfo();
+      smallFile.setFileName(generateFileName(
+          SMALL_FILE_NAME, nodeId, c.toString()));
+      smallFile.setFileSize("50");
+      smallFile.setLastModifiedTime("100");
+      files.add(bigFile);
+      files.add(smallFile);
+
+      logFiles.put(c.toString(), files);
+    }
+    return logFiles;
+  }
+
+  private String generateFileName(
+      String name, String nodeId, String containerId) {
+    return String.format("%s_%s_%s", name, nodeId, containerId);
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/FakeLogAggregationFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/FakeLogAggregationFileController.java
new file mode 100644
index 0000000..c667d3b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/FakeLogAggregationFileController.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation.filecontroller;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
+import org.apache.hadoop.yarn.webapp.View;
+import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Do-nothing {@link LogAggregationFileController} for tests.
+ *
+ * <p>Writer hooks are no-ops; reader, owner and ACL lookups return
+ * {@code false} or {@code null}. Tests extend it and override only the
+ * methods they need.
+ */
+public class FakeLogAggregationFileController
+    extends LogAggregationFileController {
+
+  @Override
+  protected void initInternal(Configuration conf) {
+    // intentionally empty: no configuration is needed
+  }
+
+  @Override
+  public void initializeWriter(LogAggregationFileControllerContext context)
+      throws IOException {
+    // intentionally empty: nothing is ever written
+  }
+
+  @Override
+  public void closeWriter() throws LogAggregationDFSException {
+    // intentionally empty: no writer is ever opened
+  }
+
+  @Override
+  public void write(AggregatedLogFormat.LogKey logKey,
+      AggregatedLogFormat.LogValue logValue) throws IOException {
+    // intentionally empty: log values are discarded
+  }
+
+  @Override
+  public void postWrite(LogAggregationFileControllerContext record)
+      throws Exception {
+    // intentionally empty: there is no post-processing
+  }
+
+  /** Always reports that no logs were read. */
+  @Override
+  public boolean readAggregatedLogs(ContainerLogsRequest logRequest,
+      OutputStream os) throws IOException {
+    return false;
+  }
+
+  /** Always returns {@code null}. */
+  @Override
+  public List<ContainerLogMeta> readAggregatedLogsMeta(
+      ContainerLogsRequest logRequest) throws IOException {
+    return null;
+  }
+
+  @Override
+  public void renderAggregatedLogsBlock(HtmlBlock.Block html,
+      View.ViewContext context) {
+    // intentionally empty: nothing is rendered
+  }
+
+  /** Always returns {@code null}. */
+  @Override
+  public String getApplicationOwner(Path aggregatedLogPath,
+      ApplicationId appId) throws IOException {
+    return null;
+  }
+
+  /** Always returns {@code null}. */
+  @Override
+  public Map<ApplicationAccessType, String> getApplicationAcls(
+      Path aggregatedLogPath, ApplicationId appId) throws IOException {
+    return null;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexedFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexedFileController.java
index 7335181..2da413d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexedFileController.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexedFileController.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
+import org.apache.hadoop.yarn.logaggregation.ExtendedLogMetaRequest;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
@@ -497,4 +498,109 @@ public class TestLogAggregationIndexedFileController
fileFormat.initialize(conf, fileControllerName);
assertThat(fileFormat.getRollOverLogMaxSize(conf)).isZero();
}
+
+ @Test
+  public void testGetLogMetaFilesOfNode() throws Exception {
+    if (fs.exists(rootLocalLogDirPath)) {
+      fs.delete(rootLocalLogDirPath, true);
+    }
+    assertTrue(fs.mkdirs(rootLocalLogDirPath));
+
+    Path appLogsDir = new Path(rootLocalLogDirPath, appId.toString());
+    if (fs.exists(appLogsDir)) {
+      fs.delete(appLogsDir, true);
+    }
+    assertTrue(fs.mkdirs(appLogsDir));
+
+    List<String> logTypes = new ArrayList<>();
+    logTypes.add("syslog");
+    logTypes.add("stdout");
+    logTypes.add("stderr");
+
+    // Three regular log files plus the file from createZeroLocalLogFile,
+    // i.e. four files for the single container (asserted at the end).
+    Set<File> files = new HashSet<>();
+    for (String logType : logTypes) {
+      files.add(createAndWriteLocalLogFile(containerId, appLogsDir, logType));
+    }
+    files.add(createZeroLocalLogFile(appLogsDir));
+
+    LogKey key1 = new LogKey(containerId.toString());
+    LogValue value = mock(LogValue.class);
+    when(value.getPendingLogFilesToUploadForThisContainer()).thenReturn(files);
+
+    LogAggregationIndexedFileController fileFormat =
+        new LogAggregationIndexedFileController();
+    fileFormat.initialize(getConf(), "Indexed");
+
+    Map<ApplicationAccessType, String> appAcls = new HashMap<>();
+    Path appDir = fileFormat.getRemoteAppLogDir(appId,
+        USER_UGI.getShortUserName());
+    if (fs.exists(appDir)) {
+      fs.delete(appDir, true);
+    }
+    assertTrue(fs.mkdirs(appDir));
+
+    Path logPath = fileFormat.getRemoteNodeLogFileForApp(
+        appId, USER_UGI.getShortUserName(), nodeId);
+    LogAggregationFileControllerContext context =
+        new LogAggregationFileControllerContext(
+            logPath, logPath, true, 1000, appId, appAcls, nodeId, USER_UGI);
+    // Write the container's logs into the remote node file.
+    fileFormat.initializeWriter(context);
+    fileFormat.write(key1, value);
+    fileFormat.postWrite(context);
+    fileFormat.closeWriter();
+
+    // Create a checksum file next to the node file. Layout: int length,
+    // UTF-8 node name bytes, long index.
+    final ControlledClock clock = new ControlledClock();
+    clock.setTime(System.currentTimeMillis());
+    Path checksumFile = new Path(fileFormat.getRemoteAppLogDir(
+        appId, USER_UGI.getShortUserName()),
+        LogAggregationUtils.getNodeString(nodeId)
+            + LogAggregationIndexedFileController.CHECK_SUM_FILE_SUFFIX);
+    String nodeName = logPath.getName() + "_" + clock.getTime();
+    byte[] nodeNameBytes = nodeName.getBytes(Charset.forName("UTF-8"));
+    try (FSDataOutputStream checksumOut =
+        FileSystem.create(fs, checksumFile, LOG_FILE_UMASK)) {
+      // The length prefix must count encoded bytes, not chars, so that it
+      // matches exactly what follows in the stream.
+      checksumOut.writeInt(nodeNameBytes.length);
+      checksumOut.write(nodeNameBytes);
+      checksumOut.writeLong(0);
+    }
+
+    Path nodePath = LogAggregationUtils.getRemoteAppLogDir(
+        fileFormat.getRemoteRootLogDir(), appId, USER_UGI.getShortUserName(),
+        fileFormat.getRemoteRootLogDirSuffix());
+    FileStatus[] nodes = fs.listStatus(nodePath);
+    ExtendedLogMetaRequest req =
+        new ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder().build();
+    for (FileStatus node : nodes) {
+      Map<String, List<ContainerLogFileInfo>> metas =
+          fileFormat.getLogMetaFilesOfNode(req, node, appId);
+
+      if (node.getPath().getName().contains(
+          LogAggregationIndexedFileController.CHECK_SUM_FILE_SUFFIX)) {
+        assertTrue("Checksum node files should not contain any logs",
+            metas.isEmpty());
+      } else {
+        assertFalse("Non-checksum node files should contain log files",
+            metas.isEmpty());
+        assertEquals(4, metas.values().stream().findFirst().get().size());
+      }
+    }
+  }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogServlet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogServlet.java
index fb8ad60..c61391b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogServlet.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogServlet.java
@@ -31,8 +31,10 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
-import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.logaggregation.LogAggregationMetaCollector;
+import org.apache.hadoop.yarn.logaggregation.ExtendedLogMetaRequest;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
+import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;
import org.apache.hadoop.yarn.server.webapp.dao.RemoteLogPathEntry;
@@ -264,6 +266,36 @@ public class LogServlet extends Configured {
redirectedFromNode, null, manualRedirection);
}
+  /**
+   * Returns metadata of the aggregated log files matching an extended
+   * request, collected from every configured log aggregation file
+   * controller.
+   *
+   * <p>If the request names no user, it defaults to the short user name of
+   * the current {@link UserGroupInformation}. Remote application log
+   * directories are resolved by short user name.
+   *
+   * @param req the HTTP request (currently not consulted)
+   * @param logsRequest builder holding the query filters
+   * @return a 200 response whose entity is the list of container log infos
+   * @throws IOException if the current user cannot be determined or a file
+   *     controller fails while collecting metadata
+   */
+  public Response getContainerLogsInfo(
+      HttpServletRequest req,
+      ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder logsRequest)
+      throws IOException {
+    if (!logsRequest.isUserSet()) {
+      // getUserName() may return a full principal (e.g. including a Kerberos
+      // realm), which would never match an owner's log directory.
+      logsRequest.setUser(
+          UserGroupInformation.getCurrentUser().getShortUserName());
+    }
+    LogAggregationMetaCollector collector = new LogAggregationMetaCollector(
+        logsRequest.build(), getConf());
+
+    List<ContainerLogMeta> logs = new ArrayList<>();
+    for (LogAggregationFileController fc : getOrCreateFactory()
+        .getConfiguredLogAggregationFileControllerList()) {
+      logs.addAll(collector.collect(fc));
+    }
+
+    List<ContainerLogsInfo> containersLogsInfo = convertToContainerLogsInfo(
+        logs, false);
+    GenericEntity<List<ContainerLogsInfo>> meta =
+        new GenericEntity<List<ContainerLogsInfo>>(containersLogsInfo) {
+        };
+    Response.ResponseBuilder response = Response.ok(meta);
+    // Sending the X-Content-Type-Options response header with the value
+    // nosniff will prevent Internet Explorer from MIME-sniffing a response
+    // away from the declared content-type.
+    response.header("X-Content-Type-Options", "nosniff");
+    return response.build();
+  }
+
/**
* Returns information about the logs for a specific container.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/YarnWebServiceParams.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/YarnWebServiceParams.java
index 3aade3f..84697a3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/YarnWebServiceParams.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/YarnWebServiceParams.java
@@ -40,4 +40,6 @@ public interface YarnWebServiceParams {
String CLUSTER_ID = "clusterid";
String MANUAL_REDIRECTION = "manual_redirection";
String REMOTE_USER = "user";
+  /**
+   * Web service parameter name for the log file size filter.
+   * NOTE(review): presumably fed into ExtendedLogMetaRequest file size
+   * expressions such as "&lt;51" or "&gt;50"; confirm in the web service.
+   */
+  String FILESIZE = "file_size";
+  /**
+   * Web service parameter name for the log file modification time filter.
+   * NOTE(review): presumably fed into ExtendedLogMetaRequest modification
+   * time expressions such as "&lt;1000"; confirm in the web service.
+   */
+  String MODIFICATION_TIME = "modification_time";
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org