Posted to common-commits@hadoop.apache.org by sn...@apache.org on 2020/01/20 11:37:17 UTC
[hadoop] branch trunk updated: YARN-9525. IFile format is not working against s3a remote folder. Contributed by Adam Antal
This is an automated email from the ASF dual-hosted git repository.
snemeth pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6d52bbb YARN-9525. IFile format is not working against s3a remote folder. Contributed by Adam Antal
6d52bbb is described below
commit 6d52bbbfcfd7750b7e547abdcd0d14632d6ed9b6
Author: Szilard Nemeth <sn...@apache.org>
AuthorDate: Mon Jan 20 12:36:55 2020 +0100
YARN-9525. IFile format is not working against s3a remote folder. Contributed by Adam Antal
---
.../ifile/LogAggregationIndexedFileController.java | 78 ++++++++++++++++------
1 file changed, 58 insertions(+), 20 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java
index 7af58d7..605997f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java
@@ -44,6 +44,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.commons.lang3.SerializationUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
@@ -110,6 +111,7 @@ public class LogAggregationIndexedFileController
"indexedFile.fs.retry-interval-ms";
private static final String LOG_ROLL_OVER_MAX_FILE_SIZE_GB =
"indexedFile.log.roll-over.max-file-size-gb";
+ private static final int LOG_ROLL_OVER_MAX_FILE_SIZE_GB_DEFAULT = 10;
@VisibleForTesting
public static final String CHECK_SUM_FILE_SUFFIX = "-checksum";
@@ -182,9 +184,16 @@ public class LogAggregationIndexedFileController
indexedLogsMeta.setCompressName(compressName);
}
Path aggregatedLogFile = null;
+ Pair<Path, Boolean> initializationResult = null;
+ boolean createdNew;
+
if (context.isLogAggregationInRolling()) {
- aggregatedLogFile = initializeWriterInRolling(
+ // In rolling log aggregation we need special initialization
+ // done in initializeWriterInRolling.
+ initializationResult = initializeWriterInRolling(
remoteLogFile, appId, nodeId);
+ aggregatedLogFile = initializationResult.getLeft();
+ createdNew = initializationResult.getRight();
} else {
aggregatedLogFile = remoteLogFile;
fsDataOStream = fc.create(remoteLogFile,
@@ -195,22 +204,30 @@ public class LogAggregationIndexedFileController
}
fsDataOStream.write(uuid);
fsDataOStream.flush();
+ createdNew = true;
}
- long aggregatedLogFileLength = fc.getFileStatus(
- aggregatedLogFile).getLen();
- // append a simple character("\n") to move the writer cursor, so
- // we could get the correct position when we call
- // fsOutputStream.getStartPos()
- final byte[] dummyBytes = "\n".getBytes(Charset.forName("UTF-8"));
- fsDataOStream.write(dummyBytes);
- fsDataOStream.flush();
-
- if (fsDataOStream.getPos() >= (aggregatedLogFileLength
- + dummyBytes.length)) {
+ // If we have created a new file, we know that the offset is zero.
+ // Otherwise we should get this information through getFileStatus.
+ if (createdNew) {
currentOffSet = 0;
} else {
- currentOffSet = aggregatedLogFileLength;
+ long aggregatedLogFileLength = fc.getFileStatus(
+ aggregatedLogFile).getLen();
+ // append a simple character("\n") to move the writer cursor, so
+ // we could get the correct position when we call
+ // fsOutputStream.getStartPos()
+ final byte[] dummyBytes = "\n".getBytes(Charset.forName("UTF-8"));
+ fsDataOStream.write(dummyBytes);
+ fsDataOStream.flush();
+
+ if (fsDataOStream.getPos() < (aggregatedLogFileLength
+ + dummyBytes.length)) {
+ currentOffSet = fc.getFileStatus(
+ aggregatedLogFile).getLen();
+ } else {
+ currentOffSet = 0;
+ }
}
return null;
}
@@ -220,8 +237,22 @@ public class LogAggregationIndexedFileController
}
}
- private Path initializeWriterInRolling(final Path remoteLogFile,
- final ApplicationId appId, final String nodeId) throws Exception {
+ /**
+ * Initializes the write for the log aggregation controller in the
+ * rolling case. It sets up / modifies checksum and meta files if needed.
+ *
+ * @param remoteLogFile the Path of the remote log file
+ * @param appId the application id
+ * @param nodeId the node id
+ * @return a Pair of Path and Boolean - the Path is path of the
+ * aggregated log file, while the Boolean is whether a new
+ * file was created or not
+ * @throws Exception
+ */
+ private Pair<Path, Boolean> initializeWriterInRolling(
+ final Path remoteLogFile, final ApplicationId appId,
+ final String nodeId) throws Exception {
+ boolean createdNew = false;
Path aggregatedLogFile = null;
// check uuid
// if we can not find uuid, we would load the uuid
@@ -281,6 +312,7 @@ public class LogAggregationIndexedFileController
// writes the uuid
fsDataOStream.write(uuid);
fsDataOStream.flush();
+ createdNew = true;
} else {
aggregatedLogFile = currentRemoteLogFile;
fsDataOStream = fc.create(currentRemoteLogFile,
@@ -289,8 +321,13 @@ public class LogAggregationIndexedFileController
}
// recreate checksum file if needed before aggregate the logs
if (overwriteCheckSum) {
- final long currentAggregatedLogFileLength = fc
- .getFileStatus(aggregatedLogFile).getLen();
+ long currentAggregatedLogFileLength;
+ if (createdNew) {
+ currentAggregatedLogFileLength = 0;
+ } else {
+ currentAggregatedLogFileLength = fc
+ .getFileStatus(aggregatedLogFile).getLen();
+ }
FSDataOutputStream checksumFileOutputStream = null;
try {
checksumFileOutputStream = fc.create(remoteLogCheckSumFile,
@@ -307,7 +344,8 @@ public class LogAggregationIndexedFileController
IOUtils.cleanupWithLogger(LOG, checksumFileOutputStream);
}
}
- return aggregatedLogFile;
+
+ return Pair.of(aggregatedLogFile, createdNew);
}
@Override
@@ -572,7 +610,6 @@ public class LogAggregationIndexedFileController
return findLogs;
}
- // TODO: fix me if the remote file system does not support append operation.
@Override
public List<ContainerLogMeta> readAggregatedLogsMeta(
ContainerLogsRequest logRequest) throws IOException {
@@ -1144,7 +1181,8 @@ public class LogAggregationIndexedFileController
}
if (supportAppend) {
return 1024L * 1024 * 1024 * conf.getInt(
- LOG_ROLL_OVER_MAX_FILE_SIZE_GB, 10);
+ LOG_ROLL_OVER_MAX_FILE_SIZE_GB,
+ LOG_ROLL_OVER_MAX_FILE_SIZE_GB_DEFAULT);
} else {
return 0L;
}
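For context, the central hunk above changes how the starting offset is determined: when the aggregated log file was created in this very session, the offset is known to be zero and getFileStatus() is not called at all, which is the call that was unreliable against a just-created file on s3a. A minimal sketch of that pattern, separate from the controller itself (class and method names below are illustrative, not part of the patch):

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;

final class StartOffsetSketch {

  // Returns the offset at which new log data should start. A file that
  // was created in this session has a known offset of zero, so the remote
  // metadata lookup (which may not see an unfinished object-store file)
  // is skipped entirely.
  static long startOffset(FileContext fc, Path aggregatedLogFile,
      FSDataOutputStream out, boolean createdNew) throws IOException {
    if (createdNew) {
      return 0L;
    }
    long existingLength = fc.getFileStatus(aggregatedLogFile).getLen();
    // Append a single "\n" to move the writer cursor so that getPos()
    // reflects a real position in the appended file.
    byte[] dummyBytes = "\n".getBytes(StandardCharsets.UTF_8);
    out.write(dummyBytes);
    out.flush();
    if (out.getPos() < existingLength + dummyBytes.length) {
      return fc.getFileStatus(aggregatedLogFile).getLen();
    }
    return 0L;
  }
}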
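The helper initializeWriterInRolling now reports two things back to its caller, which is why org.apache.commons.lang3.tuple.Pair is imported: the path of the aggregated log file and whether that file had to be created. A small usage sketch of the same idiom (the names and the s3a path are illustrative only):

import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.Path;

final class PairReturnSketch {

  // Bundles the aggregated log file path with a flag telling the caller
  // whether the file was created from scratch in this call.
  static Pair<Path, Boolean> initialize(Path remoteLogFile,
      boolean remoteFileExists) {
    Path aggregatedLogFile = remoteLogFile;
    boolean createdNew = !remoteFileExists;
    return Pair.of(aggregatedLogFile, createdNew);
  }

  public static void main(String[] args) {
    Pair<Path, Boolean> result =
        initialize(new Path("s3a://bucket/app/logs"), false);
    Path aggregatedLogFile = result.getLeft();   // where the logs go
    boolean createdNew = result.getRight();      // true means offset is zero
    System.out.println(aggregatedLogFile + " createdNew=" + createdNew);
  }
}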
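The final hunk only moves the previously inline default of 10 into the named constant LOG_ROLL_OVER_MAX_FILE_SIZE_GB_DEFAULT; the byte calculation itself is unchanged. A quick standalone check of that arithmetic (the Configuration setup here is illustrative):

import org.apache.hadoop.conf.Configuration;

final class RollOverSizeSketch {

  private static final String LOG_ROLL_OVER_MAX_FILE_SIZE_GB =
      "indexedFile.log.roll-over.max-file-size-gb";
  private static final int LOG_ROLL_OVER_MAX_FILE_SIZE_GB_DEFAULT = 10;

  // Converts the configured gigabyte limit into bytes. With the default
  // of 10 GB this is 10 * 1024^3 = 10737418240 bytes.
  static long rollOverMaxBytes(Configuration conf, boolean supportAppend) {
    if (!supportAppend) {
      return 0L;
    }
    return 1024L * 1024 * 1024 * conf.getInt(
        LOG_ROLL_OVER_MAX_FILE_SIZE_GB,
        LOG_ROLL_OVER_MAX_FILE_SIZE_GB_DEFAULT);
  }

  public static void main(String[] args) {
    System.out.println(rollOverMaxBytes(new Configuration(), true));
  }
}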