You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2019/04/13 01:13:45 UTC

[hadoop] branch trunk updated: HADOOP-14747. S3AInputStream to implement CanUnbuffer.

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2382f63  HADOOP-14747. S3AInputStream to implement CanUnbuffer.
2382f63 is described below

commit 2382f63fc0bb4108f3f3c542b4be7c04fbedd7c4
Author: Sahil Takiar <st...@cloudera.com>
AuthorDate: Fri Apr 12 18:09:14 2019 -0700

    HADOOP-14747. S3AInputStream to implement CanUnbuffer.
    
    Author:    Sahil Takiar <st...@cloudera.com>
---
 .../site/markdown/filesystem/fsdatainputstream.md  |  37 ++++++
 .../fs/contract/AbstractContractUnbufferTest.java  | 125 +++++++++++++++++++++
 .../apache/hadoop/fs/contract/ContractOptions.java |   5 +
 .../fs/contract/hdfs/TestHDFSContractUnbuffer.java |  46 ++++++++
 .../src/test/resources/contract/hdfs.xml           |   5 +
 .../org/apache/hadoop/fs/s3a/S3AInputStream.java   |  39 +++++--
 .../fs/contract/s3a/ITestS3AContractUnbuffer.java  |  41 +++++++
 .../org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java |  66 +++++++++++
 .../org/apache/hadoop/fs/s3a/TestS3AUnbuffer.java  |  76 +++++++++++++
 .../hadoop-aws/src/test/resources/contract/s3a.xml |   5 +
 10 files changed, 438 insertions(+), 7 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md
index e067b07..0906964 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md
@@ -275,6 +275,43 @@ class, which can react to a checksum error in a read by attempting to source
 the data elsewhere. If a new source can be found it attempts to reread and
 recheck that portion of the file.
 
