You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2019/04/01 10:50:33 UTC
[hadoop] branch ozone-0.4 updated: HDDS-1312. Add more unit tests
to verify BlockOutputStream functionalities. Contributed by Shashikant
Banerjee.
This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch ozone-0.4
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/ozone-0.4 by this push:
new bb20c80 HDDS-1312. Add more unit tests to verify BlockOutputStream functionalities. Contributed by Shashikant Banerjee.
bb20c80 is described below
commit bb20c80955d1fd69a522653d7edbdf86f5eb3e7d
Author: Shashikant Banerjee <sh...@apache.org>
AuthorDate: Mon Apr 1 16:09:58 2019 +0530
HDDS-1312. Add more unit tests to verify BlockOutputStream functionalities. Contributed by Shashikant Banerjee.
(cherry picked from commit ef5de292436db56547f05f42a745be32e2b38546)
---
.../hadoop/ozone/client/io/KeyOutputStream.java | 72 ++-
.../rpc/TestBlockOutputStreamWithFailures.java | 696 ++++++++++++++++++++-
.../ozone/container/ContainerTestHelper.java | 9 +
3 files changed, 750 insertions(+), 27 deletions(-)
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 78f03d2..c04105c 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -96,6 +96,7 @@ public class KeyOutputStream extends OutputStream {
private ExcludeList excludeList;
private final RetryPolicy retryPolicy;
private int retryCount;
+ private long offset;
/**
* A constructor for testing purpose only.
*/
@@ -121,6 +122,7 @@ public class KeyOutputStream extends OutputStream {
.OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT_BYTES; // Default is 1MB
this.retryPolicy = RetryPolicies.TRY_ONCE_THEN_FAIL;
retryCount = 0;
+ offset = 0;
}
@VisibleForTesting
@@ -149,6 +151,10 @@ public class KeyOutputStream extends OutputStream {
return locationInfoList;
}
+ @VisibleForTesting
+ public int getRetryCount() {
+ return retryCount;
+ }
@SuppressWarnings("parameternumber")
public KeyOutputStream(OpenKeySession handler,
@@ -316,6 +322,7 @@ public class KeyOutputStream extends OutputStream {
current.writeOnRetry(len);
} else {
current.write(b, off, writeLen);
+ offset += writeLen;
}
} catch (IOException ioe) {
// for the current iteration, totalDataWritten - currentPos gives the
@@ -326,8 +333,12 @@ public class KeyOutputStream extends OutputStream {
// The len specified here is the combined sum of the data length of
// the buffers
Preconditions.checkState(!retry || len <= streamBufferMaxSize);
- writeLen = retry ? (int) len :
- (int) (current.getWrittenDataLength() - currentPos);
+ int dataWritten = (int) (current.getWrittenDataLength() - currentPos);
+ writeLen = retry ? (int) len : dataWritten;
+ // In retry path, the data written is already accounted in offset.
+ if (!retry) {
+ offset += writeLen;
+ }
LOG.debug("writeLen {}, total len {}", writeLen, len);
handleException(current, currentStreamIndex, ioe);
}
@@ -345,20 +356,24 @@ public class KeyOutputStream extends OutputStream {
* from the streamEntries list for the container which is closed.
* @param containerID id of the closed container
* @param pipelineId id of the associated pipeline
+ * @param streamIndex index of the stream
*/
private void discardPreallocatedBlocks(long containerID,
- PipelineID pipelineId) {
- // currentStreamIndex < streamEntries.size() signifies that, there are still
+ PipelineID pipelineId, int streamIndex) {
+ // streamIndex < streamEntries.size() signifies that, there are still
// pre allocated blocks available.
- if (currentStreamIndex < streamEntries.size()) {
+
+ // This will be called only to discard the next subsequent unused blocks
+ // in the streamEntries list.
+ if (streamIndex < streamEntries.size()) {
ListIterator<BlockOutputStreamEntry> streamEntryIterator =
- streamEntries.listIterator(currentStreamIndex);
+ streamEntries.listIterator(streamIndex);
while (streamEntryIterator.hasNext()) {
BlockOutputStreamEntry streamEntry = streamEntryIterator.next();
+ Preconditions.checkArgument(streamEntry.getCurrentPosition() == 0);
if (((pipelineId != null && streamEntry.getPipeline().getId()
.equals(pipelineId)) || (containerID != -1
- && streamEntry.getBlockID().getContainerID() == containerID))
- && streamEntry.getCurrentPosition() == 0) {
+ && streamEntry.getBlockID().getContainerID() == containerID))) {
streamEntryIterator.remove();
}
}
@@ -396,7 +411,7 @@ public class KeyOutputStream extends OutputStream {
private void handleException(BlockOutputStreamEntry streamEntry,
int streamIndex, IOException exception) throws IOException {
Throwable t = checkForException(exception);
- boolean retryFailure = checkForRetryFailure(exception);
+ boolean retryFailure = checkForRetryFailure(t);
boolean closedContainerException = false;
if (!retryFailure) {
closedContainerException = checkIfContainerIsClosed(t);
@@ -411,16 +426,14 @@ public class KeyOutputStream extends OutputStream {
+ "uncommitted data length is {}", exception,
totalSuccessfulFlushedData, bufferedDataLen);
Preconditions.checkArgument(bufferedDataLen <= streamBufferMaxSize);
- Preconditions.checkArgument(
- streamEntry.getWrittenDataLength() - totalSuccessfulFlushedData
- == bufferedDataLen);
+ Preconditions.checkArgument(offset - getKeyLength() == bufferedDataLen);
long containerId = streamEntry.getBlockID().getContainerID();
Collection<DatanodeDetails> failedServers = streamEntry.getFailedServers();
Preconditions.checkNotNull(failedServers);
if (!failedServers.isEmpty()) {
excludeList.addDatanodes(failedServers);
}
- if (checkIfContainerIsClosed(t)) {
+ if (closedContainerException) {
excludeList.addConatinerId(ContainerID.valueof(containerId));
} else if (retryFailure || t instanceof TimeoutException) {
pipelineId = streamEntry.getPipeline().getId();
@@ -428,22 +441,15 @@ public class KeyOutputStream extends OutputStream {
}
// just clean up the current stream.
streamEntry.cleanup(retryFailure);
- if (bufferedDataLen > 0) {
- // If the data is still cached in the underlying stream, we need to
- // allocate new block and write this data in the datanode.
- currentStreamIndex += 1;
- handleRetry(exception, bufferedDataLen);
- }
- if (totalSuccessfulFlushedData == 0) {
- streamEntries.remove(streamIndex);
- currentStreamIndex -= 1;
- }
+ // discard all subsequent blocks belonging to the containers and pipelines
+ // which are in the exclude list, so that the very next retry never
+ // writes data to the closed container/pipeline
if (closedContainerException) {
// discard subsequent pre allocated blocks from the streamEntries list
// from the closed container
discardPreallocatedBlocks(streamEntry.getBlockID().getContainerID(),
- null);
+ null, streamIndex + 1);
} else {
// In case there is timeoutException or Watch for commit happening over
// majority or the client connection failure to the leader in the
@@ -451,7 +457,19 @@ public class KeyOutputStream extends OutputStream {
// Next block allocation will happen with excluding this specific pipeline
// This will ensure if 2 way commit happens , it cannot span over multiple
// blocks
- discardPreallocatedBlocks(-1, pipelineId);
+ discardPreallocatedBlocks(-1, pipelineId, streamIndex + 1);
+ }
+ if (bufferedDataLen > 0) {
+ // If the data is still cached in the underlying stream, we need to
+ // allocate new block and write this data in the datanode.
+ currentStreamIndex += 1;
+ handleRetry(exception, bufferedDataLen);
+ // reset the retryCount after handling the exception
+ retryCount = 0;
+ }
+ if (totalSuccessfulFlushedData == 0) {
+ streamEntries.remove(streamIndex);
+ currentStreamIndex -= 1;
}
}
@@ -618,7 +636,9 @@ public class KeyOutputStream extends OutputStream {
if (keyArgs != null) {
// in test, this could be null
removeEmptyBlocks();
- keyArgs.setDataSize(getKeyLength());
+ long length = getKeyLength();
+ Preconditions.checkArgument(offset == length);
+ keyArgs.setDataSize(length);
keyArgs.setLocationInfoList(getLocationInfoList());
// When the key is multipart upload part file upload, we should not
// commit the key, as this is not an actual key, this is a just a
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
index 671f16c..54cdff0 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
@@ -17,6 +17,7 @@
package org.apache.hadoop.ozone.client.rpc;
import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -235,6 +236,8 @@ public class TestBlockOutputStreamWithFailures {
Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
.getIoException()) instanceof ContainerNotOpenException);
+ // Make sure the retryCount is reset after the exception is handled
+ Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
// commitInfoMap will remain intact as there is no server failure
Assert.assertEquals(3, raftClient.getCommitInfoMap().size());
// now close the stream, It will update the ack length after watchForCommit
@@ -372,6 +375,8 @@ public class TestBlockOutputStreamWithFailures {
key.close();
Assert
.assertEquals(blockSize, blockOutputStream.getTotalAckDataLength());
+ // Make sure the retryCount is reset after the exception is handled
+ Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
// make sure the bufferPool is empty
Assert
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
@@ -506,6 +511,8 @@ public class TestBlockOutputStreamWithFailures {
key.flush();
Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
.getIoException()) instanceof AlreadyClosedException);
+ // Make sure the retryCount is reset after the exception is handled
+ Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
// now close the stream, It will update the ack length after watchForCommit
key.close();
Assert
@@ -534,11 +541,698 @@ public class TestBlockOutputStreamWithFailures {
validateData(keyName, data1);
}
+ @Test
+ public void testFailureWithPrimeSizedData() throws Exception {
+ XceiverClientMetrics metrics =
+ XceiverClientManager.getXceiverClientMetrics();
+ long writeChunkCount = metrics.getContainerOpCountMetrics(
+ ContainerProtos.Type.WriteChunk);
+ long putBlockCount = metrics.getContainerOpCountMetrics(
+ ContainerProtos.Type.PutBlock);
+ long pendingWriteChunkCount = metrics.getContainerOpsMetrics(
+ ContainerProtos.Type.WriteChunk);
+ long pendingPutBlockCount = metrics.getContainerOpsMetrics(
+ ContainerProtos.Type.PutBlock);
+ long totalOpCount = metrics.getTotalOpCount();
+ String keyName = getKeyName();
+ OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
+ int dataLength = 167;
+ // write data more than 1 chunk
+ byte[] data1 =
+ ContainerTestHelper.getFixedLengthString(keyString, dataLength)
+ .getBytes(UTF_8);
+ key.write(data1);
+
+ Assert.assertTrue(
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
+ == pendingWriteChunkCount + 1);
+ Assert.assertTrue(
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
+ == pendingPutBlockCount);
+ Assert.assertEquals(writeChunkCount + 1,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(putBlockCount,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(totalOpCount + 1,
+ metrics.getTotalOpCount());
+
+ Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+ KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
+
+ Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
+ OutputStream stream = keyOutputStream.getStreamEntries().get(0)
+ .getOutputStream();
+ Assert.assertTrue(stream instanceof BlockOutputStream);
+ BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+
+
+ Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize());
+ Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+ Assert.assertEquals(0,
+ blockOutputStream.getTotalDataFlushedLength());
+
+ Assert.assertTrue(blockOutputStream.getTotalAckDataLength() == 0);
+
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().size() == 0);
+
+ // Now do a flush. This will flush the data and update the flush length and
+ // the map.
+ key.flush();
+
+ Assert.assertEquals(pendingWriteChunkCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(pendingPutBlockCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(writeChunkCount + 2,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(putBlockCount + 1,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(totalOpCount + 3,
+ metrics.getTotalOpCount());
+
+ // Since the data in the buffer is already flushed, flush here will have
+ // no impact on the counters and data structures
+
+ Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize());
+ Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+ Assert.assertEquals(dataLength,
+ blockOutputStream.getTotalDataFlushedLength());
+ // after the flush, the commitIndex2flushedDataMap is expected to be empty
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().size() == 0);
+
+ XceiverClientRatis raftClient =
+ (XceiverClientRatis) blockOutputStream.getXceiverClient();
+ Assert.assertEquals(3, raftClient.getCommitInfoMap().size());
+ // Close the containers on the Datanode and write more data
+ ContainerTestHelper.waitForContainerClose(key, cluster);
+ key.write(data1);
+
+ // As a part of handling the exception, 2 failed writeChunks will be
+ // rewritten plus 1 putBlocks for flush
+ // and one flush for partial chunk
+ key.flush();
+
+ Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
+ .getIoException()) instanceof ContainerNotOpenException);
+ // Make sure the retryCount is reset after the exception is handled
+ Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
+
+ // commitInfoMap will remain intact as there is no server failure
+ Assert.assertEquals(3, raftClient.getCommitInfoMap().size());
+ // now close the stream, It will update the ack length after watchForCommit
+ key.close();
+ // make sure the bufferPool is empty
+ Assert
+ .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
+ Assert
+ .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+ Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
+ Assert.assertEquals(pendingWriteChunkCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(pendingPutBlockCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(writeChunkCount + 6,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(putBlockCount + 3,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(totalOpCount + 9,
+ metrics.getTotalOpCount());
+ Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 2);
+ // Written the same data twice
+ String dataString = new String(data1, UTF_8);
+ validateData(keyName, dataString.concat(dataString).getBytes());
+ }
+
+ @Test
+ public void testExceptionDuringClose() throws Exception {
+ XceiverClientMetrics metrics =
+ XceiverClientManager.getXceiverClientMetrics();
+ long writeChunkCount = metrics.getContainerOpCountMetrics(
+ ContainerProtos.Type.WriteChunk);
+ long putBlockCount = metrics.getContainerOpCountMetrics(
+ ContainerProtos.Type.PutBlock);
+ long pendingWriteChunkCount = metrics.getContainerOpsMetrics(
+ ContainerProtos.Type.WriteChunk);
+ long pendingPutBlockCount = metrics.getContainerOpsMetrics(
+ ContainerProtos.Type.PutBlock);
+ long totalOpCount = metrics.getTotalOpCount();
+ String keyName = getKeyName();
+ OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
+ int dataLength = 167;
+ // write data more than 1 chunk
+ byte[] data1 =
+ ContainerTestHelper.getFixedLengthString(keyString, dataLength)
+ .getBytes(UTF_8);
+ key.write(data1);
+
+ Assert.assertTrue(
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
+ == pendingWriteChunkCount + 1);
+ Assert.assertTrue(
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
+ == pendingPutBlockCount);
+ Assert.assertEquals(writeChunkCount + 1,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(putBlockCount,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(totalOpCount + 1,
+ metrics.getTotalOpCount());
+
+ Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+ KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
+
+ Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
+ OutputStream stream = keyOutputStream.getStreamEntries().get(0)
+ .getOutputStream();
+ Assert.assertTrue(stream instanceof BlockOutputStream);
+ BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+
+
+ Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize());
+ Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+ Assert.assertEquals(0,
+ blockOutputStream.getTotalDataFlushedLength());
+
+ Assert.assertTrue(blockOutputStream.getTotalAckDataLength() == 0);
+
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().size() == 0);
+
+ // Now do a flush. This will flush the data and update the flush length and
+ // the map.
+ key.flush();
+
+ Assert.assertEquals(pendingWriteChunkCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(pendingPutBlockCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(writeChunkCount + 2,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(putBlockCount + 1,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(totalOpCount + 3,
+ metrics.getTotalOpCount());
+
+ // Since the data in the buffer is already flushed, flush here will have
+ // no impact on the counters and data structures
+
+ Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize());
+ Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+ Assert.assertEquals(dataLength,
+ blockOutputStream.getTotalDataFlushedLength());
+ // after the flush, the commitIndex2flushedDataMap is expected to be empty
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().size() == 0);
+
+ XceiverClientRatis raftClient =
+ (XceiverClientRatis) blockOutputStream.getXceiverClient();
+ Assert.assertEquals(3, raftClient.getCommitInfoMap().size());
+ // Close the containers on the Datanode and write more data
+ ContainerTestHelper.waitForContainerClose(key, cluster);
+ key.write(data1);
+
+ // commitInfoMap will remain intact as there is no server failure
+ Assert.assertEquals(3, raftClient.getCommitInfoMap().size());
+ // now close the stream, It will hit exception
+ key.close();
+
+ Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
+ .getIoException()) instanceof ContainerNotOpenException);
+ // Make sure the retryCount is reset after the exception is handled
+ Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
+ // make sure the bufferPool is empty
+ Assert
+ .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
+ Assert
+ .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+ Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
+ Assert.assertEquals(pendingWriteChunkCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(pendingPutBlockCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(writeChunkCount + 6,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(putBlockCount + 3,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(totalOpCount + 9,
+ metrics.getTotalOpCount());
+ Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 2);
+ // Written the same data twice
+ String dataString = new String(data1, UTF_8);
+ validateData(keyName, dataString.concat(dataString).getBytes());
+ }
+
+ @Test
+ public void testWatchForCommitWithSingleNodeRatis() throws Exception {
+ XceiverClientMetrics metrics =
+ XceiverClientManager.getXceiverClientMetrics();
+ long writeChunkCount = metrics.getContainerOpCountMetrics(
+ ContainerProtos.Type.WriteChunk);
+ long putBlockCount = metrics.getContainerOpCountMetrics(
+ ContainerProtos.Type.PutBlock);
+ long pendingWriteChunkCount = metrics.getContainerOpsMetrics(
+ ContainerProtos.Type.WriteChunk);
+ long pendingPutBlockCount = metrics.getContainerOpsMetrics(
+ ContainerProtos.Type.PutBlock);
+ long totalOpCount = metrics.getTotalOpCount();
+ String keyName = getKeyName();
+ OzoneOutputStream key =
+ createKey(keyName, ReplicationType.RATIS, 0, ReplicationFactor.ONE);
+ int dataLength = maxFlushSize + 50;
+ // write data more than 1 chunk
+ byte[] data1 =
+ ContainerTestHelper.getFixedLengthString(keyString, dataLength)
+ .getBytes(UTF_8);
+ key.write(data1);
+
+ // since it's hitting the full buffer condition, it will call watchForCommit
+ // and complete at least the putBlock for the first flushSize worth of data
+ Assert.assertTrue(
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
+ <= pendingWriteChunkCount + 2);
+ Assert.assertTrue(
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
+ <= pendingPutBlockCount + 1);
+ Assert.assertEquals(writeChunkCount + 4,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(putBlockCount + 2,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(totalOpCount + 6,
+ metrics.getTotalOpCount());
+
+ Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+ KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
+
+ Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
+ OutputStream stream = keyOutputStream.getStreamEntries().get(0)
+ .getOutputStream();
+ Assert.assertTrue(stream instanceof BlockOutputStream);
+ BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+
+ // we have just written data more than flush Size(2 chunks), at this time
+ // buffer pool will have 4 buffers allocated worth of chunk size
+
+ Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
+ // writtenDataLength as well flushedDataLength will be updated here
+ Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+ Assert.assertEquals(maxFlushSize,
+ blockOutputStream.getTotalDataFlushedLength());
+
+ // since data equals to maxBufferSize is written, this will be a blocking
+ // call and hence will wait for atleast flushSize worth of data to get
+ // ack'd by all servers right here
+ Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize);
+
+ // watchForCommit will clean up atleast one entry from the map where each
+ // entry corresponds to flushSize worth of data
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().size() <= 1);
+
+ // Now do a flush. This will flush the data and update the flush length and
+ // the map.
+ key.flush();
+
+ Assert.assertEquals(pendingWriteChunkCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(pendingPutBlockCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(writeChunkCount + 5,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(putBlockCount + 3,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(totalOpCount + 8,
+ metrics.getTotalOpCount());
+
+ // flush is a sync call, all pending operations will complete
+ Assert.assertEquals(pendingWriteChunkCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(pendingPutBlockCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+
+ // Since the data in the buffer is already flushed, flush here will have
+ // no impact on the counters and data structures
+
+ Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
+ Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+ Assert.assertEquals(dataLength,
+ blockOutputStream.getTotalDataFlushedLength());
+ // flush will make sure one more entry gets updated in the map
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2);
+
+ XceiverClientRatis raftClient =
+ (XceiverClientRatis) blockOutputStream.getXceiverClient();
+ Assert.assertEquals(1, raftClient.getCommitInfoMap().size());
+ // Close the containers on the Datanode and write more data
+ ContainerTestHelper.waitForContainerClose(key, cluster);
+ // 4 writeChunks = maxFlushSize + 2 putBlocks will be discarded here
+ // once exception is hit
+ key.write(data1);
+
+ // As a part of handling the exception, 4 failed writeChunks will be
+ // rewritten plus one partial chunk plus two putBlocks for flushSize
+ // and one flush for partial chunk
+ key.flush();
+
+ Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
+ .getIoException()) instanceof ContainerNotOpenException);
+ // Make sure the retryCount is reset after the exception is handled
+ Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
+ // commitInfoMap will remain intact as there is no server failure
+ Assert.assertEquals(1, raftClient.getCommitInfoMap().size());
+ // now close the stream, It will update the ack length after watchForCommit
+ key.close();
+ // make sure the bufferPool is empty
+ Assert
+ .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
+ Assert
+ .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+ Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
+ Assert.assertEquals(pendingWriteChunkCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(pendingPutBlockCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(writeChunkCount + 14,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(putBlockCount + 8,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(totalOpCount + 22,
+ metrics.getTotalOpCount());
+ // Written the same data twice
+ String dataString = new String(data1, UTF_8);
+ validateData(keyName, dataString.concat(dataString).getBytes());
+ }
+
+ @Test
+ public void testDatanodeFailureWithSingleNodeRatis() throws Exception {
+ XceiverClientMetrics metrics =
+ XceiverClientManager.getXceiverClientMetrics();
+ long writeChunkCount = metrics.getContainerOpCountMetrics(
+ ContainerProtos.Type.WriteChunk);
+ long putBlockCount = metrics.getContainerOpCountMetrics(
+ ContainerProtos.Type.PutBlock);
+ long pendingWriteChunkCount = metrics.getContainerOpsMetrics(
+ ContainerProtos.Type.WriteChunk);
+ long pendingPutBlockCount = metrics.getContainerOpsMetrics(
+ ContainerProtos.Type.PutBlock);
+ long totalOpCount = metrics.getTotalOpCount();
+ String keyName = getKeyName();
+ OzoneOutputStream key =
+ createKey(keyName, ReplicationType.RATIS, 0, ReplicationFactor.ONE);
+ int dataLength = maxFlushSize + 50;
+ // write data more than 1 chunk
+ byte[] data1 =
+ ContainerTestHelper.getFixedLengthString(keyString, dataLength)
+ .getBytes(UTF_8);
+ key.write(data1);
+ // since it's hitting the full buffer condition, it will call watchForCommit
+ // and complete at least the putBlock for the first flushSize worth of data
+ Assert.assertTrue(
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
+ <= pendingWriteChunkCount + 2);
+ Assert.assertTrue(
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
+ <= pendingPutBlockCount + 1);
+ Assert.assertEquals(writeChunkCount + 4,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(putBlockCount + 2,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(totalOpCount + 6,
+ metrics.getTotalOpCount());
+ Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+ KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
+
+ Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
+ OutputStream stream = keyOutputStream.getStreamEntries().get(0)
+ .getOutputStream();
+ Assert.assertTrue(stream instanceof BlockOutputStream);
+ BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+
+ // we have just written data more than flush Size(2 chunks), at this time
+ // buffer pool will have 4 buffers allocated worth of chunk size
+
+ Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
+ // writtenDataLength as well flushedDataLength will be updated here
+ Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+ Assert.assertEquals(maxFlushSize,
+ blockOutputStream.getTotalDataFlushedLength());
+
+ // since data equals to maxBufferSize is written, this will be a blocking
+ // call and hence will wait for atleast flushSize worth of data to get
+ // ack'd by all servers right here
+ Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize);
+
+ // watchForCommit will clean up atleast flushSize worth of data buffer
+ // where each entry corresponds to flushSize worth of data
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2);
+
+ // Now do a flush. This will flush the data and update the flush length and
+ // the map.
+ key.flush();
+
+ Assert.assertEquals(pendingWriteChunkCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(pendingPutBlockCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(writeChunkCount + 5,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(putBlockCount + 3,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(totalOpCount + 8,
+ metrics.getTotalOpCount());
+
+ // Since the data in the buffer is already flushed, flush here will have
+ // no impact on the counters and data structures
+
+ Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
+ Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+ Assert.assertEquals(dataLength,
+ blockOutputStream.getTotalDataFlushedLength());
+ // after the flush, the commitIndex2flushedDataMap is expected to be empty
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().size() == 0);
+
+ XceiverClientRatis raftClient =
+ (XceiverClientRatis) blockOutputStream.getXceiverClient();
+ Assert.assertEquals(1, raftClient.getCommitInfoMap().size());
+ Pipeline pipeline = raftClient.getPipeline();
+ cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
+
+ // again write data with more than max buffer limit. This will call
+ // watchForCommit again. No write will happen in the current block and
+ // data will be rewritten to the next block.
+
+ key.write(data1);
+
+ key.flush();
+
+ Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
+ .getIoException()) instanceof AlreadyClosedException);
+ Assert.assertEquals(1, raftClient.getCommitInfoMap().size());
+ // Make sure the retryCount is reset after the exception is handled
+ Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
+ // now close the stream, It will update the ack length after watchForCommit
+ key.close();
+ Assert
+ .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+ // make sure the bufferPool is empty
+ Assert
+ .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
+ Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
+ Assert.assertEquals(pendingWriteChunkCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(pendingPutBlockCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+
+ // in total, there are 14 full write chunks, 5 before the failure injection,
+ // 4 chunks after which we detect the failure and then 5 again on the next
+ // block
+ Assert.assertEquals(writeChunkCount + 14,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+ // 3 flushes at flushSize boundaries before failure injection + 2
+ // flush failed + 3 more flushes for the next block
+ Assert.assertEquals(putBlockCount + 8,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(totalOpCount + 22,
+ metrics.getTotalOpCount());
+ // Written the same data twice
+ String dataString = new String(data1, UTF_8);
+ cluster.restartHddsDatanode(pipeline.getNodes().get(0), true);
+ validateData(keyName, dataString.concat(dataString).getBytes());
+ }
+
+ /**
+ * Verifies the write path when the datanode serving the current block is
+ * shut down and the key was created with a size of 3 * blockSize.
+ * The key uses RATIS replication with factor ONE. The test writes
+ * maxFlushSize + 50 bytes, checks the buffer, flush and ack bookkeeping of
+ * the first block stream together with the XceiverClient op counters, then
+ * shuts down the first node of the pipeline and writes the same data again.
+ * The second write is expected to surface an AlreadyClosedException on the
+ * first block stream and to end up on another block. After the datanode is
+ * restarted, the key must contain the data twice.
+ */
+ @Test
+ public void testDatanodeFailureWithPreAllocation() throws Exception {
+ XceiverClientMetrics metrics =
+ XceiverClientManager.getXceiverClientMetrics();
+ long writeChunkCount = metrics.getContainerOpCountMetrics(
+ ContainerProtos.Type.WriteChunk);
+ long putBlockCount = metrics.getContainerOpCountMetrics(
+ ContainerProtos.Type.PutBlock);
+ long pendingWriteChunkCount = metrics.getContainerOpsMetrics(
+ ContainerProtos.Type.WriteChunk);
+ long pendingPutBlockCount = metrics.getContainerOpsMetrics(
+ ContainerProtos.Type.PutBlock);
+ long totalOpCount = metrics.getTotalOpCount();
+ String keyName = getKeyName();
+ OzoneOutputStream key =
+ createKey(keyName, ReplicationType.RATIS, 3 * blockSize,
+ ReplicationFactor.ONE);
+ int dataLength = maxFlushSize + 50;
+ // write 50 bytes more than maxFlushSize
+ byte[] data1 =
+ ContainerTestHelper.getFixedLengthString(keyString, dataLength)
+ .getBytes(UTF_8);
+ key.write(data1);
+ // since its hitting the full bufferCondition, it will call watchForCommit
+ // and completes at least putBlock for first flushSize worth of data
+ Assert.assertTrue(
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
+ <= pendingWriteChunkCount + 2);
+ Assert.assertTrue(
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
+ <= pendingPutBlockCount + 1);
+ Assert.assertEquals(writeChunkCount + 4,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(putBlockCount + 2,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(totalOpCount + 6,
+ metrics.getTotalOpCount());
+ Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+ KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
+
+ // three stream entries are expected before any failure is injected
+ Assert.assertEquals(3, keyOutputStream.getStreamEntries().size());
+ OutputStream stream = keyOutputStream.getStreamEntries().get(0)
+ .getOutputStream();
+ Assert.assertTrue(stream instanceof BlockOutputStream);
+ BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+
+ // we have just written data more than flush Size(2 chunks), at this time
+ // buffer pool will have 4 buffers allocated worth of chunk size
+
+ Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
+ // writtenDataLength as well flushedDataLength will be updated here
+ Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+ Assert.assertEquals(maxFlushSize,
+ blockOutputStream.getTotalDataFlushedLength());
+
+ // since data equals to maxBufferSize is written, this will be a blocking
+ // call and hence will wait for atleast flushSize worth of data to get
+ // ack'd by all servers right here
+ Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize);
+
+ // watchForCommit will clean up atleast flushSize worth of data buffer
+ // where each entry corresponds to flushSize worth of data
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2);
+
+ // Now do a flush. This will flush the data and update the flush length and
+ // the map.
+ key.flush();
+
+ Assert.assertEquals(pendingWriteChunkCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(pendingPutBlockCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(writeChunkCount + 5,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(putBlockCount + 3,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(totalOpCount + 8,
+ metrics.getTotalOpCount());
+
+ // The flush does not change the buffer pool size or the written length;
+ // only the flushed length catches up with the written length.
+
+ Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
+ Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+ Assert.assertEquals(dataLength,
+ blockOutputStream.getTotalDataFlushedLength());
+ // after the flush no entry is expected to remain in the map
+ Assert.assertEquals(0,
+ blockOutputStream.getCommitIndex2flushedDataMap().size());
+
+ XceiverClientRatis raftClient =
+ (XceiverClientRatis) blockOutputStream.getXceiverClient();
+ Assert.assertEquals(1, raftClient.getCommitInfoMap().size());
+ Pipeline pipeline = raftClient.getPipeline();
+ // failure injection: stop the first datanode of the pipeline
+ cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
+
+ // again write data with more than max buffer limit. This will call
+ // watchForCommit again. No write will happen in the current block and
+ // data will be rewritten to the next block.
+
+ key.write(data1);
+
+ key.flush();
+
+ Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
+ .getIoException()) instanceof AlreadyClosedException);
+
+ // Make sure the retryCount is reset after the exception is handled
+ Assert.assertEquals(0, keyOutputStream.getRetryCount());
+ Assert.assertEquals(1, raftClient.getCommitInfoMap().size());
+
+ // now close the stream, It will update the ack length after watchForCommit
+ key.close();
+ Assert
+ .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+ // make sure the bufferPool is empty
+ Assert
+ .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
+ Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
+ Assert.assertEquals(pendingWriteChunkCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(pendingPutBlockCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+
+ // in total, there are 14 full write chunks, 5 before the failure injection,
+ // 4 chunks after which we detect the failure and then 5 again on the next
+ // block
+ Assert.assertEquals(writeChunkCount + 14,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+
+ // 3 flushes at flushSize boundaries before failure injection + 2
+ // flush failed + 3 more flushes for the next block
+ Assert.assertEquals(putBlockCount + 8,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(totalOpCount + 22,
+ metrics.getTotalOpCount());
+ // Written the same data twice
+ String dataString = new String(data1, UTF_8);
+ cluster.restartHddsDatanode(pipeline.getNodes().get(0), true);
+ // encode with the same charset that produced data1, not the platform
+ // default
+ validateData(keyName, dataString.concat(dataString).getBytes(UTF_8));
+ }
+
+ /**
+ * Creates a key using ReplicationFactor.THREE; delegates to the overload
+ * that takes an explicit replication factor.
+ */
private OzoneOutputStream createKey(String keyName, ReplicationType type,
long size) throws Exception {
+ return createKey(keyName, type, size, ReplicationFactor.THREE);
+ }
+
+ /**
+ * Creates a key with the given replication type and factor through
+ * ContainerTestHelper, using this test's objectStore, volumeName and
+ * bucketName.
+ */
+ private OzoneOutputStream createKey(String keyName, ReplicationType type,
+ long size, ReplicationFactor factor) throws Exception {
return ContainerTestHelper
- .createKey(keyName, type, size, objectStore, volumeName, bucketName);
+ .createKey(keyName, type, factor, size, objectStore, volumeName,
+ bucketName);
}
+
private void validateData(String keyName, byte[] data) throws Exception {
ContainerTestHelper
.validateData(keyName, data, objectStore, volumeName, bucketName);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
index 7f2d93d..0b618a0 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
@@ -700,6 +700,15 @@ public final class ContainerTestHelper {
.createKey(keyName, size, type, factor, new HashMap<>());
}
+ /**
+ * Creates a key in the given volume and bucket with an explicit replication
+ * factor.
+ *
+ * @param keyName name of the key to create
+ * @param type replication type handed to the bucket's createKey
+ * @param factor replication factor handed to the bucket's createKey
+ * @param size size handed to the bucket's createKey
+ * @param objectStore store used to look up the volume
+ * @param volumeName name of the volume that holds the bucket
+ * @param bucketName name of the bucket in which the key is created
+ * @return the stream returned by the bucket's createKey
+ * @throws Exception propagated from the volume or bucket lookup or from
+ * the key creation
+ */
+ public static OzoneOutputStream createKey(String keyName,
+ ReplicationType type,
+ org.apache.hadoop.hdds.client.ReplicationFactor factor, long size,
+ ObjectStore objectStore, String volumeName, String bucketName)
+ throws Exception {
+ // an empty map is passed as the last argument (presumably key metadata;
+ // TODO confirm against the bucket API)
+ return objectStore.getVolume(volumeName).getBucket(bucketName)
+ .createKey(keyName, size, type, factor, new HashMap<>());
+ }
+
public static void validateData(String keyName, byte[] data,
ObjectStore objectStore, String volumeName, String bucketName)
throws Exception {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org