You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2019/04/13 01:13:45 UTC
[hadoop] branch trunk updated: HADOOP-14747. S3AInputStream to
implement CanUnbuffer.
This is an automated email from the ASF dual-hosted git repository.
stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2382f63 HADOOP-14747. S3AInputStream to implement CanUnbuffer.
2382f63 is described below
commit 2382f63fc0bb4108f3f3c542b4be7c04fbedd7c4
Author: Sahil Takiar <st...@cloudera.com>
AuthorDate: Fri Apr 12 18:09:14 2019 -0700
HADOOP-14747. S3AInputStream to implement CanUnbuffer.
Author: Sahil Takiar <st...@cloudera.com>
---
.../site/markdown/filesystem/fsdatainputstream.md | 37 ++++++
.../fs/contract/AbstractContractUnbufferTest.java | 125 +++++++++++++++++++++
.../apache/hadoop/fs/contract/ContractOptions.java | 5 +
.../fs/contract/hdfs/TestHDFSContractUnbuffer.java | 46 ++++++++
.../src/test/resources/contract/hdfs.xml | 5 +
.../org/apache/hadoop/fs/s3a/S3AInputStream.java | 39 +++++--
.../fs/contract/s3a/ITestS3AContractUnbuffer.java | 41 +++++++
.../org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java | 66 +++++++++++
.../org/apache/hadoop/fs/s3a/TestS3AUnbuffer.java | 76 +++++++++++++
.../hadoop-aws/src/test/resources/contract/s3a.xml | 5 +
10 files changed, 438 insertions(+), 7 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md
index e067b07..0906964 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md
@@ -275,6 +275,43 @@ class, which can react to a checksum error in a read by attempting to source
the data elsewhere. If a new source can be found it attempts to reread and
recheck that portion of the file.
+### `CanUnbuffer.unbuffer()`
+
+This operation instructs the source to release any system resources it is
+currently holding on to, such as buffers, sockets, file descriptors, etc. Any
+subsequent IO operation will likely have to reacquire these resources.
+Unbuffering is useful in situations where streams need to remain open, but no IO
+operation is expected from the stream in the immediate future (examples include
+file handle caching).
+
+#### Preconditions
+
+Not all subclasses implement this operation. In addition to implementing
+`CanUnbuffer`, subclasses must implement the `StreamCapabilities` interface and
+`StreamCapabilities.hasCapability(UNBUFFER)` must return true. If a subclass
+implements `CanUnbuffer` but does not report the functionality via
+`StreamCapabilities` then the call to `unbuffer` does nothing. If a subclass
+reports that it does implement `UNBUFFER`, but does not implement the
+`CanUnbuffer` interface, an `UnsupportedOperationException` is thrown.
+
+ supported(FSDIS, StreamCapabilities.hasCapability && FSDIS.hasCapability(UNBUFFER) && CanUnbuffer.unbuffer)
+
+This method is not thread-safe. If `unbuffer` is called while a `read` is in
+progress, the outcome is undefined.
+
+`unbuffer` can be called on a closed file, in which case `unbuffer` will do
+nothing.
+
+#### Postconditions
+
+The majority of subclasses that do not implement this operation simply
+do nothing.
+
+If the operation is supported, `unbuffer` releases any and all system resources
+associated with the stream. The exact list of these resources is
+implementation dependent; in general, however, it may include
+buffers, sockets, file descriptors, etc.
+
## <a name="PositionedReadable"></a> interface `PositionedReadable`
The `PositionedReadable` operations supply "positioned reads" ("pread").
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractUnbufferTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractUnbufferTest.java
new file mode 100644
index 0000000..7ba32ba
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractUnbufferTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+
+/**
+ * Contract tests for {@link org.apache.hadoop.fs.CanUnbuffer#unbuffer}.
+ */
+public abstract class AbstractContractUnbufferTest extends AbstractFSContractTestBase {
+
+ private Path file;
+
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+ skipIfUnsupported(SUPPORTS_UNBUFFER); // skip unless the contract opts in
+ file = path("unbufferFile");
+ createFile(getFileSystem(), file, true,
+ dataset(TEST_FILE_LEN, 0, 255));
+ }
+
+ @Test
+ public void testUnbufferAfterRead() throws IOException {
+ describe("unbuffer a file after a single read");
+ try (FSDataInputStream stream = getFileSystem().open(file)) {
+ assertEquals(128, stream.read(new byte[128]));
+ unbuffer(stream);
+ }
+ }
+
+ @Test
+ public void testUnbufferBeforeRead() throws IOException {
+ describe("unbuffer a file before a read");
+ try (FSDataInputStream stream = getFileSystem().open(file)) {
+ unbuffer(stream);
+ assertEquals(128, stream.read(new byte[128]));
+ }
+ }
+
+ @Test
+ public void testUnbufferEmptyFile() throws IOException {
+ Path emptyFile = path("emptyUnbufferFile");
+ createFile(getFileSystem(), emptyFile, true,
+ dataset(0, 0, 255)); // zero bytes, so the file really is empty
+ describe("unbuffer an empty file");
+ try (FSDataInputStream stream = getFileSystem().open(emptyFile)) {
+ unbuffer(stream);
+ }
+ }
+
+ @Test
+ public void testUnbufferOnClosedFile() throws IOException {
+ describe("unbuffer a closed file");
+ FSDataInputStream stream = null;
+ try {
+ stream = getFileSystem().open(file);
+ assertEquals(128, stream.read(new byte[128]));
+ } finally {
+ if (stream != null) {
+ stream.close();
+ }
+ }
+ unbuffer(stream); // the stream has already been closed at this point
+ }
+
+ @Test
+ public void testMultipleUnbuffers() throws IOException {
+ describe("unbuffer a file multiple times");
+ try (FSDataInputStream stream = getFileSystem().open(file)) {
+ unbuffer(stream);
+ unbuffer(stream);
+ assertEquals(128, stream.read(new byte[128]));
+ unbuffer(stream);
+ unbuffer(stream);
+ }
+ }
+
+ @Test
+ public void testUnbufferMultipleReads() throws IOException {
+ describe("unbuffer a file between multiple reads");
+ try (FSDataInputStream stream = getFileSystem().open(file)) {
+ unbuffer(stream);
+ assertEquals(128, stream.read(new byte[128]));
+ unbuffer(stream);
+ assertEquals(128, stream.read(new byte[128]));
+ assertEquals(128, stream.read(new byte[128]));
+ unbuffer(stream);
+ assertEquals(128, stream.read(new byte[128]));
+ assertEquals(128, stream.read(new byte[128]));
+ assertEquals(128, stream.read(new byte[128]));
+ unbuffer(stream);
+ }
+ }
+
+ private void unbuffer(FSDataInputStream stream) throws IOException {
+ long pos = stream.getPos();
+ stream.unbuffer();
+ assertEquals(pos, stream.getPos()); // unbuffer must not move the position
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractOptions.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractOptions.java
index cca3d4c..91a1121 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractOptions.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractOptions.java
@@ -202,6 +202,11 @@ public interface ContractOptions {
String SUPPORTS_CONTENT_CHECK = "supports-content-check";
/**
+ * Indicates that FS supports unbuffer.
+ */
+ String SUPPORTS_UNBUFFER = "supports-unbuffer";
+
+ /**
* Maximum path length
* {@value}
*/
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractUnbuffer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractUnbuffer.java
new file mode 100644
index 0000000..54b8bf1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractUnbuffer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractUnbufferTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.io.IOException;
+
+public class TestHDFSContractUnbuffer extends AbstractContractUnbufferTest {
+
+ @BeforeClass // class-level setup: cluster creation is delegated to HDFSContract
+ public static void createCluster() throws IOException {
+ HDFSContract.createCluster();
+ }
+
+ @AfterClass // class-level cleanup: cluster teardown is delegated to HDFSContract
+ public static void teardownCluster() throws IOException {
+ HDFSContract.destroyCluster();
+ }
+
+ @Override // binds the inherited unbuffer contract tests to an HDFSContract
+ protected AbstractFSContract createContract(Configuration conf) {
+ return new HDFSContract(conf);
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/contract/hdfs.xml b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/contract/hdfs.xml
index 261d4ba..3c9fccc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/contract/hdfs.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/contract/hdfs.xml
@@ -111,4 +111,9 @@
<value>true</value>
</property>
+ <property>
+ <name>fs.contract.supports-unbuffer</name>
+ <value>true</value>
+ </property>
+
</configuration>
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index d096601..cbe796b 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -25,14 +25,17 @@ import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import com.amazonaws.services.s3.model.SSECustomerKey;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.CanSetReadahead;
+import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
import org.slf4j.Logger;
@@ -43,6 +46,7 @@ import java.io.IOException;
import java.net.SocketTimeoutException;
import static org.apache.commons.lang3.StringUtils.isNotEmpty;
+import static org.apache.hadoop.util.StringUtils.toLowerCase;
/**
* The input stream for an S3A object.
@@ -63,7 +67,8 @@ import static org.apache.commons.lang3.StringUtils.isNotEmpty;
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
-public class S3AInputStream extends FSInputStream implements CanSetReadahead {
+public class S3AInputStream extends FSInputStream implements CanSetReadahead,
+ CanUnbuffer, StreamCapabilities {
public static final String E_NEGATIVE_READAHEAD_VALUE
= "Negative readahead value";
@@ -175,7 +180,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
private synchronized void reopen(String reason, long targetPos, long length,
boolean forceAbort) throws IOException {
- if (wrappedStream != null) {
+ if (isObjectStreamOpen()) {
closeStream("reopen(" + reason + ")", contentRangeFinish, forceAbort);
}
@@ -542,7 +547,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
*/
@Retries.OnceRaw
private void closeStream(String reason, long length, boolean forceAbort) {
- if (wrappedStream != null) {
+ if (isObjectStreamOpen()) {
// if the amount of data remaining in the current request is greater
// than the readahead value: abort.
@@ -605,12 +610,11 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
@InterfaceStability.Unstable
public synchronized boolean resetConnection() throws IOException {
checkNotClosed();
- boolean connectionOpen = wrappedStream != null;
+ // record the state first: closeStream() leaves the object stream closed
+ boolean connectionOpen = isObjectStreamOpen();
if (connectionOpen) {
LOG.info("Forced reset of connection to {}", uri);
closeStream("reset()", contentRangeFinish, true);
}
return connectionOpen;
}
@Override
@@ -677,7 +681,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
"S3AInputStream{");
sb.append(uri);
sb.append(" wrappedStream=")
- .append(wrappedStream != null ? "open" : "closed");
+ .append(isObjectStreamOpen() ? "open" : "closed");
sb.append(" read policy=").append(inputPolicy);
sb.append(" pos=").append(pos);
sb.append(" nextReadPos=").append(nextReadPos);
@@ -814,4 +818,25 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
return readahead;
}
}
+
+ @Override
+ public synchronized void unbuffer() { // closes the wrapped object stream, if one is open
+ closeStream("unbuffer()", contentRangeFinish, false); // false: no forced abort
+ }
+
+ /**
+ * The name is lower-cased, then compared with the capabilities on offer.
+ */
+ @Override
+ public boolean hasCapability(String capability) {
+ final String name = toLowerCase(capability);
+ // readahead and unbuffer are the two capabilities this stream reports
+ return name.equals(StreamCapabilities.READAHEAD)
+ || name.equals(StreamCapabilities.UNBUFFER);
+ }
+
+ @VisibleForTesting
+ boolean isObjectStreamOpen() { // package-private probe, also used by the tests
+ return wrappedStream != null; // a non-null wrapped stream counts as open
+ }
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractUnbuffer.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractUnbuffer.java
new file mode 100644
index 0000000..d6dbce9
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractUnbuffer.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractUnbufferTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeEnableS3Guard;
+
+public class ITestS3AContractUnbuffer extends AbstractContractUnbufferTest {
+
+ @Override // starts from the superclass configuration and adjusts it
+ protected Configuration createConfiguration() {
+ Configuration conf = super.createConfiguration();
+ // patch in S3Guard options
+ maybeEnableS3Guard(conf);
+ return conf;
+ }
+
+ @Override // binds the inherited unbuffer contract tests to an S3AContract
+ protected AbstractFSContract createContract(Configuration conf) {
+ return new S3AContract(conf);
+ }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java
new file mode 100644
index 0000000..b04b9da
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * Integration test for calling
+ * {@link org.apache.hadoop.fs.CanUnbuffer#unbuffer} on {@link S3AInputStream}.
+ * Validates that the object has been closed using the
+ * {@link S3AInputStream#isObjectStreamOpen()} method. Unlike the
+ * {@link org.apache.hadoop.fs.contract.s3a.ITestS3AContractUnbuffer} tests,
+ * these tests leverage the fact that isObjectStreamOpen exposes if the
+ * underlying stream has been closed or not.
+ */
+public class ITestS3AUnbuffer extends AbstractS3ATestBase {
+
+ @Test
+ public void testUnbuffer() throws IOException {
+ // Write a 16 byte object to read back
+ Path testPath = path("testUnbuffer");
+ describe("testUnbuffer");
+ byte[] contents = ContractTestUtils.dataset(16, 'a', 26);
+ try (FSDataOutputStream out = getFileSystem().create(testPath, true)) {
+ out.write(contents);
+ }
+
+ // Read the first half of the object, then unbuffer the stream
+ try (FSDataInputStream in = getFileSystem().open(testPath)) {
+ assertTrue(in.getWrappedStream() instanceof S3AInputStream);
+ assertEquals(8, in.read(new byte[8]));
+ assertTrue(objectStreamOpen(in));
+ in.unbuffer();
+ // unbuffer() must have closed the wrapped object stream
+ assertFalse(objectStreamOpen(in));
+ }
+ }
+
+ private boolean objectStreamOpen(FSDataInputStream in) {
+ S3AInputStream s3aStream = (S3AInputStream) in.getWrappedStream();
+ return s3aStream.isObjectStreamOpen();
+ }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AUnbuffer.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AUnbuffer.java
new file mode 100644
index 0000000..c858c99
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AUnbuffer.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectInputStream;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Date;
+
+import static org.junit.Assert.assertEquals;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Uses mocks to check that the {@link S3ObjectInputStream} is closed when
+ * {@link org.apache.hadoop.fs.CanUnbuffer#unbuffer} is called. Unlike the
+ * other unbuffer tests, this specifically tests that the underlying S3 object
+ * stream is closed.
+ */
+public class TestS3AUnbuffer extends AbstractS3AMockTest {
+
+ @Test
+ public void testUnbuffer() throws IOException {
+ // Mock the ObjectMetadata handed back for getFileStatus()
+ ObjectMetadata metadata = mock(ObjectMetadata.class);
+ when(metadata.getContentLength()).thenReturn(1L);
+ when(metadata.getLastModified()).thenReturn(new Date(2L));
+ when(metadata.getETag()).thenReturn("mock-etag");
+ when(s3.getObjectMetadata(any())).thenReturn(metadata);
+
+ // Mock the S3Object, and its content stream, handed back for open()
+ S3ObjectInputStream content = mock(S3ObjectInputStream.class);
+ when(content.read()).thenReturn(-1);
+
+ S3Object s3Obj = mock(S3Object.class);
+ when(s3Obj.getObjectContent()).thenReturn(content);
+ when(s3Obj.getObjectMetadata()).thenReturn(metadata);
+ when(s3.getObject(any())).thenReturn(s3Obj);
+
+ // Read from the file, then unbuffer it
+ Path file = new Path("/file");
+ FSDataInputStream in = fs.open(file);
+ assertEquals(0, in.read(new byte[8])); // mocks read 0 bytes
+ in.unbuffer();
+
+ // The mocked object stream must have been closed exactly once
+ verify(content, times(1)).close();
+ }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/resources/contract/s3a.xml b/hadoop-tools/hadoop-aws/src/test/resources/contract/s3a.xml
index ec4c54a..0e12897 100644
--- a/hadoop-tools/hadoop-aws/src/test/resources/contract/s3a.xml
+++ b/hadoop-tools/hadoop-aws/src/test/resources/contract/s3a.xml
@@ -122,4 +122,9 @@
<value>false</value>
</property>
+ <property>
+ <name>fs.contract.supports-unbuffer</name>
+ <value>true</value>
+ </property>
+
</configuration>
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org