Posted to commits@ozone.apache.org by sz...@apache.org on 2022/05/24 16:27:25 UTC

[ozone] 07/36: HDDS-5705. [Ozone-Streaming] Change ByteBufStreamOutput to ByteBufferStreamOutput (#2603)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 05b1910e2a57d5ae5dad62a67dbc5c6a464b52ec
Author: Kaijie Chen <ch...@kaijie.org>
AuthorDate: Wed Sep 8 09:25:17 2021 +0800

    HDDS-5705. [Ozone-Streaming] Change ByteBufStreamOutput to ByteBufferStreamOutput (#2603)
---
 hadoop-hdds/client/pom.xml                         |  4 --
 .../hdds/scm/storage/BlockDataStreamOutput.java    | 26 +++++----
 ...reamOutput.java => ByteBufferStreamOutput.java} | 15 +++--
 .../client/io/BlockDataStreamOutputEntry.java      | 51 ++++++++---------
 .../ozone/client/io/KeyDataStreamOutput.java       | 15 +++--
 .../ozone/client/io/OzoneDataStreamOutput.java     | 30 +++++-----
 .../client/rpc/TestBlockDataStreamOutput.java      | 66 +++++++++-------------
 .../hadoop/ozone/shell/keys/PutKeyHandler.java     |  8 +--
 8 files changed, 98 insertions(+), 117 deletions(-)
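
In short, the patch replaces Netty's ByteBuf with java.nio.ByteBuffer throughout the streaming write path, which is also why the netty-buffer dependency below can be dropped. A minimal caller-side sketch of the migration (the MigrationSketch class and its key contents are illustrative, not part of the patch):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;

    class MigrationSketch {
      static void writeKey(ByteBufferStreamOutput out) throws IOException {
        byte[] data = "hello".getBytes(StandardCharsets.UTF_8);
        // Before this patch: out.write(Unpooled.wrappedBuffer(data));
        // After: plain NIO, no Netty wrapper needed.
        out.write(ByteBuffer.wrap(data));
      }
    }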

diff --git a/hadoop-hdds/client/pom.xml b/hadoop-hdds/client/pom.xml
index 17120fa250..9f2116c96f 100644
--- a/hadoop-hdds/client/pom.xml
+++ b/hadoop-hdds/client/pom.xml
@@ -70,10 +70,6 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
       <version>${spotbugs.version}</version>
       <scope>provided</scope>
     </dependency>
-    <dependency>
-      <groupId>io.netty</groupId>
-      <artifactId>netty-buffer</artifactId>
-    </dependency>
   </dependencies>
 
   <build>
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
index 39ec2f9219..d0419fa0c3 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdds.scm.storage;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import io.netty.buffer.ByteBuf;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -46,6 +45,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -59,7 +59,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlockAsync;
 
 /**
- * An {@link ByteBufStreamOutput} used by the REST service in combination
+ * A {@link ByteBufferStreamOutput} used by the REST service in combination
  * with the SCMClient to write the value of a key to a sequence
  * of container chunks.  Writes are buffered locally and periodically written to
  * the container as a new chunk.  In order to preserve the semantics that
@@ -74,7 +74,7 @@ import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlock
  * This class encapsulates all state management for buffering and writing
  * through to the container.
  */
-public class BlockDataStreamOutput implements ByteBufStreamOutput {
+public class BlockDataStreamOutput implements ByteBufferStreamOutput {
   public static final Logger LOG =
       LoggerFactory.getLogger(BlockDataStreamOutput.class);
   public static final String EXCEPTION_MSG =
@@ -209,16 +209,16 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
   }
 
   @Override
-  public void write(ByteBuf buf) throws IOException {
+  public void write(ByteBuffer b, int off, int len) throws IOException {
     checkOpen();
-    if (buf == null) {
+    if (b == null) {
       throw new NullPointerException();
     }
-    final int len = buf.readableBytes();
     if (len == 0) {
       return;
     }
-    writeChunkToContainer(buf);
+    writeChunkToContainer(
+            (ByteBuffer) b.asReadOnlyBuffer().position(off).limit(off + len));
 
     writtenDataLength += len;
   }
@@ -476,15 +476,17 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
    * Writes buffered data as a new chunk to the container and saves chunk
    * information to be used later in putKey call.
    *
+   * @param buf chunk data to write, from position to limit
    * @throws IOException if there is an I/O error while performing the call
    * @throws OzoneChecksumException if there is an error while computing
    * checksum
    */
-  private void writeChunkToContainer(ByteBuf buf)
+  private void writeChunkToContainer(ByteBuffer buf)
       throws IOException {
-    ChecksumData checksumData = checksum.computeChecksum(buf.nioBuffer());
-    int effectiveChunkSize = buf.readableBytes();
+    final int effectiveChunkSize = buf.remaining();
     final long offset = chunkOffset.getAndAdd(effectiveChunkSize);
+    ChecksumData checksumData =
+        checksum.computeChecksum(buf.asReadOnlyBuffer());
     ChunkInfo chunkInfo = ChunkInfo.newBuilder()
         .setChunkName(blockID.get().getLocalID() + "_chunk_" + ++chunkIndex)
         .setOffset(offset)
@@ -499,8 +501,8 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
 
     CompletableFuture<DataStreamReply> future =
         (needSync(offset + effectiveChunkSize) ?
-            out.writeAsync(buf.nioBuffer(), StandardWriteOption.SYNC) :
-            out.writeAsync(buf.nioBuffer()))
+            out.writeAsync(buf, StandardWriteOption.SYNC) :
+            out.writeAsync(buf))
             .whenCompleteAsync((r, e) -> {
               if (e != null || !r.isSuccess()) {
                 if (e == null) {
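
The new write(ByteBuffer, int, int) above slices the caller's buffer through asReadOnlyBuffer() instead of handing the buffer itself along, so the stream can move position and limit without disturbing the caller. A standalone sketch of that idiom (SliceSketch and its main method are illustrative); the cast is needed on Java 8, where Buffer.position(int) and limit(int) return Buffer rather than ByteBuffer:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class SliceSketch {
      static ByteBuffer slice(ByteBuffer b, int off, int len) {
        // Independent read-only view: repositioning it never mutates b.
        return (ByteBuffer) b.asReadOnlyBuffer().position(off).limit(off + len);
      }

      public static void main(String[] args) {
        ByteBuffer b = ByteBuffer.wrap("0123456789".getBytes(StandardCharsets.UTF_8));
        ByteBuffer s = slice(b, 2, 4);
        System.out.println(s.remaining());  // 4 -> the bytes "2345"
        System.out.println(b.position());   // 0 -> caller's buffer untouched
      }
    }
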
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufferStreamOutput.java
similarity index 82%
rename from hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufStreamOutput.java
rename to hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufferStreamOutput.java
index 7f40737b70..0650a685b6 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufferStreamOutput.java
@@ -18,23 +18,24 @@
 
 package org.apache.hadoop.hdds.scm.storage;
 
-import io.netty.buffer.ByteBuf;
-
 import java.io.Closeable;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 /**
 * This interface is for writing an output stream of ByteBuffers.
-* An ByteBufStreamOutput accepts Netty ByteBuf and sends them to some sink.
+* A ByteBufferStreamOutput accepts NIO ByteBuffers and sends them to some sink.
 */
-public interface ByteBufStreamOutput extends Closeable {
+public interface ByteBufferStreamOutput extends Closeable {
   /**
    * Try to write all the bytes in ByteBuf b to DataStream.
    *
    * @param b the data.
    * @exception IOException if an I/O error occurs.
    */
-  void write(ByteBuf b) throws IOException;
+  default void write(ByteBuffer b) throws IOException {
+    write(b, b.position(), b.remaining());
+  }
 
   /**
    * Try to write the [off:off + len) slice in ByteBuf b to DataStream.
@@ -44,9 +45,7 @@ public interface ByteBufStreamOutput extends Closeable {
    * @param len the number of bytes to write.
    * @exception  IOException  if an I/O error occurs.
    */
-  default void write(ByteBuf b, int off, int len) throws IOException {
-    write(b.slice(off, len));
-  }
+  void write(ByteBuffer b, int off, int len) throws IOException;
 
   /**
    * Flushes this DataStream output and forces any buffered output bytes
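
Note the inversion in the interface: the (buffer, off, len) overload is now the abstract method, and the single-argument write is a default that forwards (position(), remaining()). A minimal implementor under the new contract might look like this (ConsoleStreamOutput is a hypothetical class, not part of the patch):

    import java.io.IOException;
    import java.nio.ByteBuffer;

    import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;

    class ConsoleStreamOutput implements ByteBufferStreamOutput {
      @Override
      public void write(ByteBuffer b, int off, int len) throws IOException {
        // Work on a duplicate so the caller's position/limit stay untouched.
        ByteBuffer view = (ByteBuffer) b.duplicate().position(off).limit(off + len);
        byte[] bytes = new byte[len];
        view.get(bytes);
        System.out.write(bytes, 0, len);
        // write(ByteBuffer) comes for free via the new default method.
      }

      @Override
      public void flush() throws IOException {
        System.out.flush();
      }

      @Override
      public void close() throws IOException {
        flush();
      }
    }
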
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java
index 98907bf8af..f0c3a43e89 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java
@@ -18,18 +18,18 @@
 package org.apache.hadoop.ozone.client.io;
 
 import com.google.common.annotations.VisibleForTesting;
-import io.netty.buffer.ByteBuf;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput;
-import org.apache.hadoop.hdds.scm.storage.ByteBufStreamOutput;
+import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
 
@@ -37,10 +37,10 @@ import java.util.Collections;
  * Helper class used inside {@link BlockDataStreamOutput}.
  * */
 public final class BlockDataStreamOutputEntry
-    implements ByteBufStreamOutput {
+    implements ByteBufferStreamOutput {
 
   private final OzoneClientConfig config;
-  private ByteBufStreamOutput byteBufStreamOutput;
+  private ByteBufferStreamOutput byteBufferStreamOutput;
   private BlockID blockID;
   private final String key;
   private final XceiverClientFactory xceiverClientManager;
@@ -61,7 +61,7 @@ public final class BlockDataStreamOutputEntry
       OzoneClientConfig config
   ) {
     this.config = config;
-    this.byteBufStreamOutput = null;
+    this.byteBufferStreamOutput = null;
     this.blockID = blockID;
     this.key = key;
     this.xceiverClientManager = xceiverClientManager;
@@ -90,63 +90,62 @@ public final class BlockDataStreamOutputEntry
    * @throws IOException if xceiverClient initialization fails
    */
   private void checkStream() throws IOException {
-    if (this.byteBufStreamOutput == null) {
-      this.byteBufStreamOutput =
+    if (this.byteBufferStreamOutput == null) {
+      this.byteBufferStreamOutput =
           new BlockDataStreamOutput(blockID, xceiverClientManager,
               pipeline, config, token);
     }
   }
 
   @Override
-  public void write(ByteBuf b) throws IOException {
+  public void write(ByteBuffer b, int off, int len) throws IOException {
     checkStream();
-    final int len = b.readableBytes();
-    byteBufStreamOutput.write(b);
+    byteBufferStreamOutput.write(b, off, len);
     this.currentPosition += len;
   }
 
   @Override
   public void flush() throws IOException {
-    if (this.byteBufStreamOutput != null) {
-      this.byteBufStreamOutput.flush();
+    if (this.byteBufferStreamOutput != null) {
+      this.byteBufferStreamOutput.flush();
     }
   }
 
   @Override
   public void close() throws IOException {
-    if (this.byteBufStreamOutput != null) {
-      this.byteBufStreamOutput.close();
+    if (this.byteBufferStreamOutput != null) {
+      this.byteBufferStreamOutput.close();
       // after closing the chunkOutPutStream, blockId would have been
       // reconstructed with updated bcsId
       this.blockID =
-          ((BlockDataStreamOutput) byteBufStreamOutput).getBlockID();
+          ((BlockDataStreamOutput) byteBufferStreamOutput).getBlockID();
     }
   }
 
   boolean isClosed() {
-    if (byteBufStreamOutput != null) {
-      return  ((BlockDataStreamOutput) byteBufStreamOutput).isClosed();
+    if (byteBufferStreamOutput != null) {
+      return  ((BlockDataStreamOutput) byteBufferStreamOutput).isClosed();
     }
     return false;
   }
 
   Collection<DatanodeDetails> getFailedServers() {
-    if (byteBufStreamOutput != null) {
+    if (byteBufferStreamOutput != null) {
       BlockDataStreamOutput out =
-          (BlockDataStreamOutput) this.byteBufStreamOutput;
+          (BlockDataStreamOutput) this.byteBufferStreamOutput;
       return out.getFailedServers();
     }
     return Collections.emptyList();
   }
 
   long getWrittenDataLength() {
-    if (byteBufStreamOutput != null) {
+    if (byteBufferStreamOutput != null) {
       BlockDataStreamOutput out =
-          (BlockDataStreamOutput) this.byteBufStreamOutput;
+          (BlockDataStreamOutput) this.byteBufferStreamOutput;
       return out.getWrittenDataLength();
     } else {
       // For a pre allocated block for which no write has been initiated,
-      // the ByteBufStreamOutput will be null here.
+      // the ByteBufferStreamOutput will be null here.
       // In such cases, the default blockCommitSequenceId will be 0
       return 0;
     }
@@ -155,7 +154,7 @@ public final class BlockDataStreamOutputEntry
   void cleanup(boolean invalidateClient) throws IOException {
     checkStream();
     BlockDataStreamOutput out =
-        (BlockDataStreamOutput) this.byteBufStreamOutput;
+        (BlockDataStreamOutput) this.byteBufferStreamOutput;
     out.cleanup(invalidateClient);
 
   }
@@ -163,7 +162,7 @@ public final class BlockDataStreamOutputEntry
   void writeOnRetry(long len) throws IOException {
     checkStream();
     BlockDataStreamOutput out =
-        (BlockDataStreamOutput) this.byteBufStreamOutput;
+        (BlockDataStreamOutput) this.byteBufferStreamOutput;
     out.writeOnRetry(len);
     this.currentPosition += len;
 
@@ -231,8 +230,8 @@ public final class BlockDataStreamOutputEntry
   }
 
   @VisibleForTesting
-  public ByteBufStreamOutput getByteBufStreamOutput() {
-    return byteBufStreamOutput;
+  public ByteBufferStreamOutput getByteBufStreamOutput() {
+    return byteBufferStreamOutput;
   }
 
   public BlockID getBlockID() {
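
Beyond the mechanical rename, this file keeps its lazy-initialization contract: the underlying stream is only created on first use, so a pre-allocated block that is never written stays cheap. A condensed sketch of that pattern (LazyEntrySketch and the Supplier-based factory are illustrative stand-ins for checkStream()):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.function.Supplier;

    import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;

    class LazyEntrySketch {
      private final Supplier<ByteBufferStreamOutput> factory;
      private ByteBufferStreamOutput delegate;  // null until the first write
      private long position;

      LazyEntrySketch(Supplier<ByteBufferStreamOutput> factory) {
        this.factory = factory;
      }

      void write(ByteBuffer b, int off, int len) throws IOException {
        if (delegate == null) {
          delegate = factory.get();  // mirrors checkStream() above
        }
        delegate.write(b, off, len);
        // The caller-supplied len replaces the old ByteBuf.readableBytes().
        position += len;
      }

      long getWrittenDataLength() {
        // A pre-allocated, never-written block reports 0, as in the real entry.
        return delegate == null ? 0 : position;
      }
    }
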
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
index c37f9cd51d..9bba89d0a8 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.ozone.client.io;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import io.netty.buffer.ByteBuf;
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
@@ -32,7 +31,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
-import org.apache.hadoop.hdds.scm.storage.ByteBufStreamOutput;
+import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
@@ -48,6 +47,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -63,7 +63,7 @@ import java.util.stream.Collectors;
  *
  * TODO : currently not support multi-thread access.
  */
-public class KeyDataStreamOutput implements ByteBufStreamOutput {
+public class KeyDataStreamOutput implements ByteBufferStreamOutput {
 
   private OzoneClientConfig config;
 
@@ -185,17 +185,16 @@ public class KeyDataStreamOutput implements ByteBufStreamOutput {
   }
 
   @Override
-  public void write(ByteBuf b) throws IOException {
+  public void write(ByteBuffer b, int off, int len) throws IOException {
     checkNotClosed();
     if (b == null) {
       throw new NullPointerException();
     }
-    final int len = b.readableBytes();
-    handleWrite(b, b.readerIndex(), len, false);
+    handleWrite(b, off, len, false);
     writeOffset += len;
   }
 
-  private void handleWrite(ByteBuf b, int off, long len, boolean retry)
+  private void handleWrite(ByteBuffer b, int off, long len, boolean retry)
       throws IOException {
     while (len > 0) {
       try {
@@ -227,7 +226,7 @@ public class KeyDataStreamOutput implements ByteBufStreamOutput {
   }
 
   private int writeToDataStreamOutput(BlockDataStreamOutputEntry current,
-      boolean retry, long len, ByteBuf b, int writeLen, int off,
+      boolean retry, long len, ByteBuffer b, int writeLen, int off,
       long currentPos) throws IOException {
     try {
       if (retry) {
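
One subtlety of the new signature: a Netty ByteBuf advances its readerIndex() as it is consumed, whereas a ByteBuffer passed with an explicit (off, len) leaves the progress bookkeeping to the caller, which is exactly what handleWrite's loop does. A self-contained sketch of that loop shape (WriteLoopSketch and the chunk parameter are illustrative):

    import java.io.IOException;
    import java.nio.ByteBuffer;

    import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;

    final class WriteLoopSketch {
      static void writeFully(ByteBufferStreamOutput out, ByteBuffer b, int chunk)
          throws IOException {
        int off = b.position();
        int len = b.remaining();
        while (len > 0) {
          int writeLen = Math.min(len, chunk);
          out.write(b, off, writeLen);
          off += writeLen;  // explicit advance; ByteBuf did this implicitly
          len -= writeLen;
        }
      }
    }
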
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
index 378b86872e..d40ac2b332 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
@@ -16,55 +16,55 @@
  */
 package org.apache.hadoop.ozone.client.io;
 
-import io.netty.buffer.ByteBuf;
-import org.apache.hadoop.hdds.scm.storage.ByteBufStreamOutput;
+import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 /**
  * OzoneDataStreamOutput is used to write data into Ozone.
  * It uses SCM's {@link KeyDataStreamOutput} for writing the data.
  */
-public class OzoneDataStreamOutput implements ByteBufStreamOutput {
+public class OzoneDataStreamOutput implements ByteBufferStreamOutput {
 
-  private final ByteBufStreamOutput byteBufStreamOutput;
+  private final ByteBufferStreamOutput byteBufferStreamOutput;
 
   /**
    * Constructs OzoneDataStreamOutput with KeyDataStreamOutput.
    *
-   * @param byteBufStreamOutput
+   * @param byteBufferStreamOutput the underlying ByteBufferStreamOutput
    */
-  public OzoneDataStreamOutput(ByteBufStreamOutput byteBufStreamOutput) {
-    this.byteBufStreamOutput = byteBufStreamOutput;
+  public OzoneDataStreamOutput(ByteBufferStreamOutput byteBufferStreamOutput) {
+    this.byteBufferStreamOutput = byteBufferStreamOutput;
   }
 
   @Override
-  public void write(ByteBuf b) throws IOException {
-    byteBufStreamOutput.write(b);
+  public void write(ByteBuffer b, int off, int len) throws IOException {
+    byteBufferStreamOutput.write(b, off, len);
   }
 
   @Override
   public synchronized void flush() throws IOException {
-    byteBufStreamOutput.flush();
+    byteBufferStreamOutput.flush();
   }
 
   @Override
   public synchronized void close() throws IOException {
     //commitKey can be done here, if needed.
-    byteBufStreamOutput.close();
+    byteBufferStreamOutput.close();
   }
 
   public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
-    if (byteBufStreamOutput instanceof KeyDataStreamOutput) {
+    if (byteBufferStreamOutput instanceof KeyDataStreamOutput) {
       return ((KeyDataStreamOutput)
-              byteBufStreamOutput).getCommitUploadPartInfo();
+              byteBufferStreamOutput).getCommitUploadPartInfo();
     }
     // Otherwise return null.
     return null;
   }
 
-  public ByteBufStreamOutput getByteBufStreamOutput() {
-    return byteBufStreamOutput;
+  public ByteBufferStreamOutput getByteBufStreamOutput() {
+    return byteBufferStreamOutput;
   }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
index 4d52d89490..6d5401d651 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
@@ -17,7 +17,6 @@
 
 package org.apache.hadoop.ozone.client.rpc;
 
-import io.netty.buffer.Unpooled;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -38,6 +37,7 @@ import org.junit.Test;
 import org.junit.rules.Timeout;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
@@ -126,48 +126,38 @@ public class TestBlockDataStreamOutput {
     }
   }
 
+  @Test
+  public void testHalfChunkWrite() throws Exception {
+    testWrite(chunkSize / 2);
+  }
+
+  @Test
+  public void testSingleChunkWrite() throws Exception {
+    testWrite(chunkSize);
+  }
+
   @Test
   public void testMultiChunkWrite() throws Exception {
-    // write data less than 1 chunk size use streaming.
-    String keyName1 = getKeyName();
-    OzoneDataStreamOutput key1 = createKey(
-        keyName1, ReplicationType.RATIS, 0);
-    int dataLength1 = chunkSize/2;
-    byte[] data1 =
-        ContainerTestHelper.getFixedLengthString(keyString, dataLength1)
-            .getBytes(UTF_8);
-    key1.write(Unpooled.copiedBuffer(data1));
-    // now close the stream, It will update the key length.
-    key1.close();
-    validateData(keyName1, data1);
-
-    // write data more than 1 chunk size use streaming.
-    String keyName2 = getKeyName();
-    OzoneDataStreamOutput key2 = createKey(
-        keyName2, ReplicationType.RATIS, 0);
-    int dataLength2 = chunkSize + 50;
-    byte[] data2 =
-        ContainerTestHelper.getFixedLengthString(keyString, dataLength2)
-            .getBytes(UTF_8);
-    key2.write(Unpooled.copiedBuffer(data2));
-    // now close the stream, It will update the key length.
-    key2.close();
-    validateData(keyName2, data2);
-
-    // write data more than 1 block size use streaming.
-    String keyName3 = getKeyName();
-    OzoneDataStreamOutput key3 = createKey(
-        keyName3, ReplicationType.RATIS, 0);
-    int dataLength3 = blockSize + 50;
-    byte[] data3 =
-        ContainerTestHelper.getFixedLengthString(keyString, dataLength3)
+    testWrite(chunkSize + 50);
+  }
+
+  @Test
+  public void testMultiBlockWrite() throws Exception {
+    testWrite(blockSize + 50);
+  }
+
+  private void testWrite(int dataLength) throws Exception {
+    String keyName = getKeyName();
+    OzoneDataStreamOutput key = createKey(
+        keyName, ReplicationType.RATIS, 0);
+    byte[] data =
+        ContainerTestHelper.getFixedLengthString(keyString, dataLength)
             .getBytes(UTF_8);
-    key3.write(Unpooled.copiedBuffer(data3));
+    key.write(ByteBuffer.wrap(data));
     // now close the stream, It will update the key length.
-    key3.close();
-    validateData(keyName3, data3);
+    key.close();
+    validateData(keyName, data);
   }
-
   private OzoneDataStreamOutput createKey(String keyName, ReplicationType type,
       long size) throws Exception {
     return TestHelper.createStreamKey(
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java
index e4b842eaca..c34783dbdd 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java
@@ -29,8 +29,6 @@ import java.nio.channels.FileChannel;
 import java.util.HashMap;
 import java.util.Map;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.io.IOUtils;
@@ -117,10 +115,8 @@ public class PutKeyHandler extends KeyHandler {
         long off = 0;
         while (len > 0) {
           long writeLen = Math.min(len, chunkSize);
-          ByteBuffer segment =
-              ch.map(FileChannel.MapMode.READ_ONLY, off, writeLen);
-          ByteBuf buf = Unpooled.wrappedBuffer(segment);
-          out.write(buf);
+          ByteBuffer bb = ch.map(FileChannel.MapMode.READ_ONLY, off, writeLen);
+          out.write(bb);
           off += writeLen;
           len -= writeLen;
         }
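
With the Netty wrapper gone, the shell handler hands the memory-mapped segments straight to the stream. A self-contained rendering of that upload loop, assuming only that out is some ByteBufferStreamOutput (PutFileSketch and its parameters are illustrative):

    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;

    import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;

    final class PutFileSketch {
      static void putFile(ByteBufferStreamOutput out, String path, long chunkSize)
          throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(path, "r");
            FileChannel ch = raf.getChannel()) {
          long len = raf.length();
          long off = 0;
          while (len > 0) {
            long writeLen = Math.min(len, chunkSize);
            // Each mapped segment is already a ByteBuffer; no wrapping step.
            ByteBuffer bb = ch.map(FileChannel.MapMode.READ_ONLY, off, writeLen);
            out.write(bb);  // default method forwards (position, remaining)
            off += writeLen;
            len -= writeLen;
          }
        }
      }
    }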

