Posted to common-commits@hadoop.apache.org by st...@apache.org on 2022/04/27 18:25:01 UTC

[hadoop] 03/05: HADOOP-16202. Enhanced openFile(): mapreduce and YARN changes. (#2584/2)

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit e123de9f1903203519835870c1600ab0aa570db9
Author: Steve Loughran <st...@cloudera.com>
AuthorDate: Sun Apr 24 17:10:34 2022 +0100

    HADOOP-16202. Enhanced openFile(): mapreduce and YARN changes. (#2584/2)
    
    These changes ensure that sequential files are opened with the
    right read policy, and that the split start/end is passed in.
    
    As well as offering opportunities for filesystem clients to
    choose fetch/cache/seek policies, the settings ensure that
    text files on an S3 bucket where the default policy is
    "random" are still processed efficiently.
    
    This commit depends on the associated hadoop-common patch,
    which must be committed first.
    
    Contributed by Steve Loughran.
    
    Change-Id: Ic6713fd752441cf42ebe8739d05c2293a5db9f94
---
 .../jobhistory/JobHistoryCopyService.java          | 10 ++++++++-
 .../org/apache/hadoop/mapred/LineRecordReader.java | 13 +++++++++---
 .../lib/input/FixedLengthRecordReader.java         |  7 ++++---
 .../mapreduce/lib/input/LineRecordReader.java      | 14 ++++++++++---
 .../mapreduce/lib/input/NLineInputFormat.java      |  6 +++---
 .../hadoop/examples/terasort/TeraInputFormat.java  | 16 +++++++++++++--
 .../tools/mapred/RetriableFileCopyCommand.java     | 12 +++++++++--
 .../streaming/mapreduce/StreamInputFormat.java     |  6 +++---
 .../yarn/logaggregation/AggregatedLogFormat.java   | 17 +++++++++++++--
 .../org/apache/hadoop/yarn/util/FSDownload.java    | 24 +++++++++++++++-------
 10 files changed, 96 insertions(+), 29 deletions(-)
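
For readers skimming the patch: the pattern applied throughout the hunks
below is the openFile() builder from the associated hadoop-common change.
A minimal sketch of that pattern in one place follows; the class and method
names are illustrative, while the builder calls and the option constants are
the ones used in the diff itself.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.functional.FutureIO;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START;

public final class OpenFileSketch {

  /**
   * Open one split of a file for sequential processing.
   * opt() values are hints: a filesystem which does not understand an
   * option simply ignores it, so the call stays portable.
   */
  public static FSDataInputStream openSplit(Configuration conf, Path file,
      long splitStart, long splitEnd) throws IOException {
    FileSystem fs = file.getFileSystem(conf);
    return FutureIO.awaitFuture(
        fs.openFile(file)
            .opt(FS_OPTION_OPENFILE_READ_POLICY,
                FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
            .opt(FS_OPTION_OPENFILE_SPLIT_START, splitStart)
            .opt(FS_OPTION_OPENFILE_SPLIT_END, splitEnd)
            .build());
  }

  private OpenFileSketch() {
  }
}

The split start/end give the store a hint of the byte range which will
actually be read, while the explicit read policy overrides a per-bucket
default such as "random" for these sequential readers.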

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryCopyService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryCopyService.java
index ee4ec2c86a1..ecae4f2fc06 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryCopyService.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryCopyService.java
@@ -35,6 +35,10 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
+import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
+
 /**
  * Reads in history events from the JobHistoryFile and sends them out again
  * to be recorded.
@@ -118,7 +122,11 @@ public class JobHistoryCopyService extends CompositeService implements HistoryEv
         fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(histDirPath,
           jobId, (applicationAttemptId.getAttemptId() - 1)));
     LOG.info("History file is at " + historyFile);
-    in = fc.open(historyFile);
+    in = awaitFuture(
+        fc.openFile(historyFile)
+            .opt(FS_OPTION_OPENFILE_READ_POLICY,
+                FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
+            .build());
     return in;
   }
   
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
index 1fcb118a100..5724e729310 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Seekable;
-import org.apache.hadoop.fs.impl.FutureIOSupport;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CodecPool;
@@ -41,9 +40,13 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader;
 import org.apache.hadoop.mapreduce.lib.input.SplitLineReader;
 import org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader;
+import org.apache.hadoop.util.functional.FutureIO;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START;
+
 /**
  * Treats keys as offset in file and value as line. 
  */
@@ -109,10 +112,14 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
     // open the file and seek to the start of the split
     final FutureDataInputStreamBuilder builder =
         file.getFileSystem(job).openFile(file);
-    FutureIOSupport.propagateOptions(builder, job,
+    // the start and end of the split may be used to build
+    // an input strategy.
+    builder.opt(FS_OPTION_OPENFILE_SPLIT_START, start)
+        .opt(FS_OPTION_OPENFILE_SPLIT_END, end);
+    FutureIO.propagateOptions(builder, job,
         MRJobConfig.INPUT_FILE_OPTION_PREFIX,
         MRJobConfig.INPUT_FILE_MANDATORY_PREFIX);
-    fileIn = FutureIOSupport.awaitFuture(builder.build());
+    fileIn = FutureIO.awaitFuture(builder.build());
     if (isCompressedInput()) {
       decompressor = CodecPool.getDecompressor(codec);
       if (codec instanceof SplittableCompressionCodec) {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthRecordReader.java
index c0ae9a5cdac..6969f61836f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthRecordReader.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Seekable;
-import org.apache.hadoop.fs.impl.FutureIOSupport;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.compress.CodecPool;
@@ -40,6 +39,8 @@ import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.functional.FutureIO;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -94,10 +95,10 @@ public class FixedLengthRecordReader
     // open the file
     final FutureDataInputStreamBuilder builder =
         file.getFileSystem(job).openFile(file);
-    FutureIOSupport.propagateOptions(builder, job,
+    FutureIO.propagateOptions(builder, job,
         MRJobConfig.INPUT_FILE_OPTION_PREFIX,
         MRJobConfig.INPUT_FILE_MANDATORY_PREFIX);
-    fileIn = FutureIOSupport.awaitFuture(builder.build());
+    fileIn = FutureIO.awaitFuture(builder.build());
 
     CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
     if (null != codec) {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
index 160c7635658..617abaacae0 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Seekable;
-import org.apache.hadoop.fs.impl.FutureIOSupport;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CodecPool;
@@ -40,9 +39,14 @@ import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.functional.FutureIO;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START;
+
 /**
  * Treats keys as offset in file and value as line. 
  */
@@ -86,10 +90,14 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
     // open the file and seek to the start of the split
     final FutureDataInputStreamBuilder builder =
         file.getFileSystem(job).openFile(file);
-    FutureIOSupport.propagateOptions(builder, job,
+    // the start and end of the split may be used to build
+    // an input strategy.
+    builder.opt(FS_OPTION_OPENFILE_SPLIT_START, start);
+    builder.opt(FS_OPTION_OPENFILE_SPLIT_END, end);
+    FutureIO.propagateOptions(builder, job,
         MRJobConfig.INPUT_FILE_OPTION_PREFIX,
         MRJobConfig.INPUT_FILE_MANDATORY_PREFIX);
-    fileIn = FutureIOSupport.awaitFuture(builder.build());
+    fileIn = FutureIO.awaitFuture(builder.build());
     
     CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
     if (null!=codec) {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java
index dfff9ad0d2b..5161a96c345 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.impl.FutureIOSupport;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -39,6 +38,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.util.LineReader;
+import org.apache.hadoop.util.functional.FutureIO;
 
 /**
  * NLineInputFormat which splits N lines of input as one split.
@@ -99,10 +99,10 @@ public class NLineInputFormat extends FileInputFormat<LongWritable, Text> {
     try {
       final FutureDataInputStreamBuilder builder =
           fileName.getFileSystem(conf).openFile(fileName);
-      FutureIOSupport.propagateOptions(builder, conf,
+      FutureIO.propagateOptions(builder, conf,
           MRJobConfig.INPUT_FILE_OPTION_PREFIX,
           MRJobConfig.INPUT_FILE_MANDATORY_PREFIX);
-      FSDataInputStream in  = FutureIOSupport.awaitFuture(builder.build());
+      FSDataInputStream in  = FutureIO.awaitFuture(builder.build());
       lr = new LineReader(in, conf);
       Text line = new Text();
       int numLines = 0;
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraInputFormat.java
index 20ce8ef2b60..f284a9c3807 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraInputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraInputFormat.java
@@ -27,6 +27,7 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -41,6 +42,12 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.util.IndexedSortable;
 import org.apache.hadoop.util.QuickSort;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.functional.FutureIO;
+
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START;
 
 /**
  * An input format that reads the first 10 characters of each line as the key
@@ -224,12 +231,17 @@ public class TeraInputFormat extends FileInputFormat<Text,Text> {
         throws IOException, InterruptedException {
       Path p = ((FileSplit)split).getPath();
       FileSystem fs = p.getFileSystem(context.getConfiguration());
-      in = fs.open(p);
       long start = ((FileSplit)split).getStart();
       // find the offset to start at a record boundary
       offset = (RECORD_LENGTH - (start % RECORD_LENGTH)) % RECORD_LENGTH;
-      in.seek(start + offset);
       length = ((FileSplit)split).getLength();
+      final FutureDataInputStreamBuilder builder = fs.openFile(p)
+          .opt(FS_OPTION_OPENFILE_SPLIT_START, start)
+          .opt(FS_OPTION_OPENFILE_SPLIT_END, start + length)
+          .opt(FS_OPTION_OPENFILE_READ_POLICY,
+              FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL);
+      in = FutureIO.awaitFuture(builder.build());
+      in.seek(start + offset);
     }
 
     public void close() throws IOException {
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
index 6404e856612..d6825f75d8c 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
@@ -45,7 +45,11 @@ import org.apache.hadoop.tools.util.DistCpUtils;
 import org.apache.hadoop.tools.util.RetriableCommand;
 import org.apache.hadoop.tools.util.ThrottledInputStream;
 
-import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.classification.VisibleForTesting;
+
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
 
 /**
  * This class extends RetriableCommand to implement the copy of files,
@@ -328,7 +332,11 @@ public class RetriableFileCopyCommand extends RetriableCommand {
       FileSystem fs = path.getFileSystem(conf);
       float bandwidthMB = conf.getFloat(DistCpConstants.CONF_LABEL_BANDWIDTH_MB,
               DistCpConstants.DEFAULT_BANDWIDTH_MB);
-      FSDataInputStream in = fs.open(path);
+      // open with sequential read, but not whole-file
+      FSDataInputStream in = awaitFuture(fs.openFile(path)
+          .opt(FS_OPTION_OPENFILE_READ_POLICY,
+              FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
+          .build());
       return new ThrottledInputStream(in, bandwidthMB * 1024 * 1024);
     }
     catch (IOException e) {
diff --git a/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/mapreduce/StreamInputFormat.java b/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/mapreduce/StreamInputFormat.java
index 77f4e041d5f..f44488c7c02 100644
--- a/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/mapreduce/StreamInputFormat.java
+++ b/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/mapreduce/StreamInputFormat.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.impl.FutureIOSupport;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -35,6 +34,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
 import org.apache.hadoop.streaming.StreamUtil;
+import org.apache.hadoop.util.functional.FutureIO;
 
 /**
  * An input format that selects a RecordReader based on a JobConf property. This
@@ -66,10 +66,10 @@ public class StreamInputFormat extends KeyValueTextInputFormat {
     FileSystem fs = path.getFileSystem(conf);
     // open the file
     final FutureDataInputStreamBuilder builder = fs.openFile(path);
-    FutureIOSupport.propagateOptions(builder, conf,
+    FutureIO.propagateOptions(builder, conf,
         MRJobConfig.INPUT_FILE_OPTION_PREFIX,
         MRJobConfig.INPUT_FILE_MANDATORY_PREFIX);
-    FSDataInputStream in = FutureIOSupport.awaitFuture(builder.build());
+    FSDataInputStream in = FutureIO.awaitFuture(builder.build());
 
     // Factory dispatch based on available params..
     Class readerClass;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
index 0fa9764b7bb..0122b873aab 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -77,6 +78,11 @@ import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTest
 import org.apache.hadoop.thirdparty.com.google.common.collect.Iterables;
 import org.apache.hadoop.thirdparty.com.google.common.collect.Sets;
 
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
+
 @Public
 @Evolving
 public class AggregatedLogFormat {
@@ -576,9 +582,16 @@ public class AggregatedLogFormat {
       try {
         FileContext fileContext =
             FileContext.getFileContext(remoteAppLogFile.toUri(), conf);
-        this.fsDataIStream = fileContext.open(remoteAppLogFile);
+        FileStatus status = fileContext.getFileStatus(remoteAppLogFile);
+        this.fsDataIStream = awaitFuture(
+            fileContext.openFile(remoteAppLogFile)
+                .opt(FS_OPTION_OPENFILE_READ_POLICY,
+                    FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
+                .opt(FS_OPTION_OPENFILE_LENGTH,
+                    status.getLen())   // file length hint for object stores
+                .build());
         reader = new TFile.Reader(this.fsDataIStream,
-            fileContext.getFileStatus(remoteAppLogFile).getLen(), conf);
+            status.getLen(), conf);
         this.scanner = reader.createScanner();
       } catch (IOException ioe) {
         close();
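
(Aside on the AggregatedLogFormat hunk above: the FileStatus is fetched once
and its length passed as FS_OPTION_OPENFILE_LENGTH, which can save an object
store client a second length/existence probe at open time. A hedged sketch of
the same idea, reusing the imports added in the hunk; the variable and method
names are illustrative.)

  static FSDataInputStream openWithLengthHint(FileContext fc, Path logFile)
      throws IOException {
    // fetch the status once, then hand its length to the builder
    FileStatus status = fc.getFileStatus(logFile);
    return awaitFuture(
        fc.openFile(logFile)
            .opt(FS_OPTION_OPENFILE_READ_POLICY,
                FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
            .opt(FS_OPTION_OPENFILE_LENGTH, status.getLen())
            .build());
  }
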
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
index e5fb4175611..640cc82f539 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
@@ -60,7 +60,11 @@ import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 
-/**
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
+import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
+
+ /**
  * Download a single URL to the local disk.
  *
  */
@@ -285,23 +289,25 @@ public class FSDownload implements Callable<Path> {
       }
     }
 
-    downloadAndUnpack(sCopy, destination);
+    downloadAndUnpack(sCopy, sStat, destination);
   }
 
   /**
    * Copy source path to destination with localization rules.
-   * @param source source path to copy. Typically HDFS
+   * @param source source path to copy. Typically HDFS or an object store.
+   * @param sourceStatus status of source
    * @param destination destination path. Typically local filesystem
    * @exception YarnException Any error has occurred
    */
-  private void downloadAndUnpack(Path source, Path destination)
+  private void downloadAndUnpack(Path source,
+      FileStatus sourceStatus,  Path destination)
       throws YarnException {
     try {
       FileSystem sourceFileSystem = source.getFileSystem(conf);
       FileSystem destinationFileSystem = destination.getFileSystem(conf);
-      if (sourceFileSystem.getFileStatus(source).isDirectory()) {
+      if (sourceStatus.isDirectory()) {
         FileUtil.copy(
-            sourceFileSystem, source,
+            sourceFileSystem, sourceStatus,
             destinationFileSystem, destination, false,
             true, conf);
       } else {
@@ -329,7 +335,11 @@ public class FSDownload implements Callable<Path> {
                       FileSystem sourceFileSystem,
                       FileSystem destinationFileSystem)
       throws IOException, InterruptedException, ExecutionException {
-    try (InputStream inputStream = sourceFileSystem.open(source)) {
+    try (InputStream inputStream = awaitFuture(
+        sourceFileSystem.openFile(source)
+            .opt(FS_OPTION_OPENFILE_READ_POLICY,
+                FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
+            .build())) {
       File dst = new File(destination.toUri());
       String lowerDst = StringUtils.toLowerCase(dst.getName());
       switch (resource.getType()) {
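
One mechanism the record-reader hunks use but do not demonstrate end to end
is option propagation: FutureIO.propagateOptions() forwards job configuration
keys under MRJobConfig.INPUT_FILE_OPTION_PREFIX (optional) and
MRJobConfig.INPUT_FILE_MANDATORY_PREFIX (mandatory) to the openFile() builder.
A hedged sketch of how a job could use this follows; the option set here is
purely illustrative, and in practice it would be set at job submission time
rather than at open time.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.util.functional.FutureIO;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;

public final class PropagateOptionsSketch {

  public static FSDataInputStream open(Configuration job, Path file)
      throws IOException {
    // Request a read policy for the job's input files by prefixing the
    // openFile() option key; inlined here only to keep the sketch
    // self-contained.
    job.set(
        MRJobConfig.INPUT_FILE_OPTION_PREFIX + FS_OPTION_OPENFILE_READ_POLICY,
        FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL);

    FutureDataInputStreamBuilder builder =
        file.getFileSystem(job).openFile(file);
    // The record readers in this patch make the same call, so keys a client
    // sets under the two prefixes reach the filesystem.
    FutureIO.propagateOptions(builder, job,
        MRJobConfig.INPUT_FILE_OPTION_PREFIX,
        MRJobConfig.INPUT_FILE_MANDATORY_PREFIX);
    return FutureIO.awaitFuture(builder.build());
  }

  private PropagateOptionsSketch() {
  }
}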


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org