You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sz...@apache.org on 2021/11/15 04:46:41 UTC
[ozone] 07/13: HDDS-5705. [Ozone-Streaming] Change ByteBufStreamOutput to ByteBufferStreamOutput (#2603)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git
commit d1cb3f9e97c3315ed435314721de02e23fba2da0
Author: Kaijie Chen <ch...@kaijie.org>
AuthorDate: Wed Sep 8 09:25:17 2021 +0800
HDDS-5705. [Ozone-Streaming] Change ByteBufStreamOutput to ByteBufferStreamOutput (#2603)
---
hadoop-hdds/client/pom.xml | 4 --
.../hdds/scm/storage/BlockDataStreamOutput.java | 26 +++++----
...reamOutput.java => ByteBufferStreamOutput.java} | 15 +++--
.../client/io/BlockDataStreamOutputEntry.java | 51 ++++++++---------
.../ozone/client/io/KeyDataStreamOutput.java | 15 +++--
.../ozone/client/io/OzoneDataStreamOutput.java | 30 +++++-----
.../client/rpc/TestBlockDataStreamOutput.java | 66 +++++++++-------------
.../hadoop/ozone/shell/keys/PutKeyHandler.java | 8 +--
8 files changed, 98 insertions(+), 117 deletions(-)
diff --git a/hadoop-hdds/client/pom.xml b/hadoop-hdds/client/pom.xml
index 482c067..4e75e42 100644
--- a/hadoop-hdds/client/pom.xml
+++ b/hadoop-hdds/client/pom.xml
@@ -66,10 +66,6 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<version>${spotbugs.version}</version>
<scope>provided</scope>
</dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-buffer</artifactId>
- </dependency>
</dependencies>
<build>
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
index 39ec2f9..d0419fa 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdds.scm.storage;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import io.netty.buffer.ByteBuf;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -46,6 +45,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -59,7 +59,7 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlockAsync;
/**
- * An {@link ByteBufStreamOutput} used by the REST service in combination
+ * A {@link ByteBufferStreamOutput} used by the REST service in combination
* with the SCMClient to write the value of a key to a sequence
* of container chunks. Writes are buffered locally and periodically written to
* the container as a new chunk. In order to preserve the semantics that
@@ -74,7 +74,7 @@ import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlock
* This class encapsulates all state management for buffering and writing
* through to the container.
*/
-public class BlockDataStreamOutput implements ByteBufStreamOutput {
+public class BlockDataStreamOutput implements ByteBufferStreamOutput {
public static final Logger LOG =
LoggerFactory.getLogger(BlockDataStreamOutput.class);
public static final String EXCEPTION_MSG =
@@ -209,16 +209,16 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
}
@Override
- public void write(ByteBuf buf) throws IOException {
+ public void write(ByteBuffer b, int off, int len) throws IOException {
checkOpen();
- if (buf == null) {
+ if (b == null) {
throw new NullPointerException();
}
- final int len = buf.readableBytes();
if (len == 0) {
return;
}
- writeChunkToContainer(buf);
+ writeChunkToContainer(
+ (ByteBuffer) b.asReadOnlyBuffer().position(off).limit(off + len));
writtenDataLength += len;
}
@@ -476,15 +476,17 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
* Writes buffered data as a new chunk to the container and saves chunk
* information to be used later in putKey call.
*
+ * @param buf chunk data to write, from position to limit
* @throws IOException if there is an I/O error while performing the call
* @throws OzoneChecksumException if there is an error while computing
* checksum
*/
- private void writeChunkToContainer(ByteBuf buf)
+ private void writeChunkToContainer(ByteBuffer buf)
throws IOException {
- ChecksumData checksumData = checksum.computeChecksum(buf.nioBuffer());
- int effectiveChunkSize = buf.readableBytes();
+ final int effectiveChunkSize = buf.remaining();
final long offset = chunkOffset.getAndAdd(effectiveChunkSize);
+ ChecksumData checksumData =
+ checksum.computeChecksum(buf.asReadOnlyBuffer());
ChunkInfo chunkInfo = ChunkInfo.newBuilder()
.setChunkName(blockID.get().getLocalID() + "_chunk_" + ++chunkIndex)
.setOffset(offset)
@@ -499,8 +501,8 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
CompletableFuture<DataStreamReply> future =
(needSync(offset + effectiveChunkSize) ?
- out.writeAsync(buf.nioBuffer(), StandardWriteOption.SYNC) :
- out.writeAsync(buf.nioBuffer()))
+ out.writeAsync(buf, StandardWriteOption.SYNC) :
+ out.writeAsync(buf))
.whenCompleteAsync((r, e) -> {
if (e != null || !r.isSuccess()) {
if (e == null) {
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufferStreamOutput.java
similarity index 82%
rename from hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufStreamOutput.java
rename to hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufferStreamOutput.java
index 7f40737..0650a68 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufferStreamOutput.java
@@ -18,23 +18,24 @@
package org.apache.hadoop.hdds.scm.storage;
-import io.netty.buffer.ByteBuf;
-
import java.io.Closeable;
import java.io.IOException;
+import java.nio.ByteBuffer;
/**
* This interface is for writing an output stream of ByteBuffers.
-* An ByteBufStreamOutput accepts Netty ByteBuf and sends them to some sink.
+* A ByteBufferStreamOutput accepts NIO ByteBuffers and sends them to some sink.
*/
-public interface ByteBufStreamOutput extends Closeable {
+public interface ByteBufferStreamOutput extends Closeable {
/**
* Try to write all the bytes in ByteBuf b to DataStream.
*
* @param b the data.
* @exception IOException if an I/O error occurs.
*/
- void write(ByteBuf b) throws IOException;
+ default void write(ByteBuffer b) throws IOException {
+ write(b, b.position(), b.remaining());
+ }
/**
* Try to write the [off:off + len) slice in ByteBuf b to DataStream.
@@ -44,9 +45,7 @@ public interface ByteBufStreamOutput extends Closeable {
* @param len the number of bytes to write.
* @exception IOException if an I/O error occurs.
*/
- default void write(ByteBuf b, int off, int len) throws IOException {
- write(b.slice(off, len));
- }
+ void write(ByteBuffer b, int off, int len) throws IOException;
/**
* Flushes this DataStream output and forces any buffered output bytes
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java
index 98907bf..f0c3a43 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java
@@ -18,18 +18,18 @@
package org.apache.hadoop.ozone.client.io;
import com.google.common.annotations.VisibleForTesting;
-import io.netty.buffer.ByteBuf;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput;
-import org.apache.hadoop.hdds.scm.storage.ByteBufStreamOutput;
+import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
@@ -37,10 +37,10 @@ import java.util.Collections;
* Helper class used inside {@link BlockDataStreamOutput}.
* */
public final class BlockDataStreamOutputEntry
- implements ByteBufStreamOutput {
+ implements ByteBufferStreamOutput {
private final OzoneClientConfig config;
- private ByteBufStreamOutput byteBufStreamOutput;
+ private ByteBufferStreamOutput byteBufferStreamOutput;
private BlockID blockID;
private final String key;
private final XceiverClientFactory xceiverClientManager;
@@ -61,7 +61,7 @@ public final class BlockDataStreamOutputEntry
OzoneClientConfig config
) {
this.config = config;
- this.byteBufStreamOutput = null;
+ this.byteBufferStreamOutput = null;
this.blockID = blockID;
this.key = key;
this.xceiverClientManager = xceiverClientManager;
@@ -90,63 +90,62 @@ public final class BlockDataStreamOutputEntry
* @throws IOException if xceiverClient initialization fails
*/
private void checkStream() throws IOException {
- if (this.byteBufStreamOutput == null) {
- this.byteBufStreamOutput =
+ if (this.byteBufferStreamOutput == null) {
+ this.byteBufferStreamOutput =
new BlockDataStreamOutput(blockID, xceiverClientManager,
pipeline, config, token);
}
}
@Override
- public void write(ByteBuf b) throws IOException {
+ public void write(ByteBuffer b, int off, int len) throws IOException {
checkStream();
- final int len = b.readableBytes();
- byteBufStreamOutput.write(b);
+ byteBufferStreamOutput.write(b, off, len);
this.currentPosition += len;
}
@Override
public void flush() throws IOException {
- if (this.byteBufStreamOutput != null) {
- this.byteBufStreamOutput.flush();
+ if (this.byteBufferStreamOutput != null) {
+ this.byteBufferStreamOutput.flush();
}
}
@Override
public void close() throws IOException {
- if (this.byteBufStreamOutput != null) {
- this.byteBufStreamOutput.close();
+ if (this.byteBufferStreamOutput != null) {
+ this.byteBufferStreamOutput.close();
// after closing the chunkOutPutStream, blockId would have been
// reconstructed with updated bcsId
this.blockID =
- ((BlockDataStreamOutput) byteBufStreamOutput).getBlockID();
+ ((BlockDataStreamOutput) byteBufferStreamOutput).getBlockID();
}
}
boolean isClosed() {
- if (byteBufStreamOutput != null) {
- return ((BlockDataStreamOutput) byteBufStreamOutput).isClosed();
+ if (byteBufferStreamOutput != null) {
+ return ((BlockDataStreamOutput) byteBufferStreamOutput).isClosed();
}
return false;
}
Collection<DatanodeDetails> getFailedServers() {
- if (byteBufStreamOutput != null) {
+ if (byteBufferStreamOutput != null) {
BlockDataStreamOutput out =
- (BlockDataStreamOutput) this.byteBufStreamOutput;
+ (BlockDataStreamOutput) this.byteBufferStreamOutput;
return out.getFailedServers();
}
return Collections.emptyList();
}
long getWrittenDataLength() {
- if (byteBufStreamOutput != null) {
+ if (byteBufferStreamOutput != null) {
BlockDataStreamOutput out =
- (BlockDataStreamOutput) this.byteBufStreamOutput;
+ (BlockDataStreamOutput) this.byteBufferStreamOutput;
return out.getWrittenDataLength();
} else {
// For a pre allocated block for which no write has been initiated,
- // the ByteBufStreamOutput will be null here.
+ // the ByteBufferStreamOutput will be null here.
// In such cases, the default blockCommitSequenceId will be 0
return 0;
}
@@ -155,7 +154,7 @@ public final class BlockDataStreamOutputEntry
void cleanup(boolean invalidateClient) throws IOException {
checkStream();
BlockDataStreamOutput out =
- (BlockDataStreamOutput) this.byteBufStreamOutput;
+ (BlockDataStreamOutput) this.byteBufferStreamOutput;
out.cleanup(invalidateClient);
}
@@ -163,7 +162,7 @@ public final class BlockDataStreamOutputEntry
void writeOnRetry(long len) throws IOException {
checkStream();
BlockDataStreamOutput out =
- (BlockDataStreamOutput) this.byteBufStreamOutput;
+ (BlockDataStreamOutput) this.byteBufferStreamOutput;
out.writeOnRetry(len);
this.currentPosition += len;
@@ -231,8 +230,8 @@ public final class BlockDataStreamOutputEntry
}
@VisibleForTesting
- public ByteBufStreamOutput getByteBufStreamOutput() {
- return byteBufStreamOutput;
+ public ByteBufferStreamOutput getByteBufStreamOutput() {
+ return byteBufferStreamOutput;
}
public BlockID getBlockID() {
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
index c37f9cd..9bba89d 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.ozone.client.io;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import io.netty.buffer.ByteBuf;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.hdds.client.ReplicationConfig;
@@ -32,7 +31,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
-import org.apache.hadoop.hdds.scm.storage.ByteBufStreamOutput;
+import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
@@ -48,6 +47,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -63,7 +63,7 @@ import java.util.stream.Collectors;
*
* TODO : currently not support multi-thread access.
*/
-public class KeyDataStreamOutput implements ByteBufStreamOutput {
+public class KeyDataStreamOutput implements ByteBufferStreamOutput {
private OzoneClientConfig config;
@@ -185,17 +185,16 @@ public class KeyDataStreamOutput implements ByteBufStreamOutput {
}
@Override
- public void write(ByteBuf b) throws IOException {
+ public void write(ByteBuffer b, int off, int len) throws IOException {
checkNotClosed();
if (b == null) {
throw new NullPointerException();
}
- final int len = b.readableBytes();
- handleWrite(b, b.readerIndex(), len, false);
+ handleWrite(b, off, len, false);
writeOffset += len;
}
- private void handleWrite(ByteBuf b, int off, long len, boolean retry)
+ private void handleWrite(ByteBuffer b, int off, long len, boolean retry)
throws IOException {
while (len > 0) {
try {
@@ -227,7 +226,7 @@ public class KeyDataStreamOutput implements ByteBufStreamOutput {
}
private int writeToDataStreamOutput(BlockDataStreamOutputEntry current,
- boolean retry, long len, ByteBuf b, int writeLen, int off,
+ boolean retry, long len, ByteBuffer b, int writeLen, int off,
long currentPos) throws IOException {
try {
if (retry) {
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
index 378b868..d40ac2b 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
@@ -16,55 +16,55 @@
*/
package org.apache.hadoop.ozone.client.io;
-import io.netty.buffer.ByteBuf;
-import org.apache.hadoop.hdds.scm.storage.ByteBufStreamOutput;
+import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
import java.io.IOException;
+import java.nio.ByteBuffer;
/**
* OzoneDataStreamOutput is used to write data into Ozone.
* It uses SCM's {@link KeyDataStreamOutput} for writing the data.
*/
-public class OzoneDataStreamOutput implements ByteBufStreamOutput {
+public class OzoneDataStreamOutput implements ByteBufferStreamOutput {
- private final ByteBufStreamOutput byteBufStreamOutput;
+ private final ByteBufferStreamOutput byteBufferStreamOutput;
/**
* Constructs OzoneDataStreamOutput with KeyDataStreamOutput.
*
- * @param byteBufStreamOutput
+ * @param byteBufferStreamOutput the underlying ByteBufferStreamOutput
*/
- public OzoneDataStreamOutput(ByteBufStreamOutput byteBufStreamOutput) {
- this.byteBufStreamOutput = byteBufStreamOutput;
+ public OzoneDataStreamOutput(ByteBufferStreamOutput byteBufferStreamOutput) {
+ this.byteBufferStreamOutput = byteBufferStreamOutput;
}
@Override
- public void write(ByteBuf b) throws IOException {
- byteBufStreamOutput.write(b);
+ public void write(ByteBuffer b, int off, int len) throws IOException {
+ byteBufferStreamOutput.write(b, off, len);
}
@Override
public synchronized void flush() throws IOException {
- byteBufStreamOutput.flush();
+ byteBufferStreamOutput.flush();
}
@Override
public synchronized void close() throws IOException {
//commitKey can be done here, if needed.
- byteBufStreamOutput.close();
+ byteBufferStreamOutput.close();
}
public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
- if (byteBufStreamOutput instanceof KeyDataStreamOutput) {
+ if (byteBufferStreamOutput instanceof KeyDataStreamOutput) {
return ((KeyDataStreamOutput)
- byteBufStreamOutput).getCommitUploadPartInfo();
+ byteBufferStreamOutput).getCommitUploadPartInfo();
}
// Otherwise return null.
return null;
}
- public ByteBufStreamOutput getByteBufStreamOutput() {
- return byteBufStreamOutput;
+ public ByteBufferStreamOutput getByteBufStreamOutput() {
+ return byteBufferStreamOutput;
}
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
index 4d52d89..6d5401d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
@@ -17,7 +17,6 @@
package org.apache.hadoop.ozone.client.rpc;
-import io.netty.buffer.Unpooled;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -38,6 +37,7 @@ import org.junit.Test;
import org.junit.rules.Timeout;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -127,47 +127,37 @@ public class TestBlockDataStreamOutput {
}
@Test
+ public void testHalfChunkWrite() throws Exception {
+ testWrite(chunkSize / 2);
+ }
+
+ @Test
+ public void testSingleChunkWrite() throws Exception {
+ testWrite(chunkSize);
+ }
+
+ @Test
public void testMultiChunkWrite() throws Exception {
- // write data less than 1 chunk size use streaming.
- String keyName1 = getKeyName();
- OzoneDataStreamOutput key1 = createKey(
- keyName1, ReplicationType.RATIS, 0);
- int dataLength1 = chunkSize/2;
- byte[] data1 =
- ContainerTestHelper.getFixedLengthString(keyString, dataLength1)
- .getBytes(UTF_8);
- key1.write(Unpooled.copiedBuffer(data1));
- // now close the stream, It will update the key length.
- key1.close();
- validateData(keyName1, data1);
-
- // write data more than 1 chunk size use streaming.
- String keyName2 = getKeyName();
- OzoneDataStreamOutput key2 = createKey(
- keyName2, ReplicationType.RATIS, 0);
- int dataLength2 = chunkSize + 50;
- byte[] data2 =
- ContainerTestHelper.getFixedLengthString(keyString, dataLength2)
- .getBytes(UTF_8);
- key2.write(Unpooled.copiedBuffer(data2));
- // now close the stream, It will update the key length.
- key2.close();
- validateData(keyName2, data2);
-
- // write data more than 1 block size use streaming.
- String keyName3 = getKeyName();
- OzoneDataStreamOutput key3 = createKey(
- keyName3, ReplicationType.RATIS, 0);
- int dataLength3 = blockSize + 50;
- byte[] data3 =
- ContainerTestHelper.getFixedLengthString(keyString, dataLength3)
+ testWrite(chunkSize + 50);
+ }
+
+ @Test
+ public void testMultiBlockWrite() throws Exception {
+ testWrite(blockSize + 50);
+ }
+
+ private void testWrite(int dataLength) throws Exception {
+ String keyName = getKeyName();
+ OzoneDataStreamOutput key = createKey(
+ keyName, ReplicationType.RATIS, 0);
+ byte[] data =
+ ContainerTestHelper.getFixedLengthString(keyString, dataLength)
.getBytes(UTF_8);
- key3.write(Unpooled.copiedBuffer(data3));
+ key.write(ByteBuffer.wrap(data));
// now close the stream, It will update the key length.
- key3.close();
- validateData(keyName3, data3);
+ key.close();
+ validateData(keyName, data);
}
-
private OzoneDataStreamOutput createKey(String keyName, ReplicationType type,
long size) throws Exception {
return TestHelper.createStreamKey(
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java
index 56bc834..af6a461 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java
@@ -29,8 +29,6 @@ import java.nio.channels.FileChannel;
import java.util.HashMap;
import java.util.Map;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfigValidator;
@@ -141,10 +139,8 @@ public class PutKeyHandler extends KeyHandler {
long off = 0;
while (len > 0) {
long writeLen = Math.min(len, chunkSize);
- ByteBuffer segment =
- ch.map(FileChannel.MapMode.READ_ONLY, off, writeLen);
- ByteBuf buf = Unpooled.wrappedBuffer(segment);
- out.write(buf);
+ ByteBuffer bb = ch.map(FileChannel.MapMode.READ_ONLY, off, writeLen);
+ out.write(bb);
off += writeLen;
len -= writeLen;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org