Posted to common-issues@hadoop.apache.org by GitBox <gi...@apache.org> on 2022/01/04 06:23:28 UTC

[GitHub] [hadoop] mehakmeet commented on a change in pull request #3736: HADOOP-18028. improve S3 read speed using prefetching & caching

mehakmeet commented on a change in pull request #3736:
URL: https://github.com/apache/hadoop/pull/3736#discussion_r772337743



##########
File path: hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/S3FileTest.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.read;
+
+import static org.junit.Assert.*;

Review comment:
       Not used. Can be removed.

##########
File path: hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestObjectChangeDetectionAttributes.java
##########
@@ -234,6 +234,7 @@ private void assertContent(String file, Path path, byte[] content,
     s3Object.setObjectMetadata(metadata);
     s3Object.setObjectContent(new ByteArrayInputStream(content));
     when(s3.getObject(argThat(correctGetObjectRequest(file, eTag, versionId))))
+    // when(s3.getObject(any()))

Review comment:
       nit: seems accidental?

##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InputStream.java
##########
@@ -0,0 +1,459 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.read;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.CanSetReadahead;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.fs.common.BlockData;
+import org.apache.hadoop.fs.common.FilePosition;
+import org.apache.hadoop.fs.common.Validate;
+import org.apache.hadoop.fs.s3a.S3AInputPolicy;
+import org.apache.hadoop.fs.s3a.S3AInputStream;
+import org.apache.hadoop.fs.s3a.S3AReadOpContext;
+import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
+import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Provides an {@link InputStream} that allows reading from an S3 file.
+ */
+public abstract class S3InputStream
+    extends InputStream
+    implements CanSetReadahead, StreamCapabilities, IOStatisticsSource {
+
+  private static final Logger LOG = LoggerFactory.getLogger(S3InputStream.class);
+
+  // The S3 file read by this instance.
+  private S3File s3File;
+
+  // Reading of S3 file takes place through this reader.
+  private S3Reader reader;
+
+  // Name of this stream. Used only for logging.
+  private final String name;
+
+  // Indicates whether the stream has been closed.
+  private volatile boolean closed;
+
+  // Current position within the file.
+  private FilePosition fpos;
+
+  // The target of the most recent seek operation.
+  private long seekTargetPos;
+
+  // Information about each block of the mapped S3 file.
+  private BlockData blockData;
+
+  // Read-specific operation context.
+  private S3AReadOpContext context;
+
+  // Attributes of the S3 object being read.
+  private S3ObjectAttributes s3Attributes;
+
+  // Callbacks used for interacting with the underlying S3 client.
+  private S3AInputStream.InputStreamCallbacks client;
+
+  // Used for reporting input stream access statistics.
+  private final S3AInputStreamStatistics streamStatistics;
+
+  private S3AInputPolicy inputPolicy;
+  private final ChangeTracker changeTracker;
+  private final IOStatistics ioStatistics;
+
+  /**
+   * Initializes a new instance of the {@code S3InputStream} class.
+   *
+   * @param context read-specific operation context.
+   * @param s3Attributes attributes of the S3 object being read.
+   * @param client callbacks used for interacting with the underlying S3 client.
+   *
+   * @throws IllegalArgumentException if context is null.
+   * @throws IllegalArgumentException if s3Attributes is null.
+   * @throws IllegalArgumentException if client is null.
+   */
+  public S3InputStream(
+      S3AReadOpContext context,
+      S3ObjectAttributes s3Attributes,
+      S3AInputStream.InputStreamCallbacks client) {
+
+    Validate.checkNotNull(context, "context");
+    Validate.checkNotNull(s3Attributes, "s3Attributes");
+    Validate.checkNotNull(client, "client");
+
+    this.context = context;
+    this.s3Attributes = s3Attributes;
+    this.client = client;
+    this.streamStatistics = context.getS3AStatisticsContext().newInputStreamStatistics();
+    this.ioStatistics = streamStatistics.getIOStatistics();
+    this.name = S3File.getPath(s3Attributes);
+    this.changeTracker = new ChangeTracker(
+        this.name,
+        context.getChangeDetectionPolicy(),
+        this.streamStatistics.getChangeTrackerStatistics(),
+        s3Attributes);
+
+    setInputPolicy(context.getInputPolicy());
+    setReadahead(context.getReadahead());
+
+    long fileSize = s3Attributes.getLen();
+    int bufferSize = context.getPrefetchBlockSize();
+
+    this.blockData = new BlockData(fileSize, bufferSize);
+    this.fpos = new FilePosition(fileSize, bufferSize);
+    this.s3File = this.getS3File();
+    this.reader = new S3Reader(this.s3File);
+
+    this.seekTargetPos = 0;
+  }
+
+  /**
+   * Gets the internal IO statistics.
+   *
+   * @return the internal IO statistics.
+   */
+  @Override
+  public IOStatistics getIOStatistics() {
+    return this.ioStatistics;
+  }
+
+  /**
+   * Access the input stream statistics.
+   * This is for internal testing and may be removed without warning.
+   * @return the statistics for this input stream
+   */
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  public S3AInputStreamStatistics getS3AStreamStatistics() {
+    return this.streamStatistics;
+  }
+
+  /**
+   * Sets the number of bytes to read ahead each time.
+   *
+   * @param readahead the number of bytes to read ahead each time.
+   */
+  @Override
+  public synchronized void setReadahead(Long readahead) {
+    // We support readahead via prefetching, therefore we ignore the supplied value.
+    if (readahead != null) {
+      Validate.checkNotNegative(readahead, "readahead");
+    }
+  }
+
+  /**
+   * Indicates whether the given {@code capability} is supported by this stream.
+   *
+   * @param capability the capability to check.
+   * @return true if the given {@code capability} is supported by this stream, false otherwise.
+   */
+  @Override
+  public boolean hasCapability(String capability) {
+    return capability.equalsIgnoreCase(StreamCapabilities.IOSTATISTICS)
+        || capability.equalsIgnoreCase(StreamCapabilities.READAHEAD);
+  }
+
+  /**
+   * Set/update the input policy of the stream.
+   * This updates the stream statistics.
+   * @param inputPolicy new input policy.
+   */
+  private void setInputPolicy(S3AInputPolicy inputPolicy) {
+    this.inputPolicy = inputPolicy;
+    streamStatistics.inputPolicySet(inputPolicy.ordinal());
+  }
+
+  /**
+   * Returns the number of bytes that can be read from this stream without blocking.
+   */
+  @Override
+  public int available() throws IOException {
+    this.throwIfClosed();
+
+    if (!ensureCurrentBuffer()) {
+      return 0;
+    }
+
+    return this.fpos.buffer().remaining();
+  }
+
+  /**
+   * Gets the current position.
+   *
+   * @return the current position.
+   * @throws IOException if there is an IO error during this operation.
+   */
+  public long getPos() throws IOException {
+    this.throwIfClosed();
+
+    if (this.fpos.isValid()) {
+      return this.fpos.absolute();
+    } else {
+      return this.seekTargetPos;
+    }
+  }
+
+  /**
+   * Moves the current read position so that the next read will occur at {@code pos}.
+   *
+   * @param pos the absolute position to seek to.
+   * @throws IOException if there is an error during this operation.
+   *
+   * @throws IllegalArgumentException if pos is outside of the range [0, file size].
+   */
+  public void seek(long pos) throws IOException {
+    this.throwIfClosed();
+    this.throwIfInvalidSeek(pos);
+
+    if (!this.fpos.setAbsolute(pos)) {
+      this.fpos.invalidate();
+      this.seekTargetPos = pos;
+    }
+  }
+
+  /**
+   * Ensures that a non-empty valid buffer is available for immediate reading.
+   * It returns true when at least one such buffer is available for reading.
+   * It returns false on reaching the end of the stream.
+   *
+   * @return true if at least one such buffer is available for reading, false otherwise.
+   * @throws IOException if there is an IO error during this operation.
+   */
+  protected abstract boolean ensureCurrentBuffer() throws IOException;
+
+  @Override
+  public int read() throws IOException {
+    this.throwIfClosed();
+
+    if (!ensureCurrentBuffer()) {
+      return -1;
+    }
+
+    this.incrementBytesRead(1);
+
+    return this.fpos.buffer().get() & 0xff;
+  }
+
+  /**
+   * Reads bytes from this stream and copies them into
+   * the given {@code buffer} starting at the beginning (offset 0).
+   * Returns the number of bytes actually copied into the given buffer.
+   *
+   * @param buffer the buffer to copy data into.
+   * @return the number of bytes actually copied into the given buffer.
+   * @throws IOException if there is an IO error during this operation.
+   */
+  @Override
+  public int read(byte[] buffer) throws IOException {
+    return this.read(buffer, 0, buffer.length);
+  }
+
+  /**
+   * Reads up to {@code len} bytes from this stream and copies them into
+   * the given {@code buffer} starting at the given {@code offset}.
+   * Returns the number of bytes actually copied into the given buffer.
+   *
+   * @param buffer the buffer to copy data into.
+   * @param offset data is copied starting at this offset.
+   * @param len max number of bytes to copy.
+   * @return the number of bytes actually copied into the given buffer.
+   * @throws IOException if there is an IO error during this operation.
+   */
+  @Override
+  public int read(byte[] buffer, int offset, int len) throws IOException {
+    this.throwIfClosed();
+
+    if (len == 0) {
+      return 0;
+    }
+
+    if (!ensureCurrentBuffer()) {
+      return -1;
+    }
+
+    int numBytesRead = 0;
+    int numBytesRemaining = len;
+
+    while (numBytesRemaining > 0) {
+      if (!ensureCurrentBuffer()) {
+        break;
+      }
+
+      ByteBuffer buf = this.fpos.buffer();
+      int bytesToRead = Math.min(numBytesRemaining, buf.remaining());
+      buf.get(buffer, offset, bytesToRead);
+      this.incrementBytesRead(bytesToRead);
+      offset += bytesToRead;
+      numBytesRemaining -= bytesToRead;
+      numBytesRead += bytesToRead;
+    }
+
+    return numBytesRead;
+  }
+
+  protected S3File getFile() {
+    return this.s3File;
+  }
+
+  protected S3Reader getReader() {
+    return this.reader;
+  }
+
+  protected S3ObjectAttributes getS3ObjectAttributes() {
+    return this.s3Attributes;
+  }
+
+  protected FilePosition getFilePosition() {
+    return this.fpos;
+  }
+
+  protected String getName() {
+    return this.name;
+  }
+
+  protected boolean isClosed() {
+    return this.closed;
+  }
+
+  protected long getSeekTargetPos() {
+    return this.seekTargetPos;
+  }
+
+  protected void setSeekTargetPos(long pos) {
+    this.seekTargetPos = pos;
+  }
+
+  protected BlockData getBlockData() {
+    return this.blockData;
+  }
+
+  protected S3AReadOpContext getContext() {
+    return this.context;
+  }
+
+  private void incrementBytesRead(int bytesRead) {
+    if (bytesRead > 0) {
+      this.streamStatistics.bytesRead(bytesRead);
+      if (this.getContext().getStats() != null) {
+        this.getContext().getStats().incrementBytesRead(bytesRead);
+      }
+      this.fpos.incrementBytesRead(bytesRead);
+    }
+  }
+
+  // @VisibleForTesting
+  protected S3File getS3File() {
+    return new S3File(
+        this.context,
+        this.s3Attributes,
+        this.client,
+        this.streamStatistics,
+        this.changeTracker
+    );
+  }
+
+  protected String getOffsetStr(long offset) {
+    int blockNumber = -1;
+
+    if (this.blockData.isValidOffset(offset)) {
+      blockNumber = this.blockData.getBlockNumber(offset);
+    }
+
+    return String.format("%d:%d", blockNumber, offset);
+  }
+
+  /**
+   * Closes this stream and releases all acquired resources.
+   *
+   * @throws IOException if there is an IO error during this operation.
+   */
+  @Override
+  public void close() throws IOException {
+    if (this.closed) {
+      return;
+    }
+    this.closed = true;
+
+    this.blockData = null;
+    this.s3File = null;
+    this.fpos.invalidate();
+    try {
+      this.client.close();
+    } finally {
+      this.streamStatistics.close();
+    }
+    this.client = null;
+  }
+
+  @Override
+  public boolean markSupported() {
+    return false;
+  }
+
+  protected void throwIfClosed() throws IOException {
+    if (this.closed) {
+      throw new IOException(this.name + ": " + FSExceptionMessages.STREAM_IS_CLOSED);
+    }
+  }
+
+  protected void throwIfInvalidSeek(long pos) throws EOFException {
+    long fileSize = this.s3File.size();
+    if (pos < 0) {
+      throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK + " " + pos);
+    } else {
+      if (fileSize == 0 && pos == 0) {
+        // Do nothing. Valid combination.
+        return;
+      }
+
+      if (pos >= fileSize) {

Review comment:
       Should this be `pos > fileSize`?
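
       Seeking exactly to `fileSize` (i.e. to EOF) is usually legal, with the
       next read() returning -1, so `pos > fileSize` looks right. A sketch of
       the suggested check (`CANNOT_SEEK_PAST_EOF` is assumed to exist in
       `FSExceptionMessages`; the rest follows the code above):

```java
protected void throwIfInvalidSeek(long pos) throws EOFException {
  long fileSize = this.s3File.size();
  if (pos < 0) {
    throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK + " " + pos);
  }
  // pos == fileSize is allowed: it is a seek to EOF and the next read()
  // simply returns -1. Only positions strictly past EOF are rejected.
  if (pos > fileSize) {
    throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF + " " + pos);
  }
}
```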

##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.read;
+
+import com.twitter.util.FuturePool;
+import org.apache.hadoop.fs.common.BlockData;
+import org.apache.hadoop.fs.common.BlockManager;
+import org.apache.hadoop.fs.common.BufferData;
+import org.apache.hadoop.fs.s3a.S3AInputStream;
+import org.apache.hadoop.fs.s3a.S3AReadOpContext;
+import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Provides an {@code InputStream} that allows reading from an S3 file.
+ * Prefetched blocks are cached to local disk if a seek away from the
+ * current block is issued.
+ */
+public class S3CachingInputStream extends S3InputStream {
+  private static final Logger LOG = LoggerFactory.getLogger(S3CachingInputStream.class);
+
+  // Number of blocks queued for prefching.
+  private int numBlocksToPrefetch;
+
+  private BlockManager blockManager;
+
+  /**
+   * Initializes a new instance of the {@code S3CachingInputStream} class.
+   *
+   * @param context read-specific operation context.
+   * @param s3Attributes attributes of the S3 object being read.
+   * @param client callbacks used for interacting with the underlying S3 client.
+   *
+   * @throws IllegalArgumentException if context is null.
+   * @throws IllegalArgumentException if s3Attributes is null.
+   * @throws IllegalArgumentException if client is null.
+   */
+  public S3CachingInputStream(
+      S3AReadOpContext context,
+      S3ObjectAttributes s3Attributes,
+      S3AInputStream.InputStreamCallbacks client) {
+    super(context, s3Attributes, client);
+
+    this.numBlocksToPrefetch = this.getContext().getPrefetchBlockCount();
+    int bufferPoolSize = this.numBlocksToPrefetch + 1;
+    this.blockManager = this.createBlockManager(
+        this.getContext().getFuturePool(),
+        this.getReader(),
+        this.getBlockData(),
+        bufferPoolSize);
+    int fileSize = (int) s3Attributes.getLen();
+    LOG.debug("Created caching input stream for {} (size = {})", this.getName(), fileSize);
+  }
+
+  /**
+   * Moves the current read position so that the next read will occur at {@code pos}.
+   *
+   * @param pos the next read will take place at this position.
+   *
+   * @throws IllegalArgumentException if pos is outside of the range [0, file size].
+   */
+  @Override
+  public void seek(long pos) throws IOException {
+    this.throwIfClosed();
+    this.throwIfInvalidSeek(pos);
+
+    if (!this.getFilePosition().setAbsolute(pos)) {

Review comment:
       Can you add some comments in this section to explain what's happening here?
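
       For example, something along these lines (a sketch; the exact
       bookkeeping in the elided body is an assumption based on the base
       class's seek()):

```java
@Override
public void seek(long pos) throws IOException {
  this.throwIfClosed();
  this.throwIfInvalidSeek(pos);

  // setAbsolute() succeeds only when pos falls within the currently
  // buffered block; in that case this is just a cheap pointer move.
  if (!this.getFilePosition().setAbsolute(pos)) {
    // pos is outside the current block: drop the buffered position and
    // remember the target so that the next read fetches (and prefetches
    // around) the correct block.
    this.getFilePosition().invalidate();
    this.setSeekTargetPos(pos);
  }
}
```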

##########
File path: hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestObjectChangeDetectionAttributes.java
##########
@@ -250,20 +251,41 @@ private void assertVersionAttributes(Path path, String eTag, String versionId)
     assertEquals(versionId, fileStatus.getVersionId());
   }
 
+  static String toString(GetObjectRequest req) {

Review comment:
       This method doesn't appear to be used anywhere. Were these changes meant for local testing, maybe?

##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.read;
+
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.S3Object;
+import org.apache.hadoop.fs.common.Io;
+import org.apache.hadoop.fs.common.Validate;
+import org.apache.hadoop.fs.s3a.Invoker;
+import org.apache.hadoop.fs.s3a.S3AInputStream;
+import org.apache.hadoop.fs.s3a.S3AReadOpContext;
+import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
+import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
+import org.apache.hadoop.fs.statistics.DurationTracker;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Ecapsulates low level interactions with S3 object on AWS.

Review comment:
       nit: typo "Encapsulates".

##########
File path: hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/S3FileTest.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.read;
+
+import static org.junit.Assert.*;
+
+import com.twitter.util.ExecutorServiceFuturePool;
+import com.twitter.util.FuturePool;
+import org.apache.hadoop.fs.common.ExceptionAsserts;
+import org.apache.hadoop.fs.s3a.S3AInputStream;
+import org.apache.hadoop.fs.s3a.S3AReadOpContext;
+import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
+import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class S3FileTest {
+  private final ExecutorService threadPool = Executors.newFixedThreadPool(1);
+  private final FuturePool futurePool = new ExecutorServiceFuturePool(threadPool);
+  private final S3AInputStream.InputStreamCallbacks client = TestS3File.createClient("bucket");
+
+  @Test
+  public void testArgChecks() {
+    S3AReadOpContext readContext = Fakes.createReadContext(futurePool, "key", 10, 10, 1);
+    S3ObjectAttributes attrs = Fakes.createObjectAttributes("bucket", "key", 10);
+    S3AInputStreamStatistics stats =
+        readContext.getS3AStatisticsContext().newInputStreamStatistics();
+    ChangeTracker changeTracker = Fakes.createChangeTracker("bucket", "key", 10);
+
+    // Should not throw.
+    new S3File(readContext, attrs, client, stats, changeTracker);
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'context' must not be null",
+        () -> new S3File(null, attrs, client, stats, changeTracker));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'s3Attributes' must not be null",
+        () -> new S3File(readContext, null, client, stats, changeTracker));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'client' must not be null",
+        () -> new S3File(readContext, attrs, null, stats, changeTracker));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'streamStatistics' must not be null",
+        () -> new S3File(readContext, attrs, client, null, changeTracker));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'changeTracker' must not be null",
+        () -> new S3File(readContext, attrs, client, stats, null));
+  }
+
+  @Test
+  public void testProperties() throws IOException {

Review comment:
       nit: Should this be removed?

##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.read;
+
+import com.twitter.util.FuturePool;
+import org.apache.hadoop.fs.common.BlockData;
+import org.apache.hadoop.fs.common.BlockManager;
+import org.apache.hadoop.fs.common.BufferData;
+import org.apache.hadoop.fs.s3a.S3AInputStream;
+import org.apache.hadoop.fs.s3a.S3AReadOpContext;
+import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Provides an {@code InputStream} that allows reading from an S3 file.
+ * Prefetched blocks are cached to local disk if a seek away from the
+ * current block is issued.
+ */
+public class S3CachingInputStream extends S3InputStream {
+  private static final Logger LOG = LoggerFactory.getLogger(S3CachingInputStream.class);
+
+  // Number of blocks queued for prefching.
+  private int numBlocksToPrefetch;
+
+  private BlockManager blockManager;
+
+  /**
+   * Initializes a new instance of the {@code S3CachingInputStream} class.
+   *
+   * @param context read-specific operation context.
+   * @param s3Attributes attributes of the S3 object being read.
+   * @param client callbacks used for interacting with the underlying S3 client.
+   *
+   * @throws IllegalArgumentException if context is null.
+   * @throws IllegalArgumentException if s3Attributes is null.
+   * @throws IllegalArgumentException if client is null.
+   */
+  public S3CachingInputStream(
+      S3AReadOpContext context,
+      S3ObjectAttributes s3Attributes,
+      S3AInputStream.InputStreamCallbacks client) {
+    super(context, s3Attributes, client);
+
+    this.numBlocksToPrefetch = this.getContext().getPrefetchBlockCount();

Review comment:
       nit: can we use `context` in place of `this.getContext()`?
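
       i.e. the constructor parameter is already in scope:

```java
this.numBlocksToPrefetch = context.getPrefetchBlockCount();
int bufferPoolSize = this.numBlocksToPrefetch + 1;
this.blockManager = this.createBlockManager(
    context.getFuturePool(),
    this.getReader(),
    this.getBlockData(),
    bufferPoolSize);
```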

##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.read;
+
+import com.twitter.util.FuturePool;
+import org.apache.hadoop.fs.common.BlockData;
+import org.apache.hadoop.fs.common.BlockManager;
+import org.apache.hadoop.fs.common.BufferData;
+import org.apache.hadoop.fs.s3a.S3AInputStream;
+import org.apache.hadoop.fs.s3a.S3AReadOpContext;
+import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Provides an {@code InputStream} that allows reading from an S3 file.
+ * Prefetched blocks are cached to local disk if a seek away from the
+ * current block is issued.
+ */
+public class S3CachingInputStream extends S3InputStream {
+  private static final Logger LOG = LoggerFactory.getLogger(S3CachingInputStream.class);
+
+  // Number of blocks queued for prefching.
+  private int numBlocksToPrefetch;
+
+  private BlockManager blockManager;
+
+  /**
+   * Initializes a new instance of the {@code S3CachingInputStream} class.
+   *
+   * @param context read-specific operation context.
+   * @param s3Attributes attributes of the S3 object being read.
+   * @param client callbacks used for interacting with the underlying S3 client.
+   *
+   * @throws IllegalArgumentException if context is null.
+   * @throws IllegalArgumentException if s3Attributes is null.
+   * @throws IllegalArgumentException if client is null.
+   */
+  public S3CachingInputStream(
+      S3AReadOpContext context,
+      S3ObjectAttributes s3Attributes,
+      S3AInputStream.InputStreamCallbacks client) {
+    super(context, s3Attributes, client);
+
+    this.numBlocksToPrefetch = this.getContext().getPrefetchBlockCount();
+    int bufferPoolSize = this.numBlocksToPrefetch + 1;
+    this.blockManager = this.createBlockManager(
+        this.getContext().getFuturePool(),

Review comment:
       same nit as above.

##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.read;
+
+import com.twitter.util.FuturePool;
+import org.apache.hadoop.fs.common.BlockData;
+import org.apache.hadoop.fs.common.BlockManager;
+import org.apache.hadoop.fs.common.BufferData;
+import org.apache.hadoop.fs.s3a.S3AInputStream;
+import org.apache.hadoop.fs.s3a.S3AReadOpContext;
+import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Provides an {@code InputStream} that allows reading from an S3 file.
+ * Prefetched blocks are cached to local disk if a seek away from the
+ * current block is issued.
+ */
+public class S3CachingInputStream extends S3InputStream {
+  private static final Logger LOG = LoggerFactory.getLogger(S3CachingInputStream.class);
+
+  // Number of blocks queued for prefching.

Review comment:
       nit: typo, "prefetching".

##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3Reader.java
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.read;
+
+import org.apache.hadoop.fs.common.Validate;
+import org.apache.hadoop.fs.s3a.Invoker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+
+/**
+ * Provides functionality to read an S3 file one block at a time.
+ */
+public class S3Reader implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(S3Reader.class);
+
+  // The S3 file to read.
+  private final S3File s3File;
+
+  // Set to true by close().
+  private volatile boolean closed;
+
+  /**
+   * Constructs an instance of {@link S3Reader}.
+   *
+   * @param s3File The S3 file to read.
+   *
+   * @throws IllegalArgumentException if s3File is null.
+   */
+  public S3Reader(S3File s3File) {
+    Validate.checkNotNull(s3File, "s3File");
+
+    this.s3File = s3File;
+  }
+
+  /**
+   * Starts reading at {@code offset} and reads up to {@code size} bytes into {@code buffer}.
+   *
+   * @param buffer the buffer into which data is returned
+   * @param offset the absolute offset into the underlying file where reading starts.
+   * @param size the number of bytes to be read.
+   *
+   * @return number of bytes actually read.
+   * @throws IOException if there is an error reading from the file.
+   *
+   * @throws IllegalArgumentException if buffer is null.
+   * @throws IllegalArgumentException if offset is outside of the range [0, file size].
+   * @throws IllegalArgumentException if size is zero or negative.
+   */
+  public int read(ByteBuffer buffer, long offset, int size) throws IOException {
+    Validate.checkNotNull(buffer, "buffer");
+    Validate.checkWithinRange(offset, "offset", 0, this.s3File.size());
+    Validate.checkPositiveInteger(size, "size");
+
+    if (this.closed) {
+      return -1;
+    }
+
+    int reqSize = (int) Math.min(size, this.s3File.size() - offset);
+    return readOneBlockWithRetries(buffer, offset, reqSize);
+  }
+
+  @Override
+  public void close() {
+    this.closed = true;
+  }
+
+  private int readOneBlockWithRetries(ByteBuffer buffer, long offset, int size)
+      throws IOException {
+
+    this.s3File.getStatistics().readOperationStarted(offset, size);
+    Invoker invoker = this.s3File.getReadInvoker();
+
+    invoker.retry(
+        "read", this.s3File.getPath(), true,
+        () -> {
+          try {
+            this.readOneBlock(buffer, offset, size);
+          } catch (EOFException e) {
+            // the base implementation swallows EOFs.
+            return -1;
+          } catch (SocketTimeoutException e) {
+            this.s3File.getStatistics().readException();
+            throw e;
+          } catch (IOException e) {
+            this.s3File.getStatistics().readException();
+            throw e;
+          }
+          return 0;
+        });
+
+    int numBytesRead = buffer.position();
+    buffer.limit(numBytesRead);
+    this.s3File.getStatistics().readOperationCompleted(size, numBytesRead);
+    return numBytesRead;
+  }
+
+  private static final int READ_BUFFER_SIZE = 64 * 1024;

Review comment:
       declare this at the start of the class
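
       e.g. grouped with the rest of the class-level state (the comment text
       is illustrative):

```java
public class S3Reader implements Closeable {
  private static final Logger LOG = LoggerFactory.getLogger(S3Reader.class);

  // Intermediate buffer size used while draining the S3 object stream.
  private static final int READ_BUFFER_SIZE = 64 * 1024;

  // The S3 file to read.
  private final S3File s3File;
```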

##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InputStream.java
##########
@@ -0,0 +1,459 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.read;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.CanSetReadahead;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.fs.common.BlockData;
+import org.apache.hadoop.fs.common.FilePosition;
+import org.apache.hadoop.fs.common.Validate;
+import org.apache.hadoop.fs.s3a.S3AInputPolicy;
+import org.apache.hadoop.fs.s3a.S3AInputStream;
+import org.apache.hadoop.fs.s3a.S3AReadOpContext;
+import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
+import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Provides an {@link InputStream} that allows reading from an S3 file.
+ */
+public abstract class S3InputStream
+    extends InputStream
+    implements CanSetReadahead, StreamCapabilities, IOStatisticsSource {
+
+  private static final Logger LOG = LoggerFactory.getLogger(S3InputStream.class);
+
+  // The S3 file read by this instance.
+  private S3File s3File;
+
+  // Reading of S3 file takes place through this reader.
+  private S3Reader reader;
+
+  // Name of this stream. Used only for logging.
+  private final String name;
+
+  // Indicates whether the stream has been closed.
+  private volatile boolean closed;
+
+  // Current position within the file.
+  private FilePosition fpos;
+
+  // The target of the most recent seek operation.
+  private long seekTargetPos;
+
+  // Information about each block of the mapped S3 file.
+  private BlockData blockData;
+
+  // Read-specific operation context.
+  private S3AReadOpContext context;
+
+  // Attributes of the S3 object being read.
+  private S3ObjectAttributes s3Attributes;
+
+  // Callbacks used for interacting with the underlying S3 client.
+  private S3AInputStream.InputStreamCallbacks client;
+
+  // Used for reporting input stream access statistics.
+  private final S3AInputStreamStatistics streamStatistics;
+
+  private S3AInputPolicy inputPolicy;
+  private final ChangeTracker changeTracker;
+  private final IOStatistics ioStatistics;
+
+  /**
+   * Initializes a new instance of the {@code S3InputStream} class.
+   *
+   * @param context read-specific operation context.
+   * @param s3Attributes attributes of the S3 object being read.
+   * @param client callbacks used for interacting with the underlying S3 client.
+   *
+   * @throws IllegalArgumentException if context is null.
+   * @throws IllegalArgumentException if s3Attributes is null.
+   * @throws IllegalArgumentException if client is null.
+   */
+  public S3InputStream(
+      S3AReadOpContext context,
+      S3ObjectAttributes s3Attributes,
+      S3AInputStream.InputStreamCallbacks client) {
+
+    Validate.checkNotNull(context, "context");
+    Validate.checkNotNull(s3Attributes, "s3Attributes");
+    Validate.checkNotNull(client, "client");
+
+    this.context = context;
+    this.s3Attributes = s3Attributes;
+    this.client = client;
+    this.streamStatistics = context.getS3AStatisticsContext().newInputStreamStatistics();
+    this.ioStatistics = streamStatistics.getIOStatistics();
+    this.name = S3File.getPath(s3Attributes);
+    this.changeTracker = new ChangeTracker(
+        this.name,
+        context.getChangeDetectionPolicy(),
+        this.streamStatistics.getChangeTrackerStatistics(),
+        s3Attributes);
+
+    setInputPolicy(context.getInputPolicy());
+    setReadahead(context.getReadahead());
+
+    long fileSize = s3Attributes.getLen();
+    int bufferSize = context.getPrefetchBlockSize();
+
+    this.blockData = new BlockData(fileSize, bufferSize);
+    this.fpos = new FilePosition(fileSize, bufferSize);
+    this.s3File = this.getS3File();
+    this.reader = new S3Reader(this.s3File);
+
+    this.seekTargetPos = 0;
+  }
+
+  /**
+   * Gets the internal IO statistics.
+   *
+   * @return the internal IO statistics.
+   */
+  @Override
+  public IOStatistics getIOStatistics() {
+    return this.ioStatistics;
+  }
+
+  /**
+   * Access the input stream statistics.
+   * This is for internal testing and may be removed without warning.
+   * @return the statistics for this input stream
+   */
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  public S3AInputStreamStatistics getS3AStreamStatistics() {
+    return this.streamStatistics;
+  }
+
+  /**
+   * Sets the number of bytes to read ahead each time.
+   *
+   * @param readahead the number of bytes to read ahead each time.
+   */
+  @Override
+  public synchronized void setReadahead(Long readahead) {
+    // We support readahead via prefetching, therefore we ignore the supplied value.
+    if (readahead != null) {
+      Validate.checkNotNegative(readahead, "readahead");
+    }
+  }
+
+  /**
+   * Indicates whether the given {@code capability} is supported by this stream.
+   *
+   * @param capability the capability to check.
+   * @return true if the given {@code capability} is supported by this stream, false otherwise.
+   */
+  @Override
+  public boolean hasCapability(String capability) {
+    return capability.equalsIgnoreCase(StreamCapabilities.IOSTATISTICS)
+        || capability.equalsIgnoreCase(StreamCapabilities.READAHEAD);
+  }
+
+  /**
+   * Set/update the input policy of the stream.
+   * This updates the stream statistics.
+   * @param inputPolicy new input policy.
+   */
+  private void setInputPolicy(S3AInputPolicy inputPolicy) {
+    this.inputPolicy = inputPolicy;
+    streamStatistics.inputPolicySet(inputPolicy.ordinal());
+  }
+
+  /**
+   * Returns the number of bytes that can be read from this stream without blocking.
+   */
+  @Override
+  public int available() throws IOException {
+    this.throwIfClosed();
+
+    if (!ensureCurrentBuffer()) {
+      return 0;
+    }
+
+    return this.fpos.buffer().remaining();
+  }
+
+  /**
+   * Gets the current position.
+   *
+   * @return the current position.
+   * @throws IOException if there is an IO error during this operation.
+   */
+  public long getPos() throws IOException {
+    this.throwIfClosed();
+
+    if (this.fpos.isValid()) {
+      return this.fpos.absolute();
+    } else {
+      return this.seekTargetPos;
+    }
+  }
+
+  /**
+   * Moves the current read position so that the next read will occur at {@code pos}.
+   *
+   * @param pos the absolute position to seek to.
+   * @throws IOException if there is an error during this operation.
+   *
+   * @throws IllegalArgumentException if pos is outside of the range [0, file size].
+   */
+  public void seek(long pos) throws IOException {
+    this.throwIfClosed();
+    this.throwIfInvalidSeek(pos);
+
+    if (!this.fpos.setAbsolute(pos)) {
+      this.fpos.invalidate();
+      this.seekTargetPos = pos;
+    }
+  }
+
+  /**
+   * Ensures that a non-empty valid buffer is available for immediate reading.
+   * It returns true when at least one such buffer is available for reading.
+   * It returns false on reaching the end of the stream.
+   *
+   * @return true if at least one such buffer is available for reading, false otherwise.
+   * @throws IOException if there is an IO error during this operation.
+   */
+  protected abstract boolean ensureCurrentBuffer() throws IOException;
+
+  @Override
+  public int read() throws IOException {
+    this.throwIfClosed();
+
+    if (!ensureCurrentBuffer()) {
+      return -1;
+    }
+
+    this.incrementBytesRead(1);
+
+    return this.fpos.buffer().get() & 0xff;
+  }
+
+  /**
+   * Reads bytes from this stream and copies them into
+   * the given {@code buffer} starting at the beginning (offset 0).
+   * Returns the number of bytes actually copied into the given buffer.
+   *
+   * @param buffer the buffer to copy data into.
+   * @return the number of bytes actually copied into the given buffer.
+   * @throws IOException if there is an IO error during this operation.
+   */
+  @Override
+  public int read(byte[] buffer) throws IOException {
+    return this.read(buffer, 0, buffer.length);
+  }
+
+  /**
+   * Reads up to {@code len} bytes from this stream and copies them into
+   * the given {@code buffer} starting at the given {@code offset}.
+   * Returns the number of bytes actually copied into the given buffer.
+   *
+   * @param buffer the buffer to copy data into.
+   * @param offset data is copied starting at this offset.
+   * @param len max number of bytes to copy.
+   * @return the number of bytes actually copied into the given buffer.
+   * @throws IOException if there is an IO error during this operation.
+   */
+  @Override
+  public int read(byte[] buffer, int offset, int len) throws IOException {
+    this.throwIfClosed();
+
+    if (len == 0) {
+      return 0;
+    }
+
+    if (!ensureCurrentBuffer()) {
+      return -1;
+    }
+
+    int numBytesRead = 0;
+    int numBytesRemaining = len;
+
+    while (numBytesRemaining > 0) {
+      if (!ensureCurrentBuffer()) {
+        break;
+      }
+
+      ByteBuffer buf = this.fpos.buffer();
+      int bytesToRead = Math.min(numBytesRemaining, buf.remaining());
+      buf.get(buffer, offset, bytesToRead);
+      this.incrementBytesRead(bytesToRead);
+      offset += bytesToRead;
+      numBytesRemaining -= bytesToRead;
+      numBytesRead += bytesToRead;
+    }
+
+    return numBytesRead;
+  }
+
+  protected S3File getFile() {
+    return this.s3File;
+  }
+
+  protected S3Reader getReader() {
+    return this.reader;
+  }
+
+  protected S3ObjectAttributes getS3ObjectAttributes() {
+    return this.s3Attributes;
+  }
+
+  protected FilePosition getFilePosition() {
+    return this.fpos;
+  }
+
+  protected String getName() {
+    return this.name;
+  }
+
+  protected boolean isClosed() {
+    return this.closed;
+  }
+
+  protected long getSeekTargetPos() {
+    return this.seekTargetPos;
+  }
+
+  protected void setSeekTargetPos(long pos) {
+    this.seekTargetPos = pos;
+  }
+
+  protected BlockData getBlockData() {
+    return this.blockData;
+  }
+
+  protected S3AReadOpContext getContext() {
+    return this.context;
+  }
+
+  private void incrementBytesRead(int bytesRead) {
+    if (bytesRead > 0) {
+      this.streamStatistics.bytesRead(bytesRead);
+      if (this.getContext().getStats() != null) {
+        this.getContext().getStats().incrementBytesRead(bytesRead);
+      }
+      this.fpos.incrementBytesRead(bytesRead);
+    }
+  }
+
+  // @VisibleForTesting
+  protected S3File getS3File() {
+    return new S3File(
+        this.context,
+        this.s3Attributes,
+        this.client,
+        this.streamStatistics,
+        this.changeTracker
+    );
+  }
+
+  protected String getOffsetStr(long offset) {
+    int blockNumber = -1;
+
+    if (this.blockData.isValidOffset(offset)) {
+      blockNumber = this.blockData.getBlockNumber(offset);
+    }
+
+    return String.format("%d:%d", blockNumber, offset);
+  }
+
+  /**
+   * Closes this stream and releases all acquired resources.
+   *
+   * @throws IOException if there is an IO error during this operation.
+   */
+  @Override
+  public void close() throws IOException {

Review comment:
       Should the S3Reader instance be closed as well?
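
       e.g. (a sketch based on the existing body; S3Reader.close() only sets
       a flag, so calling it here should be safe):

```java
@Override
public void close() throws IOException {
  if (this.closed) {
    return;
  }
  this.closed = true;

  this.blockData = null;
  this.s3File = null;
  this.fpos.invalidate();
  try {
    this.client.close();
  } finally {
    this.streamStatistics.close();
  }
  this.client = null;

  // Release the reader too, so any in-flight block read observes the
  // closed state.
  this.reader.close();
  this.reader = null;
}
```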

##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InputStream.java
##########
@@ -0,0 +1,459 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.read;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.CanSetReadahead;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.fs.common.BlockData;
+import org.apache.hadoop.fs.common.FilePosition;
+import org.apache.hadoop.fs.common.Validate;
+import org.apache.hadoop.fs.s3a.S3AInputPolicy;
+import org.apache.hadoop.fs.s3a.S3AInputStream;
+import org.apache.hadoop.fs.s3a.S3AReadOpContext;
+import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
+import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Provides an {@link InputStream} that allows reading from an S3 file.
+ */
+public abstract class S3InputStream
+    extends InputStream
+    implements CanSetReadahead, StreamCapabilities, IOStatisticsSource {
+
+  private static final Logger LOG = LoggerFactory.getLogger(S3InputStream.class);
+
+  // The S3 file read by this instance.
+  private S3File s3File;
+
+  // Reading of S3 file takes place through this reader.
+  private S3Reader reader;
+
+  // Name of this stream. Used only for logging.
+  private final String name;
+
+  // Indicates whether the stream has been closed.
+  private volatile boolean closed;
+
+  // Current position within the file.
+  private FilePosition fpos;
+
+  // The target of the most recent seek operation.
+  private long seekTargetPos;
+
+  // Information about each block of the mapped S3 file.
+  private BlockData blockData;
+
+  // Read-specific operation context.
+  private S3AReadOpContext context;
+
+  // Attributes of the S3 object being read.
+  private S3ObjectAttributes s3Attributes;
+
+  // Callbacks used for interacting with the underlying S3 client.
+  private S3AInputStream.InputStreamCallbacks client;
+
+  // Used for reporting input stream access statistics.
+  private final S3AInputStreamStatistics streamStatistics;
+
+  private S3AInputPolicy inputPolicy;
+  private final ChangeTracker changeTracker;
+  private final IOStatistics ioStatistics;
+
+  /**
+   * Initializes a new instance of the {@code S3InputStream} class.
+   *
+   * @param context read-specific operation context.
+   * @param s3Attributes attributes of the S3 object being read.
+   * @param client callbacks used for interacting with the underlying S3 client.
+   *
+   * @throws IllegalArgumentException if context is null.
+   * @throws IllegalArgumentException if s3Attributes is null.
+   * @throws IllegalArgumentException if client is null.
+   */
+  public S3InputStream(
+      S3AReadOpContext context,
+      S3ObjectAttributes s3Attributes,
+      S3AInputStream.InputStreamCallbacks client) {
+
+    Validate.checkNotNull(context, "context");
+    Validate.checkNotNull(s3Attributes, "s3Attributes");
+    Validate.checkNotNull(client, "client");
+
+    this.context = context;
+    this.s3Attributes = s3Attributes;
+    this.client = client;
+    this.streamStatistics = context.getS3AStatisticsContext().newInputStreamStatistics();
+    this.ioStatistics = streamStatistics.getIOStatistics();
+    this.name = S3File.getPath(s3Attributes);
+    this.changeTracker = new ChangeTracker(
+        this.name,
+        context.getChangeDetectionPolicy(),
+        this.streamStatistics.getChangeTrackerStatistics(),
+        s3Attributes);
+
+    setInputPolicy(context.getInputPolicy());
+    setReadahead(context.getReadahead());
+
+    long fileSize = s3Attributes.getLen();
+    int bufferSize = context.getPrefetchBlockSize();
+
+    this.blockData = new BlockData(fileSize, bufferSize);
+    this.fpos = new FilePosition(fileSize, bufferSize);
+    this.s3File = this.getS3File();
+    this.reader = new S3Reader(this.s3File);
+
+    this.seekTargetPos = 0;
+  }
+
+  /**
+   * Gets the internal IO statistics.
+   *
+   * @return the internal IO statistics.
+   */
+  @Override
+  public IOStatistics getIOStatistics() {
+    return this.ioStatistics;
+  }
+
+  /**
+   * Access the input stream statistics.
+   * This is for internal testing and may be removed without warning.
+   * @return the statistics for this input stream
+   */
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  public S3AInputStreamStatistics getS3AStreamStatistics() {
+    return this.streamStatistics;
+  }
+
+  /**
+   * Sets the number of bytes to read ahead each time.
+   *
+   * @param readahead the number of bytes to read ahead each time.
+   */
+  @Override
+  public synchronized void setReadahead(Long readahead) {
+    // We support readahead via prefetching, therefore we ignore the supplied value.
+    if (readahead != null) {
+      Validate.checkNotNegative(readahead, "readahead");
+    }
+  }
+
+  /**
+   * Indicates whether the given {@code capability} is supported by this stream.
+   *
+   * @param capability the capability to check.
+   * @return true if the given {@code capability} is supported by this stream, false otherwise.
+   */
+  @Override
+  public boolean hasCapability(String capability) {
+    return capability.equalsIgnoreCase(StreamCapabilities.IOSTATISTICS)
+        || capability.equalsIgnoreCase(StreamCapabilities.READAHEAD);
+  }
+
+  /**
+   * Set/update the input policy of the stream.
+   * This updates the stream statistics.
+   * @param inputPolicy new input policy.
+   */
+  private void setInputPolicy(S3AInputPolicy inputPolicy) {
+    this.inputPolicy = inputPolicy;
+    streamStatistics.inputPolicySet(inputPolicy.ordinal());
+  }
+
+  /**
+   * Returns the number of bytes that can be read from this stream without blocking.
+   */
+  @Override
+  public int available() throws IOException {
+    this.throwIfClosed();
+
+    if (!ensureCurrentBuffer()) {
+      return 0;
+    }
+
+    return this.fpos.buffer().remaining();
+  }
+
+  /**
+   * Gets the current position.
+   *
+   * @return the current position.
+   * @throws IOException if there is an IO error during this operation.
+   */
+  public long getPos() throws IOException {
+    this.throwIfClosed();
+
+    if (this.fpos.isValid()) {
+      return this.fpos.absolute();
+    } else {
+      return this.seekTargetPos;
+    }
+  }
+
+  /**
+   * Moves the current read position so that the next read will occur at {@code pos}.
+   *
+   * @param pos the absolute position to seek to.
+   * @throws IOException if there is an error during this operation.
+   *
+   * @throws IllegalArgumentException if pos is outside of the range [0, file size].
+   */
+  public void seek(long pos) throws IOException {
+    this.throwIfClosed();
+    this.throwIfInvalidSeek(pos);
+
+    if (!this.fpos.setAbsolute(pos)) {
+      this.fpos.invalidate();
+      this.seekTargetPos = pos;
+    }
+  }
+
+  /**
+   * Ensures that a non-empty valid buffer is available for immediate reading.
+   * It returns true when at least one such buffer is available for reading.
+   * It returns false on reaching the end of the stream.
+   *
+   * @return true if at least one such buffer is available for reading, false otherwise.
+   * @throws IOException if there is an IO error during this operation.
+   */
+  protected abstract boolean ensureCurrentBuffer() throws IOException;
+
+  @Override
+  public int read() throws IOException {
+    this.throwIfClosed();
+
+    if (!ensureCurrentBuffer()) {
+      return -1;
+    }
+
+    this.incrementBytesRead(1);
+
+    return this.fpos.buffer().get() & 0xff;
+  }
+
+  /**
+   * Reads bytes from this stream and copies them into
+   * the given {@code buffer} starting at the beginning (offset 0).
+   * Returns the number of bytes actually copied into the given buffer.
+   *
+   * @param buffer the buffer to copy data into.
+   * @return the number of bytes actually copied into the given buffer.
+   * @throws IOException if there is an IO error during this operation.
+   */
+  @Override
+  public int read(byte[] buffer) throws IOException {
+    return this.read(buffer, 0, buffer.length);
+  }
+
+  /**
+   * Reads up to {@code len} bytes from this stream and copies them into
+   * the given {@code buffer} starting at the given {@code offset}.
+   * Returns the number of bytes actually copied into the given buffer.
+   *
+   * @param buffer the buffer to copy data into.
+   * @param offset data is copied starting at this offset.
+   * @param len max number of bytes to copy.
+   * @return the number of bytes actually copied into the given buffer.
+   * @throws IOException if there is an IO error during this operation.
+   */
+  @Override
+  public int read(byte[] buffer, int offset, int len) throws IOException {
+    this.throwIfClosed();
+
+    if (len == 0) {
+      return 0;
+    }
+
+    if (!ensureCurrentBuffer()) {
+      return -1;
+    }
+
+    int numBytesRead = 0;
+    int numBytesRemaining = len;
+
+    while (numBytesRemaining > 0) {
+      if (!ensureCurrentBuffer()) {
+        break;
+      }
+
+      ByteBuffer buf = this.fpos.buffer();
+      int bytesToRead = Math.min(numBytesRemaining, buf.remaining());
+      buf.get(buffer, offset, bytesToRead);
+      this.incrementBytesRead(bytesToRead);
+      offset += bytesToRead;
+      numBytesRemaining -= bytesToRead;
+      numBytesRead += bytesToRead;
+    }
+
+    return numBytesRead;
+  }
+
+  protected S3File getFile() {
+    return this.s3File;
+  }
+
+  protected S3Reader getReader() {
+    return this.reader;
+  }
+
+  protected S3ObjectAttributes getS3ObjectAttributes() {
+    return this.s3Attributes;
+  }
+
+  protected FilePosition getFilePosition() {
+    return this.fpos;
+  }
+
+  protected String getName() {
+    return this.name;
+  }
+
+  protected boolean isClosed() {
+    return this.closed;
+  }
+
+  protected long getSeekTargetPos() {
+    return this.seekTargetPos;
+  }
+
+  protected void setSeekTargetPos(long pos) {
+    this.seekTargetPos = pos;
+  }
+
+  protected BlockData getBlockData() {
+    return this.blockData;
+  }
+
+  protected S3AReadOpContext getContext() {
+    return this.context;
+  }
+
+  private void incrementBytesRead(int bytesRead) {
+    if (bytesRead > 0) {
+      this.streamStatistics.bytesRead(bytesRead);
+      if (this.getContext().getStats() != null) {
+        this.getContext().getStats().incrementBytesRead(bytesRead);
+      }
+      this.fpos.incrementBytesRead(bytesRead);
+    }
+  }
+
+  // @VisibleForTesting

Review comment:
       remove this?
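
       Or replace the comment with the real annotation, assuming Hadoop's own
       `org.apache.hadoop.classification.VisibleForTesting` is available on
       this branch:

```java
import org.apache.hadoop.classification.VisibleForTesting;

@VisibleForTesting
protected S3File getS3File() {
  return new S3File(
      this.context,
      this.s3Attributes,
      this.client,
      this.streamStatistics,
      this.changeTracker);
}
```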




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-issues-help@hadoop.apache.org