You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sn...@apache.org on 2020/01/20 11:37:17 UTC

[hadoop] branch trunk updated: YARN-9525. IFile format is not working against s3a remote folder. Contributed by Adam Antal

This is an automated email from the ASF dual-hosted git repository.

snemeth pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6d52bbb  YARN-9525. IFile format is not working against s3a remote folder. Contributed by Adam Antal
6d52bbb is described below

commit 6d52bbbfcfd7750b7e547abdcd0d14632d6ed9b6
Author: Szilard Nemeth <sn...@apache.org>
AuthorDate: Mon Jan 20 12:36:55 2020 +0100

    YARN-9525. IFile format is not working against s3a remote folder. Contributed by Adam Antal
---
 .../ifile/LogAggregationIndexedFileController.java | 78 ++++++++++++++++------
 1 file changed, 58 insertions(+), 20 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java
index 7af58d7..605997f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java
@@ -44,6 +44,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import org.apache.commons.lang3.SerializationUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
@@ -110,6 +111,7 @@ public class LogAggregationIndexedFileController
       "indexedFile.fs.retry-interval-ms";
   private static final String LOG_ROLL_OVER_MAX_FILE_SIZE_GB =
       "indexedFile.log.roll-over.max-file-size-gb";
+  private static final int LOG_ROLL_OVER_MAX_FILE_SIZE_GB_DEFAULT = 10;
 
   @VisibleForTesting
   public static final String CHECK_SUM_FILE_SUFFIX = "-checksum";
@@ -182,9 +184,16 @@ public class LogAggregationIndexedFileController
             indexedLogsMeta.setCompressName(compressName);
           }
           Path aggregatedLogFile = null;
+          Pair<Path, Boolean> initializationResult = null;
+          boolean createdNew;
+
           if (context.isLogAggregationInRolling()) {
-            aggregatedLogFile = initializeWriterInRolling(
+            // In rolling log aggregation we need special initialization
+            // done in initializeWriterInRolling.
+            initializationResult = initializeWriterInRolling(
                 remoteLogFile, appId, nodeId);
+            aggregatedLogFile = initializationResult.getLeft();
+            createdNew = initializationResult.getRight();
           } else {
             aggregatedLogFile = remoteLogFile;
             fsDataOStream = fc.create(remoteLogFile,
@@ -195,22 +204,30 @@ public class LogAggregationIndexedFileController
             }
             fsDataOStream.write(uuid);
             fsDataOStream.flush();
+            createdNew = true;
           }
 
-          long aggregatedLogFileLength = fc.getFileStatus(
-              aggregatedLogFile).getLen();
-          // append a simple character("\n") to move the writer cursor, so
-          // we could get the correct position when we call
-          // fsOutputStream.getStartPos()
-          final byte[] dummyBytes = "\n".getBytes(Charset.forName("UTF-8"));
-          fsDataOStream.write(dummyBytes);
-          fsDataOStream.flush();
-
-          if (fsDataOStream.getPos() >= (aggregatedLogFileLength
-              + dummyBytes.length)) {
+          // If we have created a new file, we know that the offset is zero.
+          // Otherwise we should get this information through getFileStatus.
+          if (createdNew) {
             currentOffSet = 0;
           } else {
-            currentOffSet = aggregatedLogFileLength;
+            long aggregatedLogFileLength = fc.getFileStatus(
+                aggregatedLogFile).getLen();
+            // append a simple character("\n") to move the writer cursor, so
+            // we could get the correct position when we call
+            // fsOutputStream.getStartPos()
+            final byte[] dummyBytes = "\n".getBytes(Charset.forName("UTF-8"));
+            fsDataOStream.write(dummyBytes);
+            fsDataOStream.flush();
+
+            if (fsDataOStream.getPos() < (aggregatedLogFileLength
+                + dummyBytes.length)) {
+              currentOffSet = fc.getFileStatus(
+                      aggregatedLogFile).getLen();
+            } else {
+              currentOffSet = 0;
+            }
           }
           return null;
         }
@@ -220,8 +237,22 @@ public class LogAggregationIndexedFileController
     }
   }
 
-  private Path initializeWriterInRolling(final Path remoteLogFile,
-      final ApplicationId appId, final String nodeId) throws Exception {
+  /**
+   * Initializes the write for the log aggregation controller in the
+   * rolling case. It sets up / modifies checksum and meta files if needed.
+   *
+   * @param remoteLogFile the Path of the remote log file
+   * @param appId the application id
+   * @param nodeId the node id
+   * @return a Pair of Path and Boolean - the Path is path of the
+   *         aggregated log file, while the Boolean is whether a new
+   *         file was created or not
+   * @throws Exception
+   */
+  private Pair<Path, Boolean> initializeWriterInRolling(
+      final Path remoteLogFile, final ApplicationId appId,
+      final String nodeId) throws Exception {
+    boolean createdNew = false;
     Path aggregatedLogFile = null;
     // check uuid
     // if we can not find uuid, we would load the uuid
@@ -281,6 +312,7 @@ public class LogAggregationIndexedFileController
       // writes the uuid
       fsDataOStream.write(uuid);
       fsDataOStream.flush();
+      createdNew = true;
     } else {
       aggregatedLogFile = currentRemoteLogFile;
       fsDataOStream = fc.create(currentRemoteLogFile,
@@ -289,8 +321,13 @@ public class LogAggregationIndexedFileController
     }
     // recreate checksum file if needed before aggregate the logs
     if (overwriteCheckSum) {
-      final long currentAggregatedLogFileLength = fc
-          .getFileStatus(aggregatedLogFile).getLen();
+      long currentAggregatedLogFileLength;
+      if (createdNew) {
+        currentAggregatedLogFileLength = 0;
+      } else {
+        currentAggregatedLogFileLength = fc
+            .getFileStatus(aggregatedLogFile).getLen();
+      }
       FSDataOutputStream checksumFileOutputStream = null;
       try {
         checksumFileOutputStream = fc.create(remoteLogCheckSumFile,
@@ -307,7 +344,8 @@ public class LogAggregationIndexedFileController
         IOUtils.cleanupWithLogger(LOG, checksumFileOutputStream);
       }
     }
-    return aggregatedLogFile;
+
+    return Pair.of(aggregatedLogFile, createdNew);
   }
 
   @Override
@@ -572,7 +610,6 @@ public class LogAggregationIndexedFileController
     return findLogs;
   }
 
-  // TODO: fix me if the remote file system does not support append operation.
   @Override
   public List<ContainerLogMeta> readAggregatedLogsMeta(
       ContainerLogsRequest logRequest) throws IOException {
@@ -1144,7 +1181,8 @@ public class LogAggregationIndexedFileController
     }
     if (supportAppend) {
       return 1024L * 1024 * 1024 * conf.getInt(
-          LOG_ROLL_OVER_MAX_FILE_SIZE_GB, 10);
+          LOG_ROLL_OVER_MAX_FILE_SIZE_GB,
+          LOG_ROLL_OVER_MAX_FILE_SIZE_GB_DEFAULT);
     } else {
       return 0L;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org