You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sh...@apache.org on 2020/04/22 06:59:13 UTC
[hadoop-ozone] branch master updated: HDDS-3155. Improved ozone
client flush implementation to make it faster. (#716)
This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 2098516 HDDS-3155. Improved ozone client flush implementation to make it faster. (#716)
2098516 is described below
commit 2098516928378418b11f65cf404e3ecebbf46910
Author: micah zhao <mi...@tencent.com>
AuthorDate: Wed Apr 22 14:59:02 2020 +0800
HDDS-3155. Improved ozone client flush implementation to make it faster. (#716)
---
.../hadoop/hdds/scm/storage/BlockOutputStream.java | 15 +-
.../org/apache/hadoop/ozone/OzoneConfigKeys.java | 11 ++
.../common/src/main/resources/ozone-default.xml | 9 ++
.../ozone/client/io/BlockOutputStreamEntry.java | 36 ++++-
.../client/io/BlockOutputStreamEntryPool.java | 8 +-
.../hadoop/ozone/client/io/KeyOutputStream.java | 16 +-
.../apache/hadoop/ozone/client/rpc/RpcClient.java | 5 +
.../ozone/client/rpc/Test2BlockOutputStream.java | 173 +++++++++++++++++++++
8 files changed, 258 insertions(+), 15 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index ffabbf3..f15a5e6 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -85,7 +85,9 @@ public class BlockOutputStream extends OutputStream {
private final int bytesPerChecksum;
private int chunkIndex;
private final AtomicLong chunkOffset = new AtomicLong();
+ private final int streamBufferSize;
private final long streamBufferFlushSize;
+ private final boolean streamBufferFlushDelay;
private final long streamBufferMaxSize;
private final BufferPool bufferPool;
// The IOException will be set by response handling thread in case there is an
@@ -131,10 +133,10 @@ public class BlockOutputStream extends OutputStream {
@SuppressWarnings("parameternumber")
public BlockOutputStream(BlockID blockID,
XceiverClientManager xceiverClientManager, Pipeline pipeline,
- long streamBufferFlushSize, long streamBufferMaxSize,
+ int streamBufferSize, long streamBufferFlushSize,
+ boolean streamBufferFlushDelay, long streamBufferMaxSize,
BufferPool bufferPool, ChecksumType checksumType,
- int bytesPerChecksum)
- throws IOException {
+ int bytesPerChecksum) throws IOException {
this.blockID = new AtomicReference<>(blockID);
KeyValue keyValue =
KeyValue.newBuilder().setKey("TYPE").setValue("KEY").build();
@@ -143,8 +145,10 @@ public class BlockOutputStream extends OutputStream {
.addMetadata(keyValue);
this.xceiverClientManager = xceiverClientManager;
this.xceiverClient = xceiverClientManager.acquireClient(pipeline);
+ this.streamBufferSize = streamBufferSize;
this.streamBufferFlushSize = streamBufferFlushSize;
this.streamBufferMaxSize = streamBufferMaxSize;
+ this.streamBufferFlushDelay = streamBufferFlushDelay;
this.bufferPool = bufferPool;
this.bytesPerChecksum = bytesPerChecksum;
@@ -434,7 +438,9 @@ public class BlockOutputStream extends OutputStream {
@Override
public void flush() throws IOException {
if (xceiverClientManager != null && xceiverClient != null
- && bufferPool != null && bufferPool.getSize() > 0) {
+ && bufferPool != null && bufferPool.getSize() > 0
+ && (!streamBufferFlushDelay ||
+ writtenDataLength - totalDataFlushedLength >= streamBufferSize)) {
try {
handleFlush(false);
} catch (InterruptedException | ExecutionException e) {
@@ -447,7 +453,6 @@ public class BlockOutputStream extends OutputStream {
}
}
-
private void writeChunk(ChunkBuffer buffer)
throws IOException {
// This data in the buffer will be pushed to datanode and a reference will
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 26fa6e2..fe1c440 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -149,6 +149,17 @@ public final class OzoneConfigKeys {
public static final TimeDuration OZONE_CLIENT_RETRY_INTERVAL_DEFAULT =
TimeDuration.valueOf(0, TimeUnit.MILLISECONDS);
+ /**
+ * If this value is true, a client call to flush() sends the buffered
+ * data to the datanode only when the amount of unflushed data has
+ * reached the configured stream buffer size
+ * (ozone.client.stream.buffer.size); otherwise flush() sends nothing.
+ * */
+ public static final String OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY =
+ "ozone.client.stream.buffer.flush.delay";
+ public static final boolean OOZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY_DEFAULT =
+ false;
+
// This defines the overall connection limit for the connection pool used in
// RestClient.
public static final String OZONE_REST_CLIENT_HTTP_CONNECTION_MAX =
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 04a61cb..96a1128 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -387,6 +387,15 @@
</description>
</property>
<property>
+ <name>ozone.client.stream.buffer.flush.delay</name>
+ <value>false</value>
+ <tag>OZONE, CLIENT</tag>
+ <description>If set to true, a call to flush() sends the buffered data
+ to the datanode only when the unflushed data in the current buffer has
+ reached ozone.client.stream.buffer.size; otherwise nothing is sent.
+ </description>
+ </property>
+ <property>
<name>ozone.client.stream.buffer.size</name>
<value>4MB</value>
<tag>OZONE, CLIENT</tag>
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
index 40a4a6b..8e1e640 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
@@ -55,7 +55,9 @@ public final class BlockOutputStreamEntry extends OutputStream {
private long currentPosition;
private Token<OzoneBlockTokenIdentifier> token;
+ private final int streamBufferSize;
private final long streamBufferFlushSize;
+ private final boolean streamBufferFlushDelay;
private final long streamBufferMaxSize;
private final long watchTimeout;
private BufferPool bufferPool;
@@ -64,7 +66,8 @@ public final class BlockOutputStreamEntry extends OutputStream {
private BlockOutputStreamEntry(BlockID blockID, String key,
XceiverClientManager xceiverClientManager,
Pipeline pipeline, String requestId, int chunkSize,
- long length, long streamBufferFlushSize, long streamBufferMaxSize,
+ long length, int streamBufferSize, long streamBufferFlushSize,
+ boolean streamBufferFlushDelay, long streamBufferMaxSize,
long watchTimeout, BufferPool bufferPool,
ChecksumType checksumType, int bytesPerChecksum,
Token<OzoneBlockTokenIdentifier> token) {
@@ -77,7 +80,9 @@ public final class BlockOutputStreamEntry extends OutputStream {
this.token = token;
this.length = length;
this.currentPosition = 0;
+ this.streamBufferSize = streamBufferSize;
this.streamBufferFlushSize = streamBufferFlushSize;
+ this.streamBufferFlushDelay = streamBufferFlushDelay;
this.streamBufferMaxSize = streamBufferMaxSize;
this.watchTimeout = watchTimeout;
this.bufferPool = bufferPool;
@@ -110,9 +115,9 @@ public final class BlockOutputStreamEntry extends OutputStream {
}
this.outputStream =
new BlockOutputStream(blockID, xceiverClientManager,
- pipeline, streamBufferFlushSize,
- streamBufferMaxSize, bufferPool, checksumType,
- bytesPerChecksum);
+ pipeline, streamBufferSize, streamBufferFlushSize,
+ streamBufferFlushDelay, streamBufferMaxSize, bufferPool,
+ checksumType, bytesPerChecksum);
}
}
@@ -215,7 +220,9 @@ public final class BlockOutputStreamEntry extends OutputStream {
private String requestId;
private int chunkSize;
private long length;
+ private int streamBufferSize;
private long streamBufferFlushSize;
+ private boolean streamBufferFlushDelay;
private long streamBufferMaxSize;
private long watchTimeout;
private BufferPool bufferPool;
@@ -269,11 +276,21 @@ public final class BlockOutputStreamEntry extends OutputStream {
return this;
}
+ public Builder setStreamBufferSize(int bufferSize) {
+ this.streamBufferSize = bufferSize;
+ return this;
+ }
+
public Builder setStreamBufferFlushSize(long bufferFlushSize) {
this.streamBufferFlushSize = bufferFlushSize;
return this;
}
+ public Builder setStreamBufferFlushDelay(boolean bufferFlushDelay) {
+ this.streamBufferFlushDelay = bufferFlushDelay;
+ return this;
+ }
+
public Builder setStreamBufferMaxSize(long bufferMaxSize) {
this.streamBufferMaxSize = bufferMaxSize;
return this;
@@ -297,7 +314,8 @@ public final class BlockOutputStreamEntry extends OutputStream {
public BlockOutputStreamEntry build() {
return new BlockOutputStreamEntry(blockID, key,
xceiverClientManager, pipeline, requestId, chunkSize,
- length, streamBufferFlushSize, streamBufferMaxSize, watchTimeout,
+ length, streamBufferSize, streamBufferFlushSize,
+ streamBufferFlushDelay, streamBufferMaxSize, watchTimeout,
bufferPool, checksumType, bytesPerChecksum, token);
}
}
@@ -331,10 +349,18 @@ public final class BlockOutputStreamEntry extends OutputStream {
return currentPosition;
}
+ public int getStreamBufferSize() {
+ return streamBufferSize;
+ }
+
public long getStreamBufferFlushSize() {
return streamBufferFlushSize;
}
+ public boolean getStreamBufferFlushDelay() {
+ return streamBufferFlushDelay;
+ }
+
public long getStreamBufferMaxSize() {
return streamBufferMaxSize;
}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
index 17683ad..3cab664 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
@@ -61,6 +61,7 @@ public class BlockOutputStreamEntryPool {
private final String requestID;
private final int streamBufferSize;
private final long streamBufferFlushSize;
+ private final boolean streamBufferFlushDelay;
private final long streamBufferMaxSize;
private final long watchTimeout;
private final long blockSize;
@@ -75,7 +76,8 @@ public class BlockOutputStreamEntryPool {
public BlockOutputStreamEntryPool(OzoneManagerProtocol omClient,
int chunkSize, String requestId, HddsProtos.ReplicationFactor factor,
HddsProtos.ReplicationType type,
- int bufferSize, long bufferFlushSize, long bufferMaxSize,
+ int bufferSize, long bufferFlushSize,
+ boolean bufferFlushDelay, long bufferMaxSize,
long size, long watchTimeout, ContainerProtos.ChecksumType checksumType,
int bytesPerChecksum, String uploadID, int partNumber,
boolean isMultipart, OmKeyInfo info,
@@ -93,6 +95,7 @@ public class BlockOutputStreamEntryPool {
this.requestID = requestId;
this.streamBufferSize = bufferSize;
this.streamBufferFlushSize = bufferFlushSize;
+ this.streamBufferFlushDelay = bufferFlushDelay;
this.streamBufferMaxSize = bufferMaxSize;
this.blockSize = size;
this.watchTimeout = watchTimeout;
@@ -137,6 +140,7 @@ public class BlockOutputStreamEntryPool {
requestID = null;
streamBufferSize = 0;
streamBufferFlushSize = 0;
+ streamBufferFlushDelay = false;
streamBufferMaxSize = 0;
bufferPool = new BufferPool(chunkSize, 1);
watchTimeout = 0;
@@ -188,7 +192,9 @@ public class BlockOutputStreamEntryPool {
.setRequestId(requestID)
.setChunkSize(chunkSize)
.setLength(subKeyInfo.getLength())
+ .setStreamBufferSize(streamBufferSize)
.setStreamBufferFlushSize(streamBufferFlushSize)
+ .setStreamBufferFlushDelay(streamBufferFlushDelay)
.setStreamBufferMaxSize(streamBufferMaxSize)
.setWatchTimeout(watchTimeout)
.setbufferPool(bufferPool)
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index ba0ba12..843155c 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -124,15 +124,16 @@ public class KeyOutputStream extends OutputStream {
XceiverClientManager xceiverClientManager,
OzoneManagerProtocol omClient, int chunkSize,
String requestId, ReplicationFactor factor, ReplicationType type,
- int bufferSize, long bufferFlushSize, long bufferMaxSize,
- long size, long watchTimeout,
+ int bufferSize, long bufferFlushSize, boolean isBufferFlushDelay,
+ long bufferMaxSize, long size, long watchTimeout,
ChecksumType checksumType, int bytesPerChecksum,
String uploadID, int partNumber, boolean isMultipart,
int maxRetryCount, long retryInterval) {
OmKeyInfo info = handler.getKeyInfo();
blockOutputStreamEntryPool =
new BlockOutputStreamEntryPool(omClient, chunkSize, requestId, factor,
- type, bufferSize, bufferFlushSize, bufferMaxSize, size,
+ type, bufferSize, bufferFlushSize, isBufferFlushDelay,
+ bufferMaxSize, size,
watchTimeout, checksumType, bytesPerChecksum, uploadID, partNumber,
isMultipart, info, xceiverClientManager, handler.getId());
// Retrieve the file encryption key info, null if file is not in
@@ -542,6 +543,7 @@ public class KeyOutputStream extends OutputStream {
private ReplicationFactor factor;
private int streamBufferSize;
private long streamBufferFlushSize;
+ private boolean streamBufferFlushDelay;
private long streamBufferMaxSize;
private long blockSize;
private long watchTimeout;
@@ -608,6 +610,11 @@ public class KeyOutputStream extends OutputStream {
return this;
}
+ public Builder setStreamBufferFlushDelay(boolean isDelay) {
+ this.streamBufferFlushDelay = isDelay;
+ return this;
+ }
+
public Builder setStreamBufferMaxSize(long size) {
this.streamBufferMaxSize = size;
return this;
@@ -646,7 +653,8 @@ public class KeyOutputStream extends OutputStream {
public KeyOutputStream build() {
return new KeyOutputStream(openHandler, xceiverManager, omClient,
chunkSize, requestID, factor, type,
- streamBufferSize, streamBufferFlushSize, streamBufferMaxSize,
+ streamBufferSize, streamBufferFlushSize, streamBufferFlushDelay,
+ streamBufferMaxSize,
blockSize, watchTimeout, checksumType,
bytesPerChecksum, multipartUploadID, multipartNumber, isMultipartKey,
maxRetryCount, retryInterval);
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 1382117..532a7c5 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -136,6 +136,7 @@ public class RpcClient implements ClientProtocol {
private final ACLType groupRights;
private final int streamBufferSize;
private final long streamBufferFlushSize;
+ private boolean streamBufferFlushDelay;
private final long streamBufferMaxSize;
private final long blockSize;
private final ClientId clientId = ClientId.randomId();
@@ -196,6 +197,9 @@ public class RpcClient implements ClientProtocol {
.getStorageSize(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE,
OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE_DEFAULT,
StorageUnit.BYTES);
+ streamBufferFlushDelay = conf.getBoolean(
+ OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY,
+ OzoneConfigKeys.OOZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY_DEFAULT);
streamBufferMaxSize = (long) conf
.getStorageSize(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE,
OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE_DEFAULT,
@@ -1198,6 +1202,7 @@ public class RpcClient implements ClientProtocol {
.setFactor(HddsProtos.ReplicationFactor.valueOf(factor.getValue()))
.setStreamBufferSize(streamBufferSize)
.setStreamBufferFlushSize(streamBufferFlushSize)
+ .setStreamBufferFlushDelay(streamBufferFlushDelay)
.setStreamBufferMaxSize(streamBufferMaxSize)
.setBlockSize(blockSize)
.setChecksumType(checksumType)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2BlockOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2BlockOutputStream.java
new file mode 100644
index 0000000..3bb55fa
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2BlockOutputStream.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.client.rpc;
+
+import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.KeyOutputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.TestHelper;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY;
+
+/**
+ * Tests BlockOutputStream flush behavior with the flush delay enabled.
+ */
+public class Test2BlockOutputStream {
+ private static MiniOzoneCluster cluster;
+ private static OzoneConfiguration conf = new OzoneConfiguration();
+ private static OzoneClient client;
+ private static ObjectStore objectStore;
+ private static int chunkSize;
+ private static int flushSize;
+ private static int maxFlushSize;
+ private static int blockSize;
+ private static String volumeName;
+ private static String bucketName;
+ private static String keyString;
+
+ /**
+ * Create a MiniOzoneCluster for testing.
+ * <p>
+ * Ozone is made active by setting OZONE_ENABLED = true
+ *
+ * @throws IOException
+ */
+ @BeforeClass
+ public static void init() throws Exception {
+ chunkSize = 100;
+ flushSize = 2 * chunkSize;
+ maxFlushSize = 2 * flushSize;
+ blockSize = 2 * maxFlushSize;
+ conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
+ conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+ conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
+ conf.setQuietMode(false);
+ conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
+ StorageUnit.MB);
+ conf.setBoolean(OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY, true);
+ cluster = MiniOzoneCluster.newBuilder(conf)
+ .setNumDatanodes(7)
+ .setTotalPipelineNumLimit(10)
+ .setBlockSize(blockSize)
+ .setChunkSize(chunkSize)
+ .setStreamBufferFlushSize(flushSize)
+ .setStreamBufferMaxSize(maxFlushSize)
+ .setStreamBufferSizeUnit(StorageUnit.BYTES)
+ .build();
+ cluster.waitForClusterToBeReady();
+ //the easiest way to create an open container is creating a key
+ client = OzoneClientFactory.getRpcClient(conf);
+ objectStore = client.getObjectStore();
+ keyString = UUID.randomUUID().toString();
+ volumeName = "testblockoutputstream";
+ bucketName = volumeName;
+ objectStore.createVolume(volumeName);
+ objectStore.getVolume(volumeName).createBucket(bucketName);
+ }
+
+ private String getKeyName() {
+ return UUID.randomUUID().toString();
+ }
+
+ /**
+ * Shut down the MiniOzoneCluster.
+ */
+ @AfterClass
+ public static void shutdown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testFlushChunkDelay() throws Exception {
+ String keyName1 = getKeyName();
+ OzoneOutputStream key1 = createKey(keyName1, ReplicationType.RATIS, 0);
+
+ byte[] data1 =
+ ContainerTestHelper.getFixedLengthString(keyString, 10)
+ .getBytes(UTF_8);
+ key1.write(data1);
+ key1.flush();
+ KeyOutputStream keyOutputStream = (KeyOutputStream)key1.getOutputStream();
+ Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
+ OutputStream stream = keyOutputStream.getStreamEntries().get(0)
+ .getOutputStream();
+ Assert.assertTrue(stream instanceof BlockOutputStream);
+ BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+
+ // We have written only 10 bytes, which is less than the chunk size,
+ // so calling flush() at this point does not sync the data.
+ Assert.assertEquals(0, blockOutputStream.getTotalDataFlushedLength());
+ key1.close();
+ validateData(keyName1, data1);
+
+ String keyName2 = getKeyName();
+ OzoneOutputStream key2 = createKey(keyName2, ReplicationType.RATIS, 0);
+ byte[] data2 =
+ ContainerTestHelper.getFixedLengthString(keyString, 110)
+ .getBytes(UTF_8);
+ key2.write(data2);
+ key2.flush();
+ keyOutputStream = (KeyOutputStream)key2.getOutputStream();
+ Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
+ stream = keyOutputStream.getStreamEntries().get(0)
+ .getOutputStream();
+ Assert.assertTrue(stream instanceof BlockOutputStream);
+ blockOutputStream = (BlockOutputStream) stream;
+
+ // We have written 110 bytes, which is more than the chunk size,
+ // so calling flush() at this point syncs the data.
+ Assert.assertEquals(data2.length,
+ blockOutputStream.getTotalDataFlushedLength());
+ key2.close();
+ validateData(keyName2, data2);
+ }
+
+ private OzoneOutputStream createKey(String keyName, ReplicationType type,
+ long size) throws Exception {
+ return TestHelper
+ .createKey(keyName, type, size, objectStore, volumeName, bucketName);
+ }
+ private void validateData(String keyName, byte[] data) throws Exception {
+ TestHelper
+ .validateData(keyName, data, objectStore, volumeName, bucketName);
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org