+### `CanUnbuffer.unbuffer()`
+
+This operation instructs the source to release any system resources it is
+currently holding on to, such as buffers, sockets, file descriptors, etc. Any
+subsequent IO operation will likely have to reacquire these resources.
+Unbuffering is useful in situations where streams need to remain open, but no
+IO operation is expected from the stream in the immediate future (examples
+include file handle caching).
+
+#### Preconditions
+
+Not all subclasses implement this operation. In addition to implementing
+`CanUnbuffer`, subclasses must implement the `StreamCapabilities` interface and
+`StreamCapabilities.hasCapability(UNBUFFER)` must return true. If a subclass
+implements `CanUnbuffer` but does not report the functionality via
+`StreamCapabilities` then the call to `unbuffer` does nothing. If a subclass
+reports that it does implement `UNBUFFER`, but does not implement the
+`CanUnbuffer` interface, an `UnsupportedOperationException` is thrown.
+
+    supported(FSDIS, StreamCapabilities.hasCapability && FSDIS.hasCapability(UNBUFFER) && CanUnbuffer.unbuffer)
+
+This method is not thread-safe. If `unbuffer` is called while a `read` is in
+progress, the outcome is undefined.
+
+`unbuffer` can be called on a closed file, in which case `unbuffer` will do
+nothing.
+
+#### Postconditions
+
+For the majority of subclasses that do not implement this operation, the call
+to `unbuffer` simply does nothing.
+
+If the operation is supported, `unbuffer` releases any and all system resources
+associated with the stream. The exact set of resources is implementation
+dependent but, in general, it may include buffers, sockets, file descriptors,
+etc.
+
 ## <a name="PositionedReadable"></a> interface `PositionedReadable`
 
 The `PositionedReadable` operations supply "positioned reads" ("pread").
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractUnbufferTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractUnbufferTest.java
new file mode 100644
index 0000000..7ba32ba
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractUnbufferTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+
+/**
+ * Contract tests for {@link org.apache.hadoop.fs.CanUnbuffer#unbuffer}.
+ * They are skipped unless the filesystem contract declares
+ * {@link ContractOptions#SUPPORTS_UNBUFFER}.
+ */
+public abstract class AbstractContractUnbufferTest extends AbstractFSContractTestBase {
+
+  private Path file;
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    skipIfUnsupported(SUPPORTS_UNBUFFER);
+    file = path("unbufferFile");
+    createFile(getFileSystem(), file, true,
+            dataset(TEST_FILE_LEN, 0, 255));
+  }
+
+  @Test
+  public void testUnbufferAfterRead() throws IOException {
+    describe("unbuffer a file after a single read");
+    try (FSDataInputStream stream = getFileSystem().open(file)) {
+      assertEquals(128, stream.read(new byte[128]));
+      unbuffer(stream);
+    }
+  }
+
+  @Test
+  public void testUnbufferBeforeRead() throws IOException {
+    describe("unbuffer a file before a read");
+    try (FSDataInputStream stream = getFileSystem().open(file)) {
+      unbuffer(stream);
+      assertEquals(128, stream.read(new byte[128]));
+    }
+  }
+
+  @Test
+  public void testUnbufferEmptyFile() throws IOException {
+    describe("unbuffer an empty file");
+    Path emptyFile = path("emptyUnbufferFile");
+    // genuinely zero-length: unbuffer must cope with a stream with no data
+    createFile(getFileSystem(), emptyFile, true, new byte[0]);
+    try (FSDataInputStream stream = getFileSystem().open(emptyFile)) {
+      unbuffer(stream);
+    }
+  }
+
+  @Test
+  public void testUnbufferOnClosedFile() throws IOException {
+    describe("unbuffer a file after it has been closed");
+    FSDataInputStream stream = null;
+    try {
+      stream = getFileSystem().open(file);
+      assertEquals(128, stream.read(new byte[128]));
+    } finally {
+      if (stream != null) {
+        stream.close();
+      }
+    }
+    // unbuffer() on a closed stream must be a no-op. Call it directly:
+    // getPos() is not guaranteed to work once a stream has been closed.
+    stream.unbuffer();
+  }
+
+  @Test
+  public void testMultipleUnbuffers() throws IOException {
+    describe("unbuffer a file multiple times");
+    try (FSDataInputStream stream = getFileSystem().open(file)) {
+      unbuffer(stream);
+      unbuffer(stream);
+      assertEquals(128, stream.read(new byte[128]));
+      unbuffer(stream);
+      unbuffer(stream);
+    }
+  }
+
+  @Test
+  public void testUnbufferMultipleReads() throws IOException {
+    describe("unbuffer a file between multiple reads");
+    try (FSDataInputStream stream = getFileSystem().open(file)) {
+      unbuffer(stream);
+      assertEquals(128, stream.read(new byte[128]));
+      unbuffer(stream);
+      assertEquals(128, stream.read(new byte[128]));
+      assertEquals(128, stream.read(new byte[128]));
+      unbuffer(stream);
+      assertEquals(128, stream.read(new byte[128]));
+      assertEquals(128, stream.read(new byte[128]));
+      assertEquals(128, stream.read(new byte[128]));
+      unbuffer(stream);
+    }
+  }
+
+  /**
+   * Call {@code unbuffer()} on an open stream and assert that its
+   * position is unchanged by the call.
+   * @param stream open stream to unbuffer
+   * @throws IOException if {@code getPos()} fails
+   */
+  private void unbuffer(FSDataInputStream stream) throws IOException {
+    long pos = stream.getPos();
+    stream.unbuffer();
+    assertEquals(pos, stream.getPos());
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractOptions.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractOptions.java
index cca3d4c..91a1121 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractOptions.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractOptions.java
@@ -202,6 +202,11 @@ public interface ContractOptions {
   String SUPPORTS_CONTENT_CHECK = "supports-content-check";
 
   /**
+   * Indicates that the FS input streams support unbuffer(): {@value}.
+   */
+  String SUPPORTS_UNBUFFER = "supports-unbuffer";
+
+  /**
    * Maximum path length
    * {@value}
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractUnbuffer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractUnbuffer.java
new file mode 100644
index 0000000..54b8bf1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractUnbuffer.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractUnbufferTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.io.IOException;
+
+/**
+ * Runs the {@link AbstractContractUnbufferTest} contract tests against HDFS,
+ * using the cluster started by {@link HDFSContract#createCluster()}.
+ */
+public class TestHDFSContractUnbuffer extends AbstractContractUnbufferTest {
+
+  @BeforeClass
+  public static void createCluster() throws IOException {
+    HDFSContract.createCluster();
+  }
+
+  @AfterClass
+  public static void teardownCluster() throws IOException {
+    HDFSContract.destroyCluster();
+  }
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new HDFSContract(conf);
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/contract/hdfs.xml b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/contract/hdfs.xml
index 261d4ba..3c9fccc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/contract/hdfs.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/contract/hdfs.xml
@@ -111,4 +111,9 @@
     <value>true</value>
   </property>
 
+  <property>
+    <name>fs.contract.supports-unbuffer</name>
+    <value>true</value>
+  </property>
+
 </configuration>
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index d096601..cbe796b 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -25,14 +25,17 @@ import com.amazonaws.services.s3.model.GetObjectRequest;
 import com.amazonaws.services.s3.model.S3Object;
 import com.amazonaws.services.s3.model.S3ObjectInputStream;
 import com.amazonaws.services.s3.model.SSECustomerKey;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.CanSetReadahead;
+import org.apache.hadoop.fs.CanUnbuffer;
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
 
 import org.slf4j.Logger;
@@ -43,6 +46,7 @@ import java.io.IOException;
 import java.net.SocketTimeoutException;
 
 import static org.apache.commons.lang3.StringUtils.isNotEmpty;
+import static org.apache.hadoop.util.StringUtils.toLowerCase;
 
 /**
  * The input stream for an S3A object.
@@ -63,7 +67,8 @@ import static org.apache.commons.lang3.StringUtils.isNotEmpty;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public class S3AInputStream extends FSInputStream implements CanSetReadahead {
+public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
+        CanUnbuffer, StreamCapabilities {
 
   public static final String E_NEGATIVE_READAHEAD_VALUE
       = "Negative readahead value";
@@ -175,7 +180,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
   private synchronized void reopen(String reason, long targetPos, long length,
           boolean forceAbort) throws IOException {
 
-    if (wrappedStream != null) {
+    if (isObjectStreamOpen()) {
       closeStream("reopen(" + reason + ")", contentRangeFinish, forceAbort);
     }
 
@@ -542,7 +547,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
    */
   @Retries.OnceRaw
   private void closeStream(String reason, long length, boolean forceAbort) {
-    if (wrappedStream != null) {
+    if (isObjectStreamOpen()) {
 
       // if the amount of data remaining in the current request is greater
       // than the readahead value: abort.
@@ -605,12 +610,14 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
   @InterfaceStability.Unstable
   public synchronized boolean resetConnection() throws IOException {
     checkNotClosed();
-    boolean connectionOpen = wrappedStream != null;
+    // Capture the state before closeStream() closes the object stream, so
+    // the return value reports whether an open connection was reset.
+    boolean connectionOpen = isObjectStreamOpen();
     if (connectionOpen) {
       LOG.info("Forced reset of connection to {}", uri);
       closeStream("reset()", contentRangeFinish, true);
     }
     return connectionOpen;
   }
 
   @Override
@@ -677,7 +681,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
           "S3AInputStream{");
       sb.append(uri);
       sb.append(" wrappedStream=")
-          .append(wrappedStream != null ? "open" : "closed");
+          .append(isObjectStreamOpen() ? "open" : "closed");
       sb.append(" read policy=").append(inputPolicy);
       sb.append(" pos=").append(pos);
       sb.append(" nextReadPos=").append(nextReadPos);
@@ -814,4 +818,37 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
       return readahead;
     }
   }
+
+  /**
+   * Release resources by closing the wrapped object stream, if open,
+   * without forcing an abort. The input stream itself stays open and
+   * subsequent reads are expected to reopen the object.
+   */
+  @Override
+  public synchronized void unbuffer() {
+    closeStream("unbuffer()", contentRangeFinish, false);
+  }
+
+  /**
+   * Declare the stream capabilities supported by this class.
+   * @param capability capability to probe; normalized with
+   * {@code toLowerCase()} before comparison.
+   * @return true for {@link StreamCapabilities#READAHEAD} and
+   * {@link StreamCapabilities#UNBUFFER}; false otherwise.
+   */
+  @Override
+  public boolean hasCapability(String capability) {
+    final String probe = toLowerCase(capability);
+    return StreamCapabilities.READAHEAD.equals(probe)
+        || StreamCapabilities.UNBUFFER.equals(probe);
+  }
+
+  /**
+   * Probe whether the wrapped object stream is currently open.
+   * @return true iff {@code wrappedStream} is non-null.
+   */
+  @VisibleForTesting
+  boolean isObjectStreamOpen() {
+    return wrappedStream != null;
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractUnbuffer.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractUnbuffer.java
new file mode 100644
index 0000000..d6dbce9
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractUnbuffer.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractUnbufferTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeEnableS3Guard;
+
+/**
+ * Runs the {@link AbstractContractUnbufferTest} contract tests against S3A,
+ * patching in S3Guard options through {@code maybeEnableS3Guard()}.
+ */
+public class ITestS3AContractUnbuffer extends AbstractContractUnbufferTest {
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    // patch in S3Guard options
+    maybeEnableS3Guard(conf);
+    return conf;
+  }
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new S3AContract(conf);
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java
new file mode 100644
index 0000000..b04b9da
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * Integration test for calling
+ * {@link org.apache.hadoop.fs.CanUnbuffer#unbuffer} on {@link S3AInputStream}.
+ * Validates that the object has been closed using the
+ * {@link S3AInputStream#isObjectStreamOpen()} method. Unlike the
+ * {@link org.apache.hadoop.fs.contract.s3a.ITestS3AContractUnbuffer} tests,
+ * these tests leverage the fact that isObjectStreamOpen exposes if the
+ * underlying stream has been closed or not.
+ */
+public class ITestS3AUnbuffer extends AbstractS3ATestBase {
+
+  @Test
+  public void testUnbuffer() throws IOException {
+    // Setup test file
+    Path dest = path("testUnbuffer");
+    describe("testUnbuffer");
+    try (FSDataOutputStream outputStream = getFileSystem().create(dest, true)) {
+      byte[] data = ContractTestUtils.dataset(16, 'a', 26);
+      outputStream.write(data);
+    }
+
+    // Open file, read half the data, and then call unbuffer
+    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+      assertTrue(inputStream.getWrappedStream() instanceof S3AInputStream);
+      assertEquals(8, inputStream.read(new byte[8]));
+      assertTrue(isObjectStreamOpen(inputStream));
+      inputStream.unbuffer();
+
+      // Check the wrapped stream is closed
+      assertFalse(isObjectStreamOpen(inputStream));
+
+      // The position is retained and the rest of the file is still readable
+      assertEquals(8, inputStream.getPos());
+      inputStream.readFully(new byte[8]);
+    }
+  }
+
+  /**
+   * Probe the {@link S3AInputStream} wrapped by {@code inputStream} for an
+   * open object stream.
+   */
+  private boolean isObjectStreamOpen(FSDataInputStream inputStream) {
+    return ((S3AInputStream) inputStream.getWrappedStream())
+        .isObjectStreamOpen();
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AUnbuffer.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AUnbuffer.java
new file mode 100644
index 0000000..c858c99
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AUnbuffer.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectInputStream;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Date;
+
+import static org.junit.Assert.assertEquals;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Uses mocks to check that the {@link S3ObjectInputStream} is closed when
+ * {@link org.apache.hadoop.fs.CanUnbuffer#unbuffer} is called. Unlike the
+ * other unbuffer tests, this specifically tests that the underlying S3 object
+ * stream is closed.
+ */
+public class TestS3AUnbuffer extends AbstractS3AMockTest {
+
+  @Test
+  public void testUnbuffer() throws IOException {
+    // Create mock ObjectMetadata for getFileStatus()
+    Path path = new Path("/file");
+    ObjectMetadata meta = mock(ObjectMetadata.class);
+    when(meta.getContentLength()).thenReturn(1L);
+    when(meta.getLastModified()).thenReturn(new Date(2L));
+    when(meta.getETag()).thenReturn("mock-etag");
+    when(s3.getObjectMetadata(any())).thenReturn(meta);
+
+    // Create mock S3ObjectInputStream and S3Object for open().
+    // read() returns -1, presumably so that closeStream() terminates if it
+    // drains the object stream before closing it rather than aborting it.
+    S3ObjectInputStream objectStream = mock(S3ObjectInputStream.class);
+    when(objectStream.read()).thenReturn(-1);
+
+    S3Object s3Object = mock(S3Object.class);
+    when(s3Object.getObjectContent()).thenReturn(objectStream);
+    when(s3Object.getObjectMetadata()).thenReturn(meta);
+    when(s3.getObject(any())).thenReturn(s3Object);
+
+    // Call read and then unbuffer, closing the stream when done
+    try (FSDataInputStream stream = fs.open(path)) {
+      assertEquals(0, stream.read(new byte[8])); // mocks read 0 bytes
+      stream.unbuffer();
+
+      // Verify that unbuffer closed the object stream
+      verify(objectStream, times(1)).close();
+    }
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/resources/contract/s3a.xml b/hadoop-tools/hadoop-aws/src/test/resources/contract/s3a.xml
index ec4c54a..0e12897 100644
--- a/hadoop-tools/hadoop-aws/src/test/resources/contract/s3a.xml
+++ b/hadoop-tools/hadoop-aws/src/test/resources/contract/s3a.xml
@@ -122,4 +122,9 @@
     <value>false</value>
   </property>
 
+  <property>
+    <name>fs.contract.supports-unbuffer</name>
+    <value>true</value>
+  </property>
+
 </configuration>


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org