Posted to common-issues@hadoop.apache.org by GitBox <gi...@apache.org> on 2022/06/09 12:02:05 UTC

[GitHub] [hadoop] steveloughran commented on a diff in pull request #4386: HADOOP-18231. Fixes failing tests & drain stream async.

steveloughran commented on code in PR #4386:
URL: https://github.com/apache/hadoop/pull/4386#discussion_r893241456


##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java:
##########
@@ -98,6 +107,7 @@ public S3File(
     this.streamStatistics = streamStatistics;
     this.changeTracker = changeTracker;
     this.s3Objects = new IdentityHashMap<InputStream, S3Object>();

Review Comment:
   while you are there, can you change this to a simple `<>` here
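   
   i.e. no behaviour change, just let the compiler infer the type arguments:
   
   ```java
   // same map, with the diamond operator instead of the explicit type args
   this.s3Objects = new IdentityHashMap<>();
   ```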



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3PrefetchingInputStream.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.StoreStatisticNames;
+import org.apache.hadoop.fs.statistics.StreamStatisticNames;
+
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+
+/**
+ * Test the prefetching input stream, validates that the underlying S3CachingInputStream and
+ * S3InMemoryInputStream are working as expected.
+ */
+public class ITestS3PrefetchingInputStream extends AbstractS3ACostTest {
+
+  public ITestS3PrefetchingInputStream() {
+    super(true);
+  }
+
+  private static final int _1K = 1024;
+  // Path for file which should have length > block size so S3CachingInputStream is used
+  private Path largeFile;
+  private FileSystem fs;
+  private int numBlocks;
+  private int blockSize;
+  private long largeFileSize;
+  // Size should be < block size so S3InMemoryInputStream is used
+  private static final int SMALL_FILE_SIZE = _1K * 16;
+
+
+  @Override
+  public Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_ENABLED_KEY);
+    conf.setBoolean(PREFETCH_ENABLED_KEY, true);
+    return conf;
+  }
+
+
+  private void openFS() throws IOException {

Review Comment:
    for better isolation between tests, we need that FS to *not* be cached. otherwise, if a test in the same JVM has already accessed this path, you may get its FS with its settings.
   
   use `FileSystem.createFileSystem()`, override `teardown()` and invoke `cleanupWithLogger(LOG, largeFileFS)` to close it if it is non-null
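   
   roughly, untested; I've used `FileSystem.newInstance()` here as the uncached-instance call, and it assumes a `LOG` field plus a static import of `org.apache.hadoop.io.IOUtils.cleanupWithLogger` in the test class:
   
   ```java
   /** Uncached FS for the large test file, closed in teardown(). */
   private FileSystem largeFileFS;
   
   private void openFS() throws IOException {
     Configuration conf = getConfiguration();
     largeFile = new Path(DEFAULT_CSVTEST_FILE);
     // uncached instance, so another test's settings can't leak in
     largeFileFS = FileSystem.newInstance(largeFile.toUri(), conf);
     // rest of openFS() stays as it is
   }
   
   @Override
   public void teardown() throws Exception {
     super.teardown();
     // closes the FS if openFS() was ever called; failures are logged at debug
     cleanupWithLogger(LOG, largeFileFS);
     largeFileFS = null;
   }
   ```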



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java:
##########
@@ -214,7 +213,83 @@ void close(InputStream inputStream) {
       this.s3Objects.remove(inputStream);
     }
 
+    if (numRemainingBytes <= this.context.getAsyncDrainThreshold()) {
+      // don't bother with async io.
+      drain(false, "close() operation", numRemainingBytes, obj, inputStream);
+    } else {
+      LOG.debug("initiating asynchronous drain of {} bytes", numRemainingBytes);
+      // schedule an async drain/abort with references to the fields so they
+      // can be reused
+      client.submit(() -> drain(false, "close() operation", numRemainingBytes, obj, inputStream));
+    }
+  }
+
+  /**
+   * drain the stream. This method is intended to be
+   * used directly or asynchronously, and measures the
+   * duration of the operation in the stream statistics.
+   *
+   * @param shouldAbort   force an abort; used if explicitly requested.
+   * @param reason        reason for stream being closed; used in messages
+   * @param remaining     remaining bytes
+   * @param requestObject http request object;
+   * @param inputStream   stream to close.
+   * @return was the stream aborted?
+   */
+  private boolean drain(final boolean shouldAbort, final String reason, final long remaining,
+      final S3Object requestObject, final InputStream inputStream) {
+
+    try {
+      return invokeTrackingDuration(streamStatistics.initiateInnerStreamClose(shouldAbort),
+          () -> drainOrAbortHttpStream(shouldAbort, reason, remaining, requestObject, inputStream));
+    } catch (IOException e) {
+      // this is only here because invokeTrackingDuration() has it in its
+      // signature
+      return shouldAbort;
+    }
+  }
+
+  /**
+   * Drain or abort the inner stream.
+   * Exceptions are swallowed.
+   * If a close() is attempted and fails, the operation escalates to
+   * an abort.
+   *
+   * @param shouldAbort   force an abort; used if explicitly requested.
+   * @param reason        reason for stream being closed; used in messages
+   * @param remaining     remaining bytes
+   * @param requestObject http request object
+   * @param inputStream   stream to close.
+   * @return was the stream aborted?
+   */
+  private boolean drainOrAbortHttpStream(boolean shouldAbort, final String reason,
+      final long remaining, final S3Object requestObject, final InputStream inputStream) {
+
+    if (!shouldAbort && remaining > 0) {
+      try {
+        long drained = 0;
+        byte[] buffer = new byte[DRAIN_BUFFER_SIZE];
+        while (true) {
+          final int count = inputStream.read(buffer);
+          if (count < 0) {
+            // no more data is left
+            break;
+          }
+          drained += count;
+        }
+        LOG.debug("Drained stream of {} bytes", drained);
+      } catch (Exception e) {
+        // exception escalates to an abort
+        LOG.debug("When closing {} stream for {}, will abort the stream", uri, reason, e);
+        shouldAbort = true;
+      }
+    }
     Io.closeIgnoringIoException(inputStream);
-    Io.closeIgnoringIoException(obj);
+    Io.closeIgnoringIoException(requestObject);

Review Comment:
   I'm not sure which library this is. switch to our `IOUtils.cleanupWithLogger(LOG, inputStream, requestObject)`
   
   this will log any exceptions at debug, if ever needed
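   
   i.e. the last two lines become (with `org.apache.hadoop.io.IOUtils` imported):
   
   ```java
   // closes both; any exception is logged at debug rather than silently swallowed
   IOUtils.cleanupWithLogger(LOG, inputStream, requestObject);
   ```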



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java:
##########
@@ -193,18 +203,7 @@ public InputStream openForRead(long offset, int size) throws IOException {
     return stream;
   }
 
-  /**
-   * Closes this stream and releases all acquired resources.
-   */
-  @Override
-  public synchronized void close() {
-    List<InputStream> streams = new ArrayList<InputStream>(this.s3Objects.keySet());
-    for (InputStream stream : streams) {
-      this.close(stream);
-    }
-  }
-
-  void close(InputStream inputStream) {
+  void close(InputStream inputStream, int numRemainingBytes) {

Review Comment:
   can this be private?



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3PrefetchingInputStream.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.StoreStatisticNames;
+import org.apache.hadoop.fs.statistics.StreamStatisticNames;
+
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+
+/**
+ * Test the prefetching input stream, validates that the underlying S3CachingInputStream and
+ * S3InMemoryInputStream are working as expected.
+ */
+public class ITestS3PrefetchingInputStream extends AbstractS3ACostTest {
+
+  public ITestS3PrefetchingInputStream() {
+    super(true);
+  }
+
+  private static final int _1K = 1024;

Review Comment:
   I think hadoop common test should have a version of 
   org.apache.hadoop.fs.azure.integration.Sizes which we can reference here and elsewhere. it uses the S_ prefix to keep checkstyle quiet.
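   
   roughly what I mean, if a shared test class grows those constants (values here are illustrative, not copied from the azure file):
   
   ```java
   // the S_ prefix keeps checkstyle quiet about constant naming
   public static final int S_1K = 1024;
   public static final int S_16K = 16 * S_1K;
   public static final int S_1M = S_1K * S_1K;
   ```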



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java:
##########
@@ -214,7 +213,83 @@ void close(InputStream inputStream) {
       this.s3Objects.remove(inputStream);
     }
 
+    if (numRemainingBytes <= this.context.getAsyncDrainThreshold()) {
+      // don't bother with async io.
+      drain(false, "close() operation", numRemainingBytes, obj, inputStream);
+    } else {
+      LOG.debug("initiating asynchronous drain of {} bytes", numRemainingBytes);
+      // schedule an async drain/abort with references to the fields so they
+      // can be reused
+      client.submit(() -> drain(false, "close() operation", numRemainingBytes, obj, inputStream));
+    }
+  }
+
+  /**
+   * drain the stream. This method is intended to be
+   * used directly or asynchronously, and measures the
+   * duration of the operation in the stream statistics.
+   *
+   * @param shouldAbort   force an abort; used if explicitly requested.
+   * @param reason        reason for stream being closed; used in messages
+   * @param remaining     remaining bytes
+   * @param requestObject http request object;
+   * @param inputStream   stream to close.
+   * @return was the stream aborted?
+   */
+  private boolean drain(final boolean shouldAbort, final String reason, final long remaining,
+      final S3Object requestObject, final InputStream inputStream) {
+
+    try {
+      return invokeTrackingDuration(streamStatistics.initiateInnerStreamClose(shouldAbort),
+          () -> drainOrAbortHttpStream(shouldAbort, reason, remaining, requestObject, inputStream));
+    } catch (IOException e) {
+      // this is only here because invokeTrackingDuration() has it in its
+      // signature
+      return shouldAbort;
+    }
+  }
+
+  /**
+   * Drain or abort the inner stream.
+   * Exceptions are swallowed.
+   * If a close() is attempted and fails, the operation escalates to
+   * an abort.
+   *
+   * @param shouldAbort   force an abort; used if explicitly requested.
+   * @param reason        reason for stream being closed; used in messages
+   * @param remaining     remaining bytes
+   * @param requestObject http request object
+   * @param inputStream   stream to close.
+   * @return was the stream aborted?
+   */
+  private boolean drainOrAbortHttpStream(boolean shouldAbort, final String reason,

Review Comment:
   style nit: can you split the params one to a line, all tagged final. there's enough of them to justify the effort
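   
   i.e. something like
   
   ```java
   private boolean drainOrAbortHttpStream(
       final boolean shouldAbort,
       final String reason,
       final long remaining,
       final S3Object requestObject,
       final InputStream inputStream) {
   ```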



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3PrefetchingInputStream.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.StoreStatisticNames;
+import org.apache.hadoop.fs.statistics.StreamStatisticNames;
+
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+
+/**
+ * Test the prefetching input stream, validates that the underlying S3CachingInputStream and
+ * S3InMemoryInputStream are working as expected.
+ */
+public class ITestS3PrefetchingInputStream extends AbstractS3ACostTest {
+
+  public ITestS3PrefetchingInputStream() {
+    super(true);
+  }
+
+  private static final int _1K = 1024;
+  // Path for file which should have length > block size so S3CachingInputStream is used
+  private Path largeFile;
+  private FileSystem fs;

Review Comment:
   can you name this `largeFileFS` to make it clear why it is being used



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3PrefetchingInputStream.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.StoreStatisticNames;
+import org.apache.hadoop.fs.statistics.StreamStatisticNames;
+
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+
+/**
+ * Test the prefetching input stream, validates that the underlying S3CachingInputStream and
+ * S3InMemoryInputStream are working as expected.
+ */
+public class ITestS3PrefetchingInputStream extends AbstractS3ACostTest {
+
+  public ITestS3PrefetchingInputStream() {
+    super(true);
+  }
+
+  private static final int _1K = 1024;
+  // Path for file which should have length > block size so S3CachingInputStream is used
+  private Path largeFile;
+  private FileSystem fs;
+  private int numBlocks;
+  private int blockSize;
+  private long largeFileSize;
+  // Size should be < block size so S3InMemoryInputStream is used
+  private static final int SMALL_FILE_SIZE = _1K * 16;
+
+
+  @Override
+  public Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_ENABLED_KEY);
+    conf.setBoolean(PREFETCH_ENABLED_KEY, true);
+    return conf;
+  }
+
+
+  private void openFS() throws IOException {
+    Configuration conf = getConfiguration();
+
+    largeFile = new Path(DEFAULT_CSVTEST_FILE);
+    blockSize = conf.getInt(PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE);
+    fs = largeFile.getFileSystem(getConfiguration());
+    FileStatus fileStatus = fs.getFileStatus(largeFile);
+    largeFileSize = fileStatus.getLen();
+    numBlocks = calculateNumBlocks(largeFileSize, blockSize);
+  }
+
+  private static int calculateNumBlocks(long largeFileSize, int blockSize) {
+    if (largeFileSize == 0) {
+      return 0;
+    } else {
+      return ((int) (largeFileSize / blockSize)) + (largeFileSize % blockSize > 0 ? 1 : 0);
+    }
+  }
+
+  @Test
+  public void testReadLargeFileFully() throws Throwable {
+    describe("read a large file fully, uses S3CachingInputStream");
+    openFS();
+
+    try (FSDataInputStream in = fs.open(largeFile)) {
+      IOStatistics ioStats = in.getIOStatistics();
+
+      byte[] buffer = new byte[(int) largeFileSize];
+
+      in.read(buffer, 0, (int) largeFileSize);
+
+      verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, numBlocks);
+      verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_OPENED, numBlocks);
+    }
+  }
+
+  @Test
+  public void testRandomReadLargeFile() throws Throwable {
+    describe("random read on a large file, uses S3CachingInputStream");
+    openFS();
+
+    try (FSDataInputStream in = fs.open(largeFile)) {
+      IOStatistics ioStats = in.getIOStatistics();
+
+      byte[] buffer = new byte[blockSize];
+
+      // Don't read the block completely so it gets cached on seek
+      in.read(buffer, 0, blockSize - _1K * 10);
+      in.seek(blockSize + _1K * 10);
+      // Backwards seek, will use cached block
+      in.seek(_1K * 5);
+      in.read();
+
+      verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, 2);
+      verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_OPENED, 2);
+    }
+  }
+
+  @Test
+  public void testRandomReadSmallFile() throws Throwable {
+    describe("random read on a small file, uses S3InMemoryInputStream");
+
+    byte[] data = ContractTestUtils.dataset(SMALL_FILE_SIZE, 'a', 26);
+    Path smallFile = path("randomReadSmallFile");
+    ContractTestUtils.writeDataset(getFileSystem(), smallFile, data, data.length, 16, true);
+
+    try (FSDataInputStream in = getFileSystem().open(smallFile)) {
+      IOStatistics ioStats = in.getIOStatistics();
+
+      byte[] buffer = new byte[SMALL_FILE_SIZE];
+
+      in.read(buffer, 0, _1K * 4);
+      in.seek(_1K * 12);
+      in.read(buffer, 0, _1K * 4);
+
+      verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, 1);
+      verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_OPENED, 1);
+    }
+
+  }
+}

Review Comment:
   nit, add a newline



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InputStream.java:
##########
@@ -104,16 +105,18 @@
   public S3InputStream(
       S3AReadOpContext context,
       S3ObjectAttributes s3Attributes,
-      S3AInputStream.InputStreamCallbacks client) {
+      S3AInputStream.InputStreamCallbacks client,
+      S3AInputStreamStatistics streamStatistics) {
 
     Validate.checkNotNull(context, "context");
     Validate.checkNotNull(s3Attributes, "s3Attributes");
     Validate.checkNotNull(client, "client");
+    Validate.checkNotNull(streamStatistics, "streamStatistics");

Review Comment:
   this is one of those things we'd inevitably want to clean up. i can see a "steve does a cleanup" PR coming soon.... 
   If you use `Objects.requireNonNull()` you can add the check on L119 so no need for extra work...it's where new code should be going, even if older code still uses the guava checkNotNull
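   
   for example, at the field assignment (needs `java.util.Objects` imported; shown for the streamStatistics field, same pattern for the others):
   
   ```java
   // validates and assigns in one step, so the separate Validate call can go
   this.streamStatistics = Objects.requireNonNull(streamStatistics, "streamStatistics");
   ```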



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3PrefetchingInputStream.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.StoreStatisticNames;
+import org.apache.hadoop.fs.statistics.StreamStatisticNames;
+
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+
+/**
+ * Test the prefetching input stream, validates that the underlying S3CachingInputStream and
+ * S3InMemoryInputStream are working as expected.
+ */
+public class ITestS3PrefetchingInputStream extends AbstractS3ACostTest {

Review Comment:
   can you call this ITestS3PrefetchingInputStream? I want to rename the other classes too, but let's start with the new ones



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java:
##########
@@ -64,6 +67,12 @@ public class S3File implements Closeable {
   // That allows us to close the object when closing the stream.
   private Map<InputStream, S3Object> s3Objects;
 
+  // uri of the object being read

Review Comment:
   nit: can you make these all javadocs, so they show up in IDEs.
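   
   e.g. the first one would become something like this (the field declaration is only shown for context):
   
   ```java
   /** Uri of the object being read. */
   private final String uri;
   ```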



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3PrefetchingInputStream.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.StoreStatisticNames;
+import org.apache.hadoop.fs.statistics.StreamStatisticNames;
+
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+
+/**
+ * Test the prefetching input stream, validates that the underlying S3CachingInputStream and
+ * S3InMemoryInputStream are working as expected.
+ */
+public class ITestS3PrefetchingInputStream extends AbstractS3ACostTest {
+
+  public ITestS3PrefetchingInputStream() {
+    super(true);
+  }
+
+  private static final int _1K = 1024;
+  // Path for file which should have length > block size so S3CachingInputStream is used
+  private Path largeFile;
+  private FileSystem fs;
+  private int numBlocks;
+  private int blockSize;
+  private long largeFileSize;
+  // Size should be < block size so S3InMemoryInputStream is used
+  private static final int SMALL_FILE_SIZE = _1K * 16;
+
+
+  @Override
+  public Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_ENABLED_KEY);
+    conf.setBoolean(PREFETCH_ENABLED_KEY, true);
+    return conf;
+  }
+
+
+  private void openFS() throws IOException {
+    Configuration conf = getConfiguration();
+
+    largeFile = new Path(DEFAULT_CSVTEST_FILE);
+    blockSize = conf.getInt(PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE);
+    fs = largeFile.getFileSystem(getConfiguration());
+    FileStatus fileStatus = fs.getFileStatus(largeFile);
+    largeFileSize = fileStatus.getLen();
+    numBlocks = calculateNumBlocks(largeFileSize, blockSize);
+  }
+
+  private static int calculateNumBlocks(long largeFileSize, int blockSize) {
+    if (largeFileSize == 0) {
+      return 0;
+    } else {
+      return ((int) (largeFileSize / blockSize)) + (largeFileSize % blockSize > 0 ? 1 : 0);
+    }
+  }
+
+  @Test
+  public void testReadLargeFileFully() throws Throwable {
+    describe("read a large file fully, uses S3CachingInputStream");
+    openFS();
+
+    try (FSDataInputStream in = fs.open(largeFile)) {
+      IOStatistics ioStats = in.getIOStatistics();
+
+      byte[] buffer = new byte[(int) largeFileSize];

Review Comment:
   this is trouble if that test file is really big. prefer to create a smaller buffer and iterate through with readFully calls.
   
   now, one thing we don't check reliably, even in AbstractSTestS3AHugeFiles, is *are the contents of large files constant even if you read them in different ways*?
   
   i think we need to worry about this, though i also think AbstractSTestS3AHugeFiles might be the place...we can build an MD5 checksum on the upload and verify it on the reads.
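   
   for the full-read test, something like this (chunk size is arbitrary):
   
   ```java
   // read the whole file through a bounded buffer instead of one
   // allocation the size of the file
   byte[] buffer = new byte[_1K * 128];
   long bytesRead = 0;
   while (bytesRead < largeFileSize) {
     int len = (int) Math.min(buffer.length, largeFileSize - bytesRead);
     in.readFully(buffer, 0, len);
     bytesRead += len;
   }
   ```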



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-issues-help@hadoop.apache.org