You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sa...@apache.org on 2021/06/02 02:30:16 UTC

[ozone] branch master updated: HDDS-5151. Support ByteBuffer read in OzoneInputStream (#2203)

This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new e35dde9  HDDS-5151. Support ByteBuffer read in OzoneInputStream (#2203)
e35dde9 is described below

commit e35dde9989f9fdd9caaa2e6df355e3bc9d1717f5
Author: Sammi Chen <sa...@apache.org>
AuthorDate: Wed Jun 2 10:29:51 2021 +0800

    HDDS-5151. Support ByteBuffer read in OzoneInputStream (#2203)
---
 hadoop-hdds/client/pom.xml                         |  8 ++-
 .../hadoop/hdds/scm/storage/BlockInputStream.java  | 29 ++++++---
 .../hadoop/hdds/scm/storage/ByteArrayReader.java   | 67 ++++++++++++++++++++
 .../hadoop/hdds/scm/storage/ByteBufferReader.java  | 71 ++++++++++++++++++++++
 .../hdds/scm/storage/ByteReaderStrategy.java       | 41 +++++++++++++
 .../hadoop/hdds/scm/storage/ChunkInputStream.java  | 38 +++++++++++-
 .../hdds/scm/storage/TestBlockInputStream.java     | 35 +++++++++++
 .../hadoop/ozone/client/io/KeyInputStream.java     | 51 +++++++++++-----
 .../hadoop/ozone/client/io/OzoneInputStream.java   | 15 ++++-
 .../hadoop/ozone/client/io/LengthInputStream.java  | 17 +++++-
 .../ozone/client/rpc/read/TestKeyInputStream.java  | 63 +++++++++++++++++--
 .../apache/hadoop/fs/ozone/OzoneFSInputStream.java | 30 +++++----
 12 files changed, 418 insertions(+), 47 deletions(-)

diff --git a/hadoop-hdds/client/pom.xml b/hadoop-hdds/client/pom.xml
index 1646495..4e75e42 100644
--- a/hadoop-hdds/client/pom.xml
+++ b/hadoop-hdds/client/pom.xml
@@ -50,18 +50,22 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
       <artifactId>mockito-core</artifactId>
       <scope>test</scope>
     </dependency>
-
     <dependency>
       <groupId>org.apache.ozone</groupId>
       <artifactId>hdds-hadoop-dependency-test</artifactId>
       <scope>test</scope>
     </dependency>
-
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-log4j12</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.github.spotbugs</groupId>
+      <artifactId>spotbugs-annotations</artifactId>
+      <version>${spotbugs.version}</version>
+      <scope>provided</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index c61e2f4..def810f 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -21,12 +21,15 @@ package org.apache.hadoop.hdds.scm.storage;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.CanUnbuffer;
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.hdds.client.BlockID;
@@ -57,7 +60,7 @@ import org.slf4j.LoggerFactory;
  * through the sequence of chunks through {@link ChunkInputStream}.
  */
 public class BlockInputStream extends InputStream
-    implements Seekable, CanUnbuffer {
+    implements Seekable, CanUnbuffer, ByteBufferReadable {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(BlockInputStream.class);
@@ -267,22 +270,33 @@ public class BlockInputStream extends InputStream
    */
   @Override
   public synchronized int read(byte[] b, int off, int len) throws IOException {
-    if (b == null) {
-      throw new NullPointerException();
-    }
-    if (off < 0 || len < 0 || len > b.length - off) {
-      throw new IndexOutOfBoundsException();
+    ByteReaderStrategy strategy = new ByteArrayReader(b, off, len);
+    if (len == 0) {
+      return 0;
     }
+    return readWithStrategy(strategy);
+  }
+
+  @Override
+  public synchronized int read(ByteBuffer byteBuffer) throws IOException {
+    ByteReaderStrategy strategy = new ByteBufferReader(byteBuffer);
+    int len = strategy.getTargetLength();
     if (len == 0) {
       return 0;
     }
+    return readWithStrategy(strategy);
+  }
 
+  synchronized int readWithStrategy(ByteReaderStrategy strategy) throws
+      IOException {
+    Preconditions.checkArgument(strategy != null);
     if (!initialized) {
       initialize();
     }
 
     checkOpen();
     int totalReadLen = 0;
+    int len = strategy.getTargetLength();
     while (len > 0) {
       // if we are at the last chunk and have read the entire chunk, return
       if (chunkStreams.size() == 0 ||
@@ -297,7 +311,7 @@ public class BlockInputStream extends InputStream
       int numBytesToRead = Math.min(len, (int)current.getRemaining());
       int numBytesRead;
       try {
-        numBytesRead = current.read(b, off, numBytesToRead);
+        numBytesRead = strategy.readFromBlock(current, numBytesToRead);
         retries = 0; // reset retries after successful read
       } catch (StorageContainerException e) {
         if (shouldRetryRead(e)) {
@@ -318,7 +332,6 @@ public class BlockInputStream extends InputStream
             numBytesToRead, numBytesRead));
       }
       totalReadLen += numBytesRead;
-      off += numBytesRead;
       len -= numBytesRead;
       if (current.getRemaining() <= 0 &&
           ((chunkIndex + 1) < chunkStreams.size())) {
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteArrayReader.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteArrayReader.java
new file mode 100644
index 0000000..94328f0
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteArrayReader.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.io.InputStream;
+
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+/**
+ * An {@link ByteReaderStrategy} implementation which supports byte[] as the
+ * input
+ * read data buffer.
+ */
+public class ByteArrayReader implements ByteReaderStrategy {
+  private final byte[] readBuf;
+  private int offset;
+  private int targetLen;
+
+  @SuppressFBWarnings(value = "EI_EXPOSE_REP2",
+      justification = "Deep copy byte[] has bad impact on performance")
+  public ByteArrayReader(byte[] b, int off, int len) {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || len > b.length - off) {
+      throw new IndexOutOfBoundsException();
+    }
+
+    this.readBuf = b;
+    this.offset = off;
+    this.targetLen = len;
+  }
+
+  @Override
+  public int readFromBlock(InputStream is, int numBytesToRead) throws
+      IOException {
+    Preconditions.checkArgument(is != null);
+    int numBytesRead = is.read(readBuf, offset, numBytesToRead);
+    offset += numBytesRead;
+    targetLen -= numBytesRead;
+    return numBytesRead;
+  }
+
+  @Override
+  public int getTargetLength() {
+    return this.targetLen;
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufferReader.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufferReader.java
new file mode 100644
index 0000000..fedfb95
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufferReader.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.ByteBufferReadable;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * An {@link ByteReaderStrategy} implementation which supports ByteBuffer as the
+ * input read data buffer.
+ */
+public class ByteBufferReader implements ByteReaderStrategy {
+  private final ByteBuffer readBuf;
+  private int targetLen;
+
+  public ByteBufferReader(ByteBuffer buf) {
+    if (buf == null) {
+      throw new NullPointerException();
+    }
+    this.readBuf = buf;
+    this.targetLen = buf.remaining();
+  }
+
+  @Override
+  public int readFromBlock(InputStream is, int numBytesToRead) throws
+      IOException {
+    Preconditions.checkArgument(is != null);
+    Preconditions.checkArgument(is instanceof ByteBufferReadable);
+    // change buffer limit
+    int bufferLimit = readBuf.limit();
+    if (numBytesToRead < targetLen) {
+      readBuf.limit(readBuf.position() + numBytesToRead);
+    }
+    int numBytesRead;
+    try {
+      numBytesRead = ((ByteBufferReadable)is).read(readBuf);
+    } finally {
+      // restore buffer limit
+      if (numBytesToRead < targetLen) {
+        readBuf.limit(bufferLimit);
+      }
+    }
+    targetLen -= numBytesRead;
+    return numBytesRead;
+  }
+
+  @Override
+  public int getTargetLength() {
+    return this.targetLen;
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteReaderStrategy.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteReaderStrategy.java
new file mode 100644
index 0000000..bb78a94
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteReaderStrategy.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A Reader interface to work with InputStream.
+ */
+public interface ByteReaderStrategy {
+  /**
+   * Read from a block using the InputStream.
+   * @param is
+   * @param numBytesToRead how many bytes to read
+   * @return number of bytes read
+   * @throws IOException
+   */
+  int readFromBlock(InputStream is, int numBytesToRead) throws IOException;
+
+  /**
+   * @return the target length to read.
+   */
+  int getTargetLength();
+}
\ No newline at end of file
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
index 0653d1b..f7cfa47 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
@@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.CanUnbuffer;
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.hdds.client.BlockID;
@@ -57,7 +58,7 @@ import org.slf4j.LoggerFactory;
  * instances.
  */
 public class ChunkInputStream extends InputStream
-    implements Seekable, CanUnbuffer {
+    implements Seekable, CanUnbuffer, ByteBufferReadable {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(ChunkInputStream.class);
@@ -191,6 +192,41 @@ public class ChunkInputStream extends InputStream
     return total;
   }
 
+  @Override
+  public synchronized int read(ByteBuffer byteBuffer) throws IOException {
+    if (byteBuffer == null) {
+      throw new NullPointerException();
+    }
+    int len = byteBuffer.remaining();
+    if (len == 0) {
+      return 0;
+    }
+    acquireClient();
+    int total = 0;
+    while (len > 0) {
+      int available = prepareRead(len);
+      if (available == EOF) {
+        // There is no more data in the chunk stream. The buffers should have
+        // been released by now
+        Preconditions.checkState(buffers == null);
+        return total != 0 ? total : EOF;
+      }
+      ByteBuffer readBuf = buffers[bufferIndex];
+      ByteBuffer tmpBuf = readBuf.duplicate();
+      tmpBuf.limit(tmpBuf.position() + available);
+      byteBuffer.put(tmpBuf);
+      readBuf.position(tmpBuf.position());
+
+      len -= available;
+      total += available;
+
+      if (bufferEOF()) {
+        releaseBuffers(bufferIndex);
+      }
+    }
+    return total;
+  }
+
   /**
    * Seeks the ChunkInputStream to the specified position. This is done by
    * updating the chunkPosition to the seeked position in case the buffers
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
index e202875..a382968 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
@@ -42,6 +42,7 @@ import org.mockito.junit.MockitoJUnitRunner;
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -208,6 +209,40 @@ public class TestBlockInputStream {
   }
 
   @Test
+  public void testReadWithByteBuffer() throws Exception {
+    // read 200 bytes of data starting from position 50. Chunk0 contains
+    // indices 0 to 99, chunk1 from 100 to 199 and chunk3 from 200 to 299. So
+    // the read should result in 3 ChunkInputStream reads
+    seekAndVerify(50);
+    ByteBuffer buffer = ByteBuffer.allocate(200);
+    blockStream.read(buffer);
+    matchWithInputData(buffer.array(), 50, 200);
+
+    // The new position of the blockInputStream should be the last index read
+    // + 1.
+    Assert.assertEquals(250, blockStream.getPos());
+    Assert.assertEquals(2, blockStream.getChunkIndex());
+  }
+
+  @Test
+  public void testReadWithDirectByteBuffer() throws Exception {
+    // read 200 bytes of data starting from position 50. Chunk0 contains
+    // indices 0 to 99, chunk1 from 100 to 199 and chunk3 from 200 to 299. So
+    // the read should result in 3 ChunkInputStream reads
+    seekAndVerify(50);
+    ByteBuffer buffer = ByteBuffer.allocateDirect(200);
+    blockStream.read(buffer);
+    for (int i = 50; i < 50 + 200; i++) {
+      Assert.assertEquals(blockData[i], buffer.get(i - 50));
+    }
+
+    // The new position of the blockInputStream should be the last index read
+    // + 1.
+    Assert.assertEquals(250, blockStream.getPos());
+    Assert.assertEquals(2, blockStream.getChunkIndex());
+  }
+
+  @Test
   public void testSeekAndRead() throws Exception {
     // Seek to a position and read data
     seekAndVerify(50);
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
index 10d63d0..e84e39a 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.client.io;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -28,6 +29,8 @@ import java.util.Map;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.CanUnbuffer;
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.Seekable;
@@ -35,6 +38,9 @@ import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.scm.storage.ByteArrayReader;
+import org.apache.hadoop.hdds.scm.storage.ByteBufferReader;
+import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 
@@ -47,7 +53,7 @@ import org.slf4j.LoggerFactory;
  * Maintaining a list of BlockInputStream. Read based on offset.
  */
 public class KeyInputStream extends InputStream
-    implements Seekable, CanUnbuffer {
+    implements Seekable, CanUnbuffer, ByteBufferReadable {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(KeyInputStream.class);
@@ -216,18 +222,32 @@ public class KeyInputStream extends InputStream
    */
   @Override
   public synchronized int read(byte[] b, int off, int len) throws IOException {
-    checkOpen();
-    if (b == null) {
-      throw new NullPointerException();
-    }
-    if (off < 0 || len < 0 || len > b.length - off) {
-      throw new IndexOutOfBoundsException();
+    ByteReaderStrategy strategy = new ByteArrayReader(b, off, len);
+    int bufferLen = strategy.getTargetLength();
+    if (bufferLen == 0) {
+      return 0;
     }
-    if (len == 0) {
+    return readWithStrategy(strategy);
+  }
+
+  @Override
+  public synchronized int read(ByteBuffer byteBuffer) throws IOException {
+    ByteReaderStrategy strategy = new ByteBufferReader(byteBuffer);
+    int bufferLen = strategy.getTargetLength();
+    if (bufferLen == 0) {
       return 0;
     }
+    return readWithStrategy(strategy);
+  }
+
+  synchronized int readWithStrategy(ByteReaderStrategy strategy) throws
+      IOException {
+    Preconditions.checkArgument(strategy != null);
+    checkOpen();
+
+    int buffLen = strategy.getTargetLength();
     int totalReadLen = 0;
-    while (len > 0) {
+    while (buffLen > 0) {
       // if we are at the last block and have read the entire block, return
       if (blockStreams.size() == 0 ||
           (blockStreams.size() - 1 <= blockIndex &&
@@ -238,20 +258,19 @@ public class KeyInputStream extends InputStream
 
       // Get the current blockStream and read data from it
       BlockInputStream current = blockStreams.get(blockIndex);
-      int numBytesToRead = Math.min(len, (int)current.getRemaining());
-      int numBytesRead = current.read(b, off, numBytesToRead);
+      int numBytesToRead = Math.min(buffLen, (int)current.getRemaining());
+      int numBytesRead = strategy.readFromBlock(current, numBytesToRead);
       if (numBytesRead != numBytesToRead) {
         // This implies that there is either data loss or corruption in the
         // chunk entries. Even EOF in the current stream would be covered in
         // this case.
         throw new IOException(String.format("Inconsistent read for blockID=%s "
-                        + "length=%d numBytesToRead=%d numBytesRead=%d",
-                current.getBlockID(), current.getLength(), numBytesToRead,
-                numBytesRead));
+                + "length=%d numBytesToRead=%d numBytesRead=%d",
+            current.getBlockID(), current.getLength(), numBytesToRead,
+            numBytesRead));
       }
       totalReadLen += numBytesRead;
-      off += numBytesRead;
-      len -= numBytesRead;
+      buffLen -= numBytesRead;
       if (current.getRemaining() <= 0 &&
           ((blockIndex + 1) < blockStreams.size())) {
         blockIndex += 1;
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneInputStream.java
index fb3928d..548a20e 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneInputStream.java
@@ -17,16 +17,19 @@
 
 package org.apache.hadoop.ozone.client.io;
 
+import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.CanUnbuffer;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 
 /**
  * OzoneInputStream is used to read data from Ozone.
  * It uses {@link KeyInputStream} for reading the data.
  */
-public class OzoneInputStream extends InputStream implements CanUnbuffer {
+public class OzoneInputStream extends InputStream implements CanUnbuffer,
+    ByteBufferReadable {
 
   private final InputStream inputStream;
 
@@ -53,6 +56,16 @@ public class OzoneInputStream extends InputStream implements CanUnbuffer {
   }
 
   @Override
+  public int read(ByteBuffer byteBuffer) throws IOException {
+    if (inputStream instanceof ByteBufferReadable) {
+      return ((ByteBufferReadable)inputStream).read(byteBuffer);
+    } else {
+      throw new UnsupportedOperationException("Read with ByteBuffer is not " +
+          " supported by " + inputStream.getClass().getName());
+    }
+  }
+
+  @Override
   public synchronized void close() throws IOException {
     inputStream.close();
   }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/client/io/LengthInputStream.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/client/io/LengthInputStream.java
index baf1887..d2adcc6 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/client/io/LengthInputStream.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/client/io/LengthInputStream.java
@@ -18,13 +18,18 @@
 
 package org.apache.hadoop.ozone.client.io;
 
+import org.apache.hadoop.fs.ByteBufferReadable;
+
 import java.io.FilterInputStream;
+import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 
 /**
  * An input stream with length.
  */
-public class LengthInputStream extends FilterInputStream {
+public class LengthInputStream extends FilterInputStream implements
+    ByteBufferReadable {
 
   private final long length;
 
@@ -46,4 +51,14 @@ public class LengthInputStream extends FilterInputStream {
   public InputStream getWrappedStream() {
     return in;
   }
+
+  @Override
+  public int read(ByteBuffer byteBuffer) throws IOException {
+    if (in instanceof ByteBufferReadable) {
+      return ((ByteBufferReadable)in).read(byteBuffer);
+    } else {
+      throw new UnsupportedOperationException("Read with ByteBuffer is not " +
+          " supported by " + in.getClass().getName());
+    }
+  }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestKeyInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestKeyInputStream.java
index a451375..eb7a6b0 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestKeyInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestKeyInputStream.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.client.rpc.read;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 
 import java.util.List;
@@ -184,14 +185,14 @@ public class TestKeyInputStream extends TestInputStreamBase {
   }
 
   @Test
-  public void testReadChunk() throws Exception {
+  public void testReadChunkWithByteArray() throws Exception {
     String keyName = getNewKeyName();
 
     // write data spanning multiple blocks/chunks
     int dataLength = 2 * BLOCK_SIZE + (BLOCK_SIZE / 2);
     byte[] data = writeRandomBytes(keyName, dataLength);
 
-    // read chunk data
+    // read chunk data using Byte Array
     try (KeyInputStream keyInputStream = getKeyInputStream(keyName)) {
 
       int[] bufferSizeList = {BYTES_PER_CHECKSUM + 1, CHUNK_SIZE / 4,
@@ -205,6 +206,27 @@ public class TestKeyInputStream extends TestInputStreamBase {
   }
 
   @Test
+  public void testReadChunkWithByteBuffer() throws Exception {
+    String keyName = getNewKeyName();
+
+    // write data spanning multiple blocks/chunks
+    int dataLength = 2 * BLOCK_SIZE + (BLOCK_SIZE / 2);
+    byte[] data = writeRandomBytes(keyName, dataLength);
+
+    // read chunk data using ByteBuffer
+    try (KeyInputStream keyInputStream = getKeyInputStream(keyName)) {
+
+      int[] bufferSizeList = {BYTES_PER_CHECKSUM + 1, CHUNK_SIZE / 4,
+          CHUNK_SIZE / 2, CHUNK_SIZE - 1, CHUNK_SIZE, CHUNK_SIZE + 1,
+          BLOCK_SIZE - 1, BLOCK_SIZE, BLOCK_SIZE + 1, BLOCK_SIZE * 2};
+      for (int bufferSize : bufferSizeList) {
+        assertReadFullyUsingByteBuffer(data, keyInputStream, bufferSize, 0);
+        keyInputStream.seek(0);
+      }
+    }
+  }
+
+  @Test
   public void testSkip() throws Exception {
     XceiverClientManager.resetXceiverClientMetrics();
     XceiverClientMetrics metrics = XceiverClientManager
@@ -297,19 +319,27 @@ public class TestKeyInputStream extends TestInputStreamBase {
 
     // read chunk data
     try (KeyInputStream keyInputStream = getKeyInputStream(keyName)) {
-
       int b = keyInputStream.read();
       Assert.assertNotEquals(-1, b);
-
       if (doUnbuffer) {
         keyInputStream.unbuffer();
       }
-
       getCluster().shutdownHddsDatanode(pipelineNodes.get(0));
-
       // check that we can still read it
       assertReadFully(data, keyInputStream, dataLength - 1, 1);
     }
+
+    // read chunk data with ByteBuffer
+    try (KeyInputStream keyInputStream = getKeyInputStream(keyName)) {
+      int b = keyInputStream.read();
+      Assert.assertNotEquals(-1, b);
+      if (doUnbuffer) {
+        keyInputStream.unbuffer();
+      }
+      getCluster().shutdownHddsDatanode(pipelineNodes.get(0));
+      // check that we can still read it
+      assertReadFullyUsingByteBuffer(data, keyInputStream, dataLength - 1, 1);
+    }
   }
 
   private void waitForNodeToBecomeDead(
@@ -351,4 +381,25 @@ public class TestKeyInputStream extends TestInputStreamBase {
     }
     Assert.assertEquals(data.length, totalRead);
   }
+
+  private void assertReadFullyUsingByteBuffer(byte[] data, KeyInputStream in,
+      int bufferSize, int totalRead) throws IOException {
+
+    ByteBuffer buffer = ByteBuffer.allocateDirect(bufferSize);
+    while (totalRead < data.length) {
+      int numBytesRead = in.read(buffer);
+      if (numBytesRead == -1 || numBytesRead == 0) {
+        break;
+      }
+      byte[] tmp1 =
+          Arrays.copyOfRange(data, totalRead, totalRead + numBytesRead);
+      byte[] tmp2 = new byte[numBytesRead];
+      buffer.flip();
+      buffer.get(tmp2, 0, numBytesRead);
+      Assert.assertArrayEquals(tmp1, tmp2);
+      totalRead += numBytesRead;
+      buffer.clear();
+    }
+    Assert.assertEquals(data.length, totalRead);
+  }
 }
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java
index 35bd0d5..d7888a5 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java
@@ -105,23 +105,29 @@ public class OzoneFSInputStream extends FSInputStream
       throw new ReadOnlyBufferException();
     }
 
-    int readLen = Math.min(buf.remaining(), available());
-
     int bytesRead;
-    if (buf.hasArray()) {
-      int pos = buf.position();
-      bytesRead = read(buf.array(), pos, readLen);
-      if (bytesRead > 0) {
-        buf.position(pos + bytesRead);
-      }
+    if (inputStream instanceof ByteBufferReadable) {
+      bytesRead = ((ByteBufferReadable)inputStream).read(buf);
     } else {
-      byte[] readData = new byte[readLen];
-      bytesRead = read(readData, 0, readLen);
-      if (bytesRead > 0) {
-        buf.put(readData);
+      int readLen = Math.min(buf.remaining(), available());
+      if (buf.hasArray()) {
+        int pos = buf.position();
+        bytesRead = read(buf.array(), pos, readLen);
+        if (bytesRead > 0) {
+          buf.position(pos + bytesRead);
+        }
+      } else {
+        byte[] readData = new byte[readLen];
+        bytesRead = read(readData, 0, readLen);
+        if (bytesRead > 0) {
+          buf.put(readData);
+        }
       }
     }
 
+    if (statistics != null && bytesRead >= 0) {
+      statistics.incrementBytesRead(bytesRead);
+    }
     return bytesRead;
   }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org