Posted to commits@ozone.apache.org by sh...@apache.org on 2020/06/03 04:26:54 UTC
[hadoop-ozone] branch master updated: HDDS-3477. Disable partial
chunk write during flush() call in ozone client by default. (#957)
This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/master by this push:
new e4f23ee HDDS-3477. Disable partial chunk write during flush() call in ozone client by default. (#957)
e4f23ee is described below
commit e4f23ee243a12354e326a959f40aab38e48dbce0
Author: micah zhao <mi...@tencent.com>
AuthorDate: Wed Jun 3 12:26:40 2020 +0800
HDDS-3477. Disable partial chunk write during flush() call in ozone client by default. (#957)
---
.../org/apache/hadoop/ozone/OzoneConfigKeys.java | 4 +-
.../common/src/main/resources/ozone-default.xml | 10 +-
.../apache/hadoop/ozone/client/rpc/RpcClient.java | 2 +-
.../ozone/client/rpc/TestBlockOutputStream.java | 2 +
...m.java => TestBlockOutputStreamFlushDelay.java} | 62 +++---
.../rpc/TestBlockOutputStreamWithFailures.java | 2 +
...stBlockOutputStreamWithFailuresFlushDelay.java} | 33 ++-
.../client/rpc/TestContainerStateMachine.java | 2 +
...va => TestContainerStateMachineFlushDelay.java} | 107 +++-------
.../client/rpc/TestFailureHandlingByClient.java | 3 +-
.../rpc/TestFailureHandlingByClientFlushDelay.java | 235 +++++++++++++++++++++
.../rpc/TestOzoneClientRetriesOnException.java | 2 +
...stOzoneClientRetriesOnExceptionFlushDelay.java} | 148 ++++++-------
13 files changed, 413 insertions(+), 199 deletions(-)
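Before the per-file diffs: the behavioral effect of the new default is that
OzoneOutputStream.flush() no longer writes a partial chunk (buffered data
smaller than the chunk size) out to the datanode. A minimal, hedged sketch of
the observable client behavior, assuming a 100-byte chunk size; the key name
and the 50-byte payload are illustrative and not part of this patch:

    import java.util.HashMap;
    import org.apache.hadoop.hdds.client.ReplicationFactor;
    import org.apache.hadoop.hdds.client.ReplicationType;
    import org.apache.hadoop.ozone.client.OzoneBucket;
    import org.apache.hadoop.ozone.client.io.OzoneOutputStream;

    public class FlushDelayExample {
      // halfChunk is assumed to be 50 bytes, half of the configured chunk.
      static void demo(OzoneBucket bucket, byte[] halfChunk) throws Exception {
        OzoneOutputStream key = bucket.createKey("key1", 0,
            ReplicationType.RATIS, ReplicationFactor.THREE, new HashMap<>());
        key.write(halfChunk);
        key.flush();   // flush delay on: the partial chunk stays buffered
        key.write(halfChunk);
        key.flush();   // a full 100-byte chunk is now written and synced
        key.close();   // close() always persists remaining buffered data
      }
    }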
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 841ffc5..281f185 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -157,8 +157,8 @@ public final class OzoneConfigKeys {
* */
public static final String OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY =
"ozone.client.stream.buffer.flush.delay";
- public static final boolean OOZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY_DEFAULT =
- false;
+ public static final boolean OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY_DEFAULT =
+ true;
// This defines the overall connection limit for the connection pool used in
// RestClient.
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 26bf2ba..93292d8 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -390,11 +390,13 @@
</property>
<property>
<name>ozone.client.stream.buffer.flush.delay</name>
- <value>false</value>
+ <value>true</value>
<tag>OZONE, CLIENT</tag>
- <description>If set true, when call flush() and determine whether the
- data in the current buffer is greater than ozone.client.stream.buffer.size.
- if greater than then send buffer to the datanode.
+ <description>
+ The default is true. When true, a call to flush() checks whether the
+ data in the current buffer has reached ozone.client.stream.buffer.size
+ and sends the buffer to the datanode only if it has. You can turn this
+ off by setting this configuration to false.
</description>
</property>
<property>
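Deployments that rely on the old behavior (every flush() pushing buffered
data regardless of size) can keep it by overriding the new default in
ozone-site.xml, using the standard Hadoop site-file mechanism:

    <property>
      <name>ozone.client.stream.buffer.flush.delay</name>
      <value>false</value>
    </property>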
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 57dfb0d..3c333bd 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -202,7 +202,7 @@ public class RpcClient implements ClientProtocol {
StorageUnit.BYTES);
streamBufferFlushDelay = conf.getBoolean(
OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY,
- OzoneConfigKeys.OOZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY_DEFAULT);
+ OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY_DEFAULT);
streamBufferMaxSize = (long) conf
.getStorageSize(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE,
OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE_DEFAULT,
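The flag read above is consulted on the client write path. The following is an
illustrative, self-contained model of the flush-delay decision, not the actual
BlockOutputStream code; all names and the bookkeeping are assumptions made for
clarity:

    // Simplified model of how flush delay changes flush() behavior.
    class FlushDelayModel {
      private final boolean streamBufferFlushDelay;
      private final long chunkSize;
      private long buffered;          // bytes currently in the client buffer
      private long totalDataFlushed;  // bytes already sent to datanodes

      FlushDelayModel(boolean flushDelay, long chunkSize) {
        this.streamBufferFlushDelay = flushDelay;
        this.chunkSize = chunkSize;
      }

      void write(long len) { buffered += len; }

      void flush() {
        if (streamBufferFlushDelay && buffered < chunkSize) {
          return;                     // keep the partial chunk buffered
        }
        totalDataFlushed += buffered; // models a write to the datanode
        buffered = 0;
      }

      void close() {                  // close always drains the buffer
        totalDataFlushed += buffered;
        buffered = 0;
      }
    }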
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
index 338e286..44b47dc 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
@@ -48,6 +48,7 @@ import org.junit.rules.Timeout;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY;
/**
* Tests BlockOutputStream class.
@@ -90,6 +91,7 @@ public class TestBlockOutputStream {
conf.setQuietMode(false);
conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
StorageUnit.MB);
+ conf.setBoolean(OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY, false);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(7)
.setTotalPipelineNumLimit(10)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamFlushDelay.java
similarity index 95%
copy from hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
copy to hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamFlushDelay.java
index 338e286..93a3ad6 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamFlushDelay.java
@@ -50,13 +50,13 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
/**
- * Tests BlockOutputStream class.
+ * Tests BlockOutputStream class with flush delay enabled.
*/
-public class TestBlockOutputStream {
+public class TestBlockOutputStreamFlushDelay {
/**
- * Set a timeout for each test.
- */
+ * Set a timeout for each test.
+ */
@Rule
public Timeout timeout = new Timeout(300000);
private static MiniOzoneCluster cluster;
@@ -140,6 +140,7 @@ public class TestBlockOutputStream {
String keyName = getKeyName();
OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
int dataLength = 50;
+ int totalWriteDataLength = dataLength * 2;
byte[] data1 =
ContainerTestHelper.getFixedLengthString(keyString, dataLength)
.getBytes(UTF_8);
@@ -174,7 +175,9 @@ public class TestBlockOutputStream {
// commitIndex2FlushedData Map will be empty here
Assert.assertTrue(
blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
-
+ // Write again so the total written data is at least one chunk
+ // size, ensuring that flush() will sync the data.
+ key.write(data1);
// Now do a flush. This will flush the data and update the flush length and
// the map.
key.flush();
@@ -190,14 +193,16 @@ public class TestBlockOutputStream {
Assert.assertEquals(1, blockOutputStream.getBufferPool().getSize());
Assert.assertEquals(0,
blockOutputStream.getBufferPool().getBuffer(0).position());
- Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
- Assert.assertEquals(dataLength,
+ Assert.assertEquals(totalWriteDataLength,
+ blockOutputStream.getWrittenDataLength());
+ Assert.assertEquals(totalWriteDataLength,
blockOutputStream.getTotalDataFlushedLength());
Assert.assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
// flush ensures watchForCommit updates the total length acknowledged
- Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+ Assert.assertEquals(totalWriteDataLength,
+ blockOutputStream.getTotalAckDataLength());
Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
// now close the stream, It will update the ack length after watchForCommit
@@ -217,7 +222,8 @@ public class TestBlockOutputStream {
// make sure the bufferPool is empty
Assert
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
- Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+ Assert.assertEquals(totalWriteDataLength,
+ blockOutputStream.getTotalAckDataLength());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
validateData(keyName, data1);
@@ -271,31 +277,31 @@ public class TestBlockOutputStream {
Assert.assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
- // Now do a flush. This will flush the data and update the flush length and
- // the map.
+ // Now do a flush.
key.flush();
Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
- // flush is a sync call, all pending operations will complete
- Assert.assertEquals(pendingWriteChunkCount, metrics
+ // The data written so far equals flushSize, so this flush() call
+ // triggers no further action.
+ Assert.assertEquals(pendingWriteChunkCount + 2, metrics
.getPendingContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
- Assert.assertEquals(pendingPutBlockCount, metrics
+ Assert.assertEquals(pendingPutBlockCount + 1, metrics
.getPendingContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
// Since the data in the buffer is already flushed, flush here will have
// no impact on the counters and data structures
-
Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize());
- Assert
- .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
- Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+ // No action is triggered by this flush(), so the BlockOutputStream
+ // state is not updated.
+ Assert.assertEquals(dataLength,
+ blockOutputStream.getBufferPool().computeBufferData());
+ Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
Assert.assertEquals(dataLength,
blockOutputStream.getTotalDataFlushedLength());
Assert.assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
+ Assert.assertEquals(0, blockOutputStream.getTotalAckDataLength());
- // flush ensures watchForCommit updates the total length acknowledged
- Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
// now close the stream, It will update the ack length after watchForCommit
key.close();
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
@@ -650,8 +656,7 @@ public class TestBlockOutputStream {
Assert.assertTrue(
blockOutputStream.getCommitIndex2flushedDataMap().size() <= 1);
- // Now do a flush. This will flush the data and update the flush length and
- // the map.
+ // Now do a flush.
key.flush();
Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
Assert.assertEquals(pendingWriteChunkCount, metrics
@@ -664,10 +669,10 @@ public class TestBlockOutputStream {
Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
- Assert.assertEquals(dataLength,
+ // dataLength = maxFlushSize + 50; the final 50 bytes are smaller than
+ // the chunkSize (100) and are not synced when flush() is called.
+ Assert.assertEquals(dataLength - 50,
blockOutputStream.getTotalDataFlushedLength());
- // flush will make sure one more entry gets updated in the map
Assert.assertTrue(
blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2);
@@ -683,9 +688,10 @@ public class TestBlockOutputStream {
.getPendingContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(writeChunkCount + 5,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
- Assert.assertEquals(putBlockCount + 4,
+ // The previous flush did not trigger any action.
+ Assert.assertEquals(putBlockCount + 3,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
- Assert.assertEquals(totalOpCount + 9,
+ Assert.assertEquals(totalOpCount + 8,
metrics.getTotalOpCount());
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
@@ -694,7 +700,7 @@ public class TestBlockOutputStream {
}
private OzoneOutputStream createKey(String keyName, ReplicationType type,
- long size) throws Exception {
+ long size) throws Exception {
return TestHelper
.createKey(keyName, type, size, objectStore, volumeName, bucketName);
}
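The updated assertions above follow from the test's buffer constants
(chunkSize=100, flushSize=200, maxFlushSize=400). A worked example of the
arithmetic, runnable on its own:

    public class FlushDelayMath {
      public static void main(String[] args) {
        int chunkSize    = 100;
        int flushSize    = 2 * chunkSize;      // 200
        int maxFlushSize = 2 * flushSize;      // 400
        int dataLength   = maxFlushSize + 50;  // 450 bytes written in total
        // Crossing maxFlushSize pushes out four full chunks; the trailing
        // 50 bytes are below chunkSize, so flush() leaves them buffered.
        int flushedAfterFlush = dataLength - 50;
        System.out.println(flushedAfterFlush);  // 400, matching the assertion
        // Only close() writes the final partial chunk and acks all 450.
      }
    }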
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
index 7e327bc..e377615 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
@@ -56,6 +56,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY;
/**
* Tests failure detection and handling in BlockOutputStream Class.
@@ -119,6 +120,7 @@ public class TestBlockOutputStreamWithFailures {
"watch.request.timeout",
3, TimeUnit.SECONDS);
conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 15);
+ conf.setBoolean(OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY, false);
cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(7)
.setTotalPipelineNumLimit(10).setBlockSize(blockSize)
.setChunkSize(chunkSize).setStreamBufferFlushSize(flushSize)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailuresFlushDelay.java
similarity index 98%
copy from hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
copy to hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailuresFlushDelay.java
index 7e327bc..6a849ac 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailuresFlushDelay.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.ratis.RatisHelper;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.XceiverClientRatis;
@@ -38,29 +39,23 @@ import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.TestHelper;
import org.apache.ratis.protocol.GroupMismatchException;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.ratis.protocol.RaftRetryFailureException;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.*;
+import org.junit.rules.Timeout;
import java.io.IOException;
import java.io.OutputStream;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import org.junit.Rule;
-import org.junit.rules.Timeout;
import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.*;
/**
- * Tests failure detection and handling in BlockOutputStream Class.
+ * Tests failure detection and handling in BlockOutputStream
+ * with flush delay enabled.
*/
-public class TestBlockOutputStreamWithFailures {
+public class TestBlockOutputStreamWithFailuresFlushDelay {
/**
* Set a timeout for each test.
@@ -156,7 +151,7 @@ public class TestBlockOutputStreamWithFailures {
throws Exception {
String keyName = getKeyName();
OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
- int dataLength = maxFlushSize + 50;
+ int dataLength = maxFlushSize + chunkSize;
// write data more than 1 chunk
byte[] data1 =
ContainerTestHelper.getFixedLengthString(keyString, dataLength)
@@ -245,7 +240,7 @@ public class TestBlockOutputStreamWithFailures {
public void testWatchForCommitDatanodeFailure() throws Exception {
String keyName = getKeyName();
OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
- int dataLength = maxFlushSize + 50;
+ int dataLength = maxFlushSize + chunkSize;
// write data more than 1 chunk
byte[] data1 =
ContainerTestHelper.getFixedLengthString(keyString, dataLength)
@@ -332,7 +327,7 @@ public class TestBlockOutputStreamWithFailures {
public void test2DatanodesFailure() throws Exception {
String keyName = getKeyName();
OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
- int dataLength = maxFlushSize + 50;
+ int dataLength = maxFlushSize + chunkSize;
// write data more than 1 chunk
byte[] data1 =
ContainerTestHelper.getFixedLengthString(keyString, dataLength)
@@ -437,7 +432,7 @@ public class TestBlockOutputStreamWithFailures {
public void testFailureWithPrimeSizedData() throws Exception {
String keyName = getKeyName();
OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
- int dataLength = maxFlushSize + 69;
+ int dataLength = maxFlushSize + chunkSize;
// write data more than 1 chunk
byte[] data1 =
ContainerTestHelper.getFixedLengthString(keyString, dataLength)
@@ -573,7 +568,7 @@ public class TestBlockOutputStreamWithFailures {
String keyName = getKeyName();
OzoneOutputStream key =
createKey(keyName, ReplicationType.RATIS, 0, ReplicationFactor.ONE);
- int dataLength = maxFlushSize + 50;
+ int dataLength = maxFlushSize + chunkSize;
// write data more than 1 chunk
byte[] data1 =
ContainerTestHelper.getFixedLengthString(keyString, dataLength)
@@ -664,7 +659,7 @@ public class TestBlockOutputStreamWithFailures {
String keyName = getKeyName();
OzoneOutputStream key =
createKey(keyName, ReplicationType.RATIS, 0, ReplicationFactor.ONE);
- int dataLength = maxFlushSize + 50;
+ int dataLength = maxFlushSize + chunkSize;
// write data more than 1 chunk
byte[] data1 =
ContainerTestHelper.getFixedLengthString(keyString, dataLength)
@@ -758,7 +753,7 @@ public class TestBlockOutputStreamWithFailures {
OzoneOutputStream key =
createKey(keyName, ReplicationType.RATIS, 3 * blockSize,
ReplicationFactor.ONE);
- int dataLength = maxFlushSize + 50;
+ int dataLength = maxFlushSize + chunkSize;
// write data more than 1 chunk
byte[] data1 =
ContainerTestHelper.getFixedLengthString(keyString, dataLength)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
index 4207006..1972dac 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
@@ -55,6 +55,7 @@ import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.*;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY;
/**
* Tests the containerStateMachine failure handling.
@@ -98,6 +99,7 @@ public class TestContainerStateMachine {
conf.setQuietMode(false);
OzoneManager.setTestSecureOmFlag(true);
conf.setLong(OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_KEY, 1);
+ conf.setBoolean(OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY, false);
// conf.set(HADOOP_SECURITY_AUTHENTICATION, KERBEROS.toString());
cluster =
MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFlushDelay.java
similarity index 61%
copy from hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
copy to hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFlushDelay.java
index 4207006..1208652 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFlushDelay.java
@@ -17,6 +17,7 @@
package org.apache.hadoop.ozone.client.rpc;
+import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
@@ -30,36 +31,29 @@ import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
-import org.apache.hadoop.ozone.container.TestHelper;
-import org.apache.hadoop.ozone.container.common.transport.server.ratis.ContainerStateMachine;
-import org.apache.hadoop.ozone.container.common.transport.server.ratis.RatisServerConfiguration;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.*;
+import org.junit.rules.Timeout;
import java.io.File;
import java.io.IOException;
-import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import org.junit.Rule;
-import org.junit.rules.Timeout;
-import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
-import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
-import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.*;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.hdds.HddsConfigKeys.*;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
/**
- * Tests the containerStateMachine failure handling.
+ * Tests the containerStateMachine failure handling with flush delay enabled.
*/
-public class TestContainerStateMachine {
+public class TestContainerStateMachineFlushDelay {
/**
* Set a timeout for each test.
@@ -74,6 +68,11 @@ public class TestContainerStateMachine {
private String volumeName;
private String bucketName;
private String path;
+ private static int chunkSize;
+ private static int flushSize;
+ private static int maxFlushSize;
+ private static int blockSize;
+ private static String keyString;
/**
* Create a MiniDFSCluster for testing.
@@ -82,8 +81,13 @@ public class TestContainerStateMachine {
*/
@Before
public void setup() throws Exception {
+ chunkSize = 100;
+ flushSize = 2 * chunkSize;
+ maxFlushSize = 2 * flushSize;
+ blockSize = 2 * maxFlushSize;
+ keyString = UUID.randomUUID().toString();
path = GenericTestUtils
- .getTempPath(TestContainerStateMachine.class.getSimpleName());
+ .getTempPath(TestContainerStateMachineFlushDelay.class.getSimpleName());
File baseDir = new File(path);
baseDir.mkdirs();
@@ -101,6 +105,11 @@ public class TestContainerStateMachine {
// conf.set(HADOOP_SECURITY_AUTHENTICATION, KERBEROS.toString());
cluster =
MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1)
+ .setBlockSize(blockSize)
+ .setChunkSize(chunkSize)
+ .setStreamBufferFlushSize(flushSize)
+ .setStreamBufferMaxSize(maxFlushSize)
+ .setStreamBufferSizeUnit(StorageUnit.BYTES)
.setHbInterval(200)
.setCertificateClient(new CertificateClientTestImpl(conf))
.build();
@@ -131,8 +140,14 @@ public class TestContainerStateMachine {
objectStore.getVolume(volumeName).getBucket(bucketName)
.createKey("ratis", 1024, ReplicationType.RATIS,
ReplicationFactor.ONE, new HashMap<>());
+ // ozone.client.stream.buffer.flush.delay is now enabled by default.
+ // Write data (length 110) greater than the chunk size (100) to make
+ // sure flush() will sync the data.
+ byte[] data =
+ ContainerTestHelper.getFixedLengthString(keyString, 110)
+ .getBytes(UTF_8);
// First write and flush creates a container in the datanode
- key.write("ratis".getBytes());
+ key.write(data);
key.flush();
key.write("ratis".getBytes());
@@ -162,58 +177,4 @@ public class TestContainerStateMachine {
== ContainerProtos.ContainerDataProto.State.UNHEALTHY);
}
- @Test
- public void testRatisSnapshotRetention() throws Exception {
-
- ContainerStateMachine stateMachine =
- (ContainerStateMachine) TestHelper.getStateMachine(cluster);
- SimpleStateMachineStorage storage =
- (SimpleStateMachineStorage) stateMachine.getStateMachineStorage();
- Assert.assertNull(storage.findLatestSnapshot());
-
- // Write 10 keys. Num snapshots should be equal to config value.
- for (int i = 1; i <= 10; i++) {
- OzoneOutputStream key =
- objectStore.getVolume(volumeName).getBucket(bucketName)
- .createKey(("ratis" + i), 1024, ReplicationType.RATIS,
- ReplicationFactor.ONE, new HashMap<>());
- // First write and flush creates a container in the datanode
- key.write(("ratis" + i).getBytes());
- key.flush();
- key.write(("ratis" + i).getBytes());
- key.close();
- }
-
- RatisServerConfiguration ratisServerConfiguration =
- conf.getObject(RatisServerConfiguration.class);
-
- stateMachine =
- (ContainerStateMachine) TestHelper.getStateMachine(cluster);
- storage = (SimpleStateMachineStorage) stateMachine.getStateMachineStorage();
- Path parentPath = storage.findLatestSnapshot().getFile().getPath();
- int numSnapshots = parentPath.getParent().toFile().listFiles().length;
- Assert.assertTrue(Math.abs(ratisServerConfiguration
- .getNumSnapshotsRetained() - numSnapshots) <= 1);
-
- // Write 10 more keys. Num Snapshots should remain the same.
- for (int i = 11; i <= 20; i++) {
- OzoneOutputStream key =
- objectStore.getVolume(volumeName).getBucket(bucketName)
- .createKey(("ratis" + i), 1024, ReplicationType.RATIS,
- ReplicationFactor.ONE, new HashMap<>());
- // First write and flush creates a container in the datanode
- key.write(("ratis" + i).getBytes());
- key.flush();
- key.write(("ratis" + i).getBytes());
- key.close();
- }
- stateMachine =
- (ContainerStateMachine) TestHelper.getStateMachine(cluster);
- storage = (SimpleStateMachineStorage) stateMachine.getStateMachineStorage();
- parentPath = storage.findLatestSnapshot().getFile().getPath();
- numSnapshots = parentPath.getParent().toFile().listFiles().length;
- Assert.assertTrue(Math.abs(ratisServerConfiguration
- .getNumSnapshotsRetained() - numSnapshots) <= 1);
- }
-
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
index d172dd5..2edf369 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
@@ -123,7 +123,8 @@ public class TestFailureHandlingByClient {
RatisHelper.HDDS_DATANODE_RATIS_CLIENT_PREFIX_KEY+ "." +
"watch.request.timeout",
3, TimeUnit.SECONDS);
-
+ conf.setBoolean(
+ OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY, false);
conf.setQuietMode(false);
conf.setClass(NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
StaticMapping.class, DNSToSwitchMapping.class);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClientFlushDelay.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClientFlushDelay.java
new file mode 100644
index 0000000..95e275b
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClientFlushDelay.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.client.rpc;
+
+import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.HddsUtils;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.ratis.RatisHelper;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.StaticMapping;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.BlockOutputStreamEntry;
+import org.apache.hadoop.ozone.client.io.KeyOutputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.TestHelper;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.junit.*;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+
+/**
+ * Tests exception handling in the Ozone client with flush delay enabled.
+ */
+public class TestFailureHandlingByClientFlushDelay {
+
+ /**
+ * Set a timeout for each test.
+ */
+ @Rule
+ public Timeout timeout = new Timeout(300000);
+
+ private MiniOzoneCluster cluster;
+ private OzoneConfiguration conf;
+ private OzoneClient client;
+ private ObjectStore objectStore;
+ private int chunkSize;
+ private int flushSize;
+ private int maxFlushSize;
+ private int blockSize;
+ private String volumeName;
+ private String bucketName;
+ private String keyString;
+
+ /**
+ * Create a MiniDFSCluster for testing.
+ * <p>
+ * Ozone is made active by setting OZONE_ENABLED = true
+ *
+ * @throws IOException
+ */
+ private void init() throws Exception {
+ conf = new OzoneConfiguration();
+ chunkSize = 100;
+ flushSize = 2 * chunkSize;
+ maxFlushSize = 2 * flushSize;
+ blockSize = 4 * chunkSize;
+ conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 100, TimeUnit.SECONDS);
+ conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10);
+ conf.setTimeDuration(
+ OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY,
+ 1, TimeUnit.SECONDS);
+ conf.setTimeDuration(
+ OzoneConfigKeys.DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
+ 1, TimeUnit.SECONDS);
+ conf.setBoolean(
+ OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY, true);
+ conf.setInt(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT, 2);
+ conf.setTimeDuration(
+ RatisHelper.HDDS_DATANODE_RATIS_SERVER_PREFIX_KEY + "." +
+ DatanodeRatisServerConfig.RATIS_SERVER_REQUEST_TIMEOUT_KEY,
+ 3, TimeUnit.SECONDS);
+ conf.setTimeDuration(
+ RatisHelper.HDDS_DATANODE_RATIS_SERVER_PREFIX_KEY + "." +
+ DatanodeRatisServerConfig.
+ RATIS_SERVER_WATCH_REQUEST_TIMEOUT_KEY,
+ 3, TimeUnit.SECONDS);
+ conf.setTimeDuration(
+ RatisHelper.HDDS_DATANODE_RATIS_CLIENT_PREFIX_KEY+ "." +
+ "rpc.request.timeout",
+ 3, TimeUnit.SECONDS);
+ conf.setTimeDuration(
+ RatisHelper.HDDS_DATANODE_RATIS_CLIENT_PREFIX_KEY+ "." +
+ "watch.request.timeout",
+ 3, TimeUnit.SECONDS);
+ conf.setQuietMode(false);
+ conf.setClass(NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+ StaticMapping.class, DNSToSwitchMapping.class);
+ StaticMapping.addNodeToRack(NetUtils.normalizeHostNames(
+ Collections.singleton(HddsUtils.getHostName(conf))).get(0),
+ "/rack1");
+ cluster = MiniOzoneCluster.newBuilder(conf)
+ .setNumDatanodes(10)
+ .setTotalPipelineNumLimit(15)
+ .setChunkSize(chunkSize)
+ .setBlockSize(blockSize)
+ .setStreamBufferFlushSize(flushSize)
+ .setStreamBufferMaxSize(maxFlushSize)
+ .setStreamBufferSizeUnit(StorageUnit.BYTES).build();
+ cluster.waitForClusterToBeReady();
+ //the easiest way to create an open container is creating a key
+ client = OzoneClientFactory.getRpcClient(conf);
+ objectStore = client.getObjectStore();
+ keyString = UUID.randomUUID().toString();
+ volumeName = "datanodefailurehandlingtest";
+ bucketName = volumeName;
+ objectStore.createVolume(volumeName);
+ objectStore.getVolume(volumeName).createBucket(bucketName);
+ }
+
+ private void startCluster() throws Exception {
+ init();
+ }
+
+ /**
+ * Shutdown MiniDFSCluster.
+ */
+ @After
+ public void shutdown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testPipelineExclusionWithPipelineFailure() throws Exception {
+ startCluster();
+ String keyName = UUID.randomUUID().toString();
+ OzoneOutputStream key =
+ createKey(keyName, ReplicationType.RATIS, blockSize);
+ String data = ContainerTestHelper
+ .getFixedLengthString(keyString, chunkSize);
+
+ // get the name of a valid container
+ Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+ KeyOutputStream keyOutputStream =
+ (KeyOutputStream) key.getOutputStream();
+ List<BlockOutputStreamEntry> streamEntryList =
+ keyOutputStream.getStreamEntries();
+
+ // Assert that 1 block will be preallocated
+ Assert.assertEquals(1, streamEntryList.size());
+ key.write(data.getBytes());
+ key.flush();
+ long containerId = streamEntryList.get(0).getBlockID().getContainerID();
+ BlockID blockId = streamEntryList.get(0).getBlockID();
+ ContainerInfo container =
+ cluster.getStorageContainerManager().getContainerManager()
+ .getContainer(ContainerID.valueof(containerId));
+ Pipeline pipeline =
+ cluster.getStorageContainerManager().getPipelineManager()
+ .getPipeline(container.getPipelineID());
+ List<DatanodeDetails> datanodes = pipeline.getNodes();
+
+ // With two nodes down, the next write will hit AlreadyClosedException
+ // and the pipeline will be added to the exclude list.
+ cluster.shutdownHddsDatanode(datanodes.get(0));
+ cluster.shutdownHddsDatanode(datanodes.get(1));
+
+ key.write(data.getBytes());
+ key.flush();
+ Assert.assertTrue(
+ keyOutputStream.getExcludeList().getContainerIds().isEmpty());
+ Assert.assertTrue(
+ keyOutputStream.getExcludeList().getDatanodes().isEmpty());
+ key.write(data.getBytes());
+ // The close will just write to the buffer
+ key.close();
+
+ OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
+ .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
+ .setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName(keyName)
+ .setRefreshPipeline(true)
+ .build();
+ OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
+
+ // Make sure a new block is written
+ Assert.assertNotEquals(
+ keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly().get(0)
+ .getBlockID(), blockId);
+ Assert.assertEquals(3 * data.getBytes().length, keyInfo.getDataSize());
+ validateData(keyName, data.concat(data).concat(data).getBytes());
+ }
+
+ private OzoneOutputStream createKey(String keyName, ReplicationType type,
+ long size) throws Exception {
+ return TestHelper
+ .createKey(keyName, type, size, objectStore, volumeName, bucketName);
+ }
+
+ private void validateData(String keyName, byte[] data) throws Exception {
+ TestHelper
+ .validateData(keyName, data, objectStore, volumeName, bucketName);
+ }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
index 8f0ce00..d93d817 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
@@ -57,6 +57,7 @@ import org.junit.Rule;
import org.junit.rules.Timeout;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY;
/**
* Tests failure detection and handling in BlockOutputStream Class.
@@ -100,6 +101,7 @@ public class TestOzoneClientRetriesOnException {
conf.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, 3);
conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 3);
conf.setQuietMode(false);
+ conf.setBoolean(OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY, false);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(7)
.setTotalPipelineNumLimit(10)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2BlockOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnExceptionFlushDelay.java
similarity index 54%
rename from hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2BlockOutputStream.java
rename to hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnExceptionFlushDelay.java
index cf719b2..b9029ea 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2BlockOutputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnExceptionFlushDelay.java
@@ -14,12 +14,19 @@
* License for the specific language governing permissions and limitations under
* the License.
*/
-
package org.apache.hadoop.ozone.client.rpc;
import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
@@ -30,44 +37,42 @@ import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.TestHelper;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.apache.ratis.protocol.GroupMismatchException;
+import org.junit.*;
+import org.junit.rules.Timeout;
import java.io.IOException;
import java.io.OutputStream;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import org.junit.Rule;
-import org.junit.rules.Timeout;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY;
/**
- * Tests BlockOutputStream class.
+ * Tests failure detection and handling in BlockOutputStream class
+ * with flush delay enabled.
*/
-public class Test2BlockOutputStream {
+public class TestOzoneClientRetriesOnExceptionFlushDelay {
/**
- * Set a timeout for each test.
- */
+ * Set a timeout for each test.
+ */
@Rule
public Timeout timeout = new Timeout(300000);
+
private static MiniOzoneCluster cluster;
- private static OzoneConfiguration conf = new OzoneConfiguration();
- private static OzoneClient client;
- private static ObjectStore objectStore;
- private static int chunkSize;
- private static int flushSize;
- private static int maxFlushSize;
- private static int blockSize;
- private static String volumeName;
- private static String bucketName;
- private static String keyString;
+ private OzoneConfiguration conf = new OzoneConfiguration();
+ private OzoneClient client;
+ private ObjectStore objectStore;
+ private int chunkSize;
+ private int flushSize;
+ private int maxFlushSize;
+ private int blockSize;
+ private String volumeName;
+ private String bucketName;
+ private String keyString;
+ private XceiverClientManager xceiverClientManager;
/**
* Create a MiniDFSCluster for testing.
@@ -76,19 +81,18 @@ public class Test2BlockOutputStream {
*
* @throws IOException
*/
- @BeforeClass
- public static void init() throws Exception {
+ @Before
+ public void init() throws Exception {
chunkSize = 100;
flushSize = 2 * chunkSize;
maxFlushSize = 2 * flushSize;
blockSize = 2 * maxFlushSize;
- conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
- conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+ conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000,
+ TimeUnit.MILLISECONDS);
conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
+ conf.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, 3);
+ conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 3);
conf.setQuietMode(false);
- conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
- StorageUnit.MB);
- conf.setBoolean(OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY, true);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(7)
.setTotalPipelineNumLimit(10)
@@ -102,8 +106,9 @@ public class Test2BlockOutputStream {
//the easiest way to create an open container is creating a key
client = OzoneClientFactory.getRpcClient(conf);
objectStore = client.getObjectStore();
+ xceiverClientManager = new XceiverClientManager(conf);
keyString = UUID.randomUUID().toString();
- volumeName = "testblockoutputstream";
+ volumeName = "testblockoutputstreamwithretries";
bucketName = volumeName;
objectStore.createVolume(volumeName);
objectStore.getVolume(volumeName).createBucket(bucketName);
@@ -116,66 +121,67 @@ public class Test2BlockOutputStream {
/**
* Shutdown MiniDFSCluster.
*/
- @AfterClass
- public static void shutdown() {
+ @After
+ public void shutdown() {
if (cluster != null) {
cluster.shutdown();
}
}
@Test
- public void testFlushChunkDelay() throws Exception {
- String keyName1 = getKeyName();
- OzoneOutputStream key1 = createKey(keyName1, ReplicationType.RATIS, 0);
-
+ public void testGroupMismatchExceptionHandling() throws Exception {
+ String keyName = getKeyName();
+ // Make sure flush() will sync the data.
+ int dataLength = maxFlushSize + chunkSize;
+ OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS,
+ dataLength);
+ // write data more than 1 chunk
byte[] data1 =
- ContainerTestHelper.getFixedLengthString(keyString, 10)
+ ContainerTestHelper.getFixedLengthString(keyString, dataLength)
.getBytes(UTF_8);
- key1.write(data1);
- key1.flush();
- KeyOutputStream keyOutputStream = (KeyOutputStream)key1.getOutputStream();
+ Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+ KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream();
+ long containerID =
+ keyOutputStream.getStreamEntries().get(0).
+ getBlockID().getContainerID();
Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
+ ContainerInfo container =
+ cluster.getStorageContainerManager().getContainerManager()
+ .getContainer(ContainerID.valueof(containerID));
+ Pipeline pipeline =
+ cluster.getStorageContainerManager().getPipelineManager()
+ .getPipeline(container.getPipelineID());
+ XceiverClientSpi xceiverClient =
+ xceiverClientManager.acquireClient(pipeline);
+ xceiverClient.sendCommand(ContainerTestHelper
+ .getCreateContainerRequest(containerID, pipeline));
+ xceiverClientManager.releaseClient(xceiverClient, false);
+ key.write(data1);
OutputStream stream = keyOutputStream.getStreamEntries().get(0)
.getOutputStream();
Assert.assertTrue(stream instanceof BlockOutputStream);
BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
-
- // we have just written data(length 10) less than chunk Size,
- // at this time we call flush will not sync data.
- Assert.assertEquals(0, blockOutputStream.getTotalDataFlushedLength());
- key1.close();
- validateData(keyName1, data1);
-
- String keyName2 = getKeyName();
- OzoneOutputStream key2 = createKey(keyName2, ReplicationType.RATIS, 0);
- byte[] data2 =
- ContainerTestHelper.getFixedLengthString(keyString, 110)
- .getBytes(UTF_8);
- key2.write(data2);
- key2.flush();
- keyOutputStream = (KeyOutputStream)key2.getOutputStream();
- Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
- stream = keyOutputStream.getStreamEntries().get(0)
- .getOutputStream();
- Assert.assertTrue(stream instanceof BlockOutputStream);
- blockOutputStream = (BlockOutputStream) stream;
-
- // we have just written data(length 110) greater than chunk Size,
- // at this time we call flush will sync data.
- Assert.assertEquals(data2.length,
- blockOutputStream.getTotalDataFlushedLength());
- key2.close();
- validateData(keyName2, data2);
+ TestHelper.waitForPipelineClose(key, cluster, false);
+ key.flush();
+ Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream
+ .getIoException()) instanceof GroupMismatchException);
+ Assert.assertTrue(keyOutputStream.getExcludeList().getPipelineIds()
+ .contains(pipeline.getId()));
+ Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 2);
+ key.close();
+ Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 0);
+ validateData(keyName, data1);
}
private OzoneOutputStream createKey(String keyName, ReplicationType type,
- long size) throws Exception {
+ long size) throws Exception {
return TestHelper
- .createKey(keyName, type, size, objectStore, volumeName, bucketName);
+ .createKey(keyName, type, ReplicationFactor.ONE,
+ size, objectStore, volumeName, bucketName);
}
+
private void validateData(String keyName, byte[] data) throws Exception {
TestHelper
.validateData(keyName, data, objectStore, volumeName, bucketName);
}
-
}