You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2019/04/17 09:08:45 UTC
[hadoop] branch trunk updated: Revert "HDDS-1373. KeyOutputStream,
close after write request fails after retries,
runs into IllegalArgumentException..(#729)"
This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 082f1e0 Revert "HDDS-1373. KeyOutputStream, close after write request fails after retries, runs into IllegalArgumentException..(#729)"
082f1e0 is described below
commit 082f1e04376f72487a1c90b3ddf8148b566c6e08
Author: Shashikant Banerjee <sh...@apache.org>
AuthorDate: Wed Apr 17 14:38:08 2019 +0530
Revert "HDDS-1373. KeyOutputStream, close after write request fails after retries, runs into IllegalArgumentException..(#729)"
This reverts commit df2ae27f3e40cb4cedda9c7bed1a464be60fd339.
---
.../scm/container/common/helpers/ExcludeList.java | 6 -
.../hadoop/ozone/client/io/KeyOutputStream.java | 198 ++++----
.../ozone/client/rpc/TestBlockOutputStream.java | 16 +-
.../rpc/TestBlockOutputStreamWithFailures.java | 27 +-
.../rpc/TestOzoneClientRetriesOnException.java | 46 +-
.../ozone/client/rpc/TestWatchForCommit.java | 511 ---------------------
.../ozone/container/ContainerTestHelper.java | 4 +-
7 files changed, 114 insertions(+), 694 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ExcludeList.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ExcludeList.java
index eb215d6..94a4b94 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ExcludeList.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ExcludeList.java
@@ -102,10 +102,4 @@ public class ExcludeList {
});
return excludeList;
}
-
- public void clear() {
- datanodes.clear();
- containerIds.clear();
- pipelineIds.clear();
- }
}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index c1f195f..0d9529f 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -295,66 +295,60 @@ public class KeyOutputStream extends OutputStream {
throws IOException {
int succeededAllocates = 0;
while (len > 0) {
- try {
- if (streamEntries.size() <= currentStreamIndex) {
- Preconditions.checkNotNull(omClient);
- // allocate a new block, if a exception happens, log an error and
- // throw exception to the caller directly, and the write fails.
- try {
- allocateNewBlock(currentStreamIndex);
- succeededAllocates += 1;
- } catch (IOException ioe) {
- LOG.error("Try to allocate more blocks for write failed, already "
- + "allocated " + succeededAllocates
- + " blocks for this write.");
- throw ioe;
- }
- }
- // in theory, this condition should never violate due the check above
- // still do a sanity check.
- Preconditions.checkArgument(currentStreamIndex < streamEntries.size());
- BlockOutputStreamEntry current = streamEntries.get(currentStreamIndex);
-
- // length(len) will be in int range if the call is happening through
- // write API of blockOutputStream. Length can be in long range if it comes
- // via Exception path.
- int writeLen = Math.min((int) len, (int) current.getRemaining());
- long currentPos = current.getWrittenDataLength();
+ if (streamEntries.size() <= currentStreamIndex) {
+ Preconditions.checkNotNull(omClient);
+ // allocate a new block, if a exception happens, log an error and
+ // throw exception to the caller directly, and the write fails.
try {
- if (retry) {
- current.writeOnRetry(len);
- } else {
- current.write(b, off, writeLen);
- offset += writeLen;
- }
+ allocateNewBlock(currentStreamIndex);
+ succeededAllocates += 1;
} catch (IOException ioe) {
- // for the current iteration, totalDataWritten - currentPos gives the
- // amount of data already written to the buffer
-
- // In the retryPath, the total data to be written will always be equal
- // to or less than the max length of the buffer allocated.
- // The len specified here is the combined sum of the data length of
- // the buffers
- Preconditions.checkState(!retry || len <= streamBufferMaxSize);
- int dataWritten = (int) (current.getWrittenDataLength() - currentPos);
- writeLen = retry ? (int) len : dataWritten;
- // In retry path, the data written is already accounted in offset.
- if (!retry) {
- offset += writeLen;
- }
- LOG.debug("writeLen {}, total len {}", writeLen, len);
- handleException(current, currentStreamIndex, ioe);
+ LOG.error("Try to allocate more blocks for write failed, already "
+ + "allocated " + succeededAllocates + " blocks for this write.");
+ throw ioe;
}
- if (current.getRemaining() <= 0) {
- // since the current block is already written close the stream.
- handleFlushOrClose(StreamAction.FULL);
+ }
+ // in theory, this condition should never violate due the check above
+ // still do a sanity check.
+ Preconditions.checkArgument(currentStreamIndex < streamEntries.size());
+ BlockOutputStreamEntry current = streamEntries.get(currentStreamIndex);
+
+ // length(len) will be in int range if the call is happening through
+ // write API of blockOutputStream. Length can be in long range if it comes
+ // via Exception path.
+ int writeLen = Math.min((int)len, (int) current.getRemaining());
+ long currentPos = current.getWrittenDataLength();
+ try {
+ if (retry) {
+ current.writeOnRetry(len);
+ } else {
+ current.write(b, off, writeLen);
+ offset += writeLen;
+ }
+ } catch (IOException ioe) {
+ // for the current iteration, totalDataWritten - currentPos gives the
+ // amount of data already written to the buffer
+
+ // In the retryPath, the total data to be written will always be equal
+ // to or less than the max length of the buffer allocated.
+ // The len specified here is the combined sum of the data length of
+ // the buffers
+ Preconditions.checkState(!retry || len <= streamBufferMaxSize);
+ int dataWritten = (int) (current.getWrittenDataLength() - currentPos);
+ writeLen = retry ? (int) len : dataWritten;
+ // In retry path, the data written is already accounted in offset.
+ if (!retry) {
+ offset += writeLen;
}
- len -= writeLen;
- off += writeLen;
- } catch (Exception e) {
- markStreamClosed();
- throw e;
+ LOG.debug("writeLen {}, total len {}", writeLen, len);
+ handleException(current, currentStreamIndex, ioe);
+ }
+ if (current.getRemaining() <= 0) {
+ // since the current block is already written close the stream.
+ handleFlushOrClose(StreamAction.FULL);
}
+ len -= writeLen;
+ off += writeLen;
}
}
@@ -371,7 +365,7 @@ public class KeyOutputStream extends OutputStream {
// pre allocated blocks available.
// This will be called only to discard the next subsequent unused blocks
- // in the streamEntryList.
+ // in the sreamEntryList.
if (streamIndex < streamEntries.size()) {
ListIterator<BlockOutputStreamEntry> streamEntryIterator =
streamEntries.listIterator(streamIndex);
@@ -404,20 +398,6 @@ public class KeyOutputStream extends OutputStream {
}
}
}
-
- private void cleanup() {
- if (excludeList != null) {
- excludeList.clear();
- excludeList = null;
- }
- if (bufferPool != null) {
- bufferPool.clearBufferPool();
- }
-
- if (streamEntries != null) {
- streamEntries.clear();
- }
- }
/**
* It performs following actions :
* a. Updates the committed length at datanode for the current stream in
@@ -438,7 +418,8 @@ public class KeyOutputStream extends OutputStream {
closedContainerException = checkIfContainerIsClosed(t);
}
PipelineID pipelineId = null;
- long totalSuccessfulFlushedData = streamEntry.getTotalAckDataLength();
+ long totalSuccessfulFlushedData =
+ streamEntry.getTotalAckDataLength();
//set the correct length for the current stream
streamEntry.setCurrentPosition(totalSuccessfulFlushedData);
long bufferedDataLen = computeBufferData();
@@ -469,8 +450,8 @@ public class KeyOutputStream extends OutputStream {
if (closedContainerException) {
// discard subsequent pre allocated blocks from the streamEntries list
// from the closed container
- discardPreallocatedBlocks(streamEntry.getBlockID().getContainerID(), null,
- streamIndex + 1);
+ discardPreallocatedBlocks(streamEntry.getBlockID().getContainerID(),
+ null, streamIndex + 1);
} else {
// In case there is timeoutException or Watch for commit happening over
// majority or the client connection failure to the leader in the
@@ -494,11 +475,6 @@ public class KeyOutputStream extends OutputStream {
}
}
- private void markStreamClosed() {
- cleanup();
- closed = true;
- }
-
private void handleRetry(IOException exception, long len) throws IOException {
RetryPolicy.RetryAction action;
try {
@@ -610,46 +586,40 @@ public class KeyOutputStream extends OutputStream {
return;
}
while (true) {
- try {
- int size = streamEntries.size();
- int streamIndex =
- currentStreamIndex >= size ? size - 1 : currentStreamIndex;
- BlockOutputStreamEntry entry = streamEntries.get(streamIndex);
- if (entry != null) {
- try {
- Collection<DatanodeDetails> failedServers =
- entry.getFailedServers();
- // failed servers can be null in case there is no data written in the
- // stream
- if (failedServers != null && !failedServers.isEmpty()) {
- excludeList.addDatanodes(failedServers);
- }
- switch (op) {
- case CLOSE:
+ int size = streamEntries.size();
+ int streamIndex =
+ currentStreamIndex >= size ? size - 1 : currentStreamIndex;
+ BlockOutputStreamEntry entry = streamEntries.get(streamIndex);
+ if (entry != null) {
+ try {
+ Collection<DatanodeDetails> failedServers = entry.getFailedServers();
+ // failed servers can be null in case there is no data written in the
+ // stream
+ if (failedServers != null && !failedServers.isEmpty()) {
+ excludeList.addDatanodes(failedServers);
+ }
+ switch (op) {
+ case CLOSE:
+ entry.close();
+ break;
+ case FULL:
+ if (entry.getRemaining() == 0) {
entry.close();
- break;
- case FULL:
- if (entry.getRemaining() == 0) {
- entry.close();
- currentStreamIndex++;
- }
- break;
- case FLUSH:
- entry.flush();
- break;
- default:
- throw new IOException("Invalid Operation");
+ currentStreamIndex++;
}
- } catch (IOException ioe) {
- handleException(entry, streamIndex, ioe);
- continue;
+ break;
+ case FLUSH:
+ entry.flush();
+ break;
+ default:
+ throw new IOException("Invalid Operation");
}
+ } catch (IOException ioe) {
+ handleException(entry, streamIndex, ioe);
+ continue;
}
- break;
- } catch (Exception e) {
- markStreamClosed();
- throw e;
}
+ break;
}
}
@@ -688,7 +658,7 @@ public class KeyOutputStream extends OutputStream {
} catch (IOException ioe) {
throw ioe;
} finally {
- cleanup();
+ bufferPool.clearBufferPool();
}
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
index 399b977..32bef12 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
@@ -189,7 +189,6 @@ public class TestBlockOutputStream {
// flush ensures watchForCommit updates the total length acknowledged
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
- Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
// now close the stream, It will update the ack length after watchForCommit
key.close();
@@ -209,7 +208,7 @@ public class TestBlockOutputStream {
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
- Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
+ Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
validateData(keyName, data1);
}
@@ -264,7 +263,6 @@ public class TestBlockOutputStream {
// Now do a flush. This will flush the data and update the flush length and
// the map.
key.flush();
- Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
// flush is a sync call, all pending operations will complete
Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
@@ -304,7 +302,7 @@ public class TestBlockOutputStream {
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 3,
metrics.getTotalOpCount());
- Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
+ Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
validateData(keyName, data1);
}
@@ -399,7 +397,6 @@ public class TestBlockOutputStream {
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 3,
metrics.getTotalOpCount());
- Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
validateData(keyName, data1);
}
@@ -457,7 +454,6 @@ public class TestBlockOutputStream {
blockOutputStream.getCommitIndex2flushedDataMap().size());
Assert.assertEquals(0, blockOutputStream.getTotalAckDataLength());
- Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
key.close();
Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
@@ -475,7 +471,7 @@ public class TestBlockOutputStream {
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
- Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
+ Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
validateData(keyName, data1);
}
@@ -540,7 +536,6 @@ public class TestBlockOutputStream {
// Now do a flush. This will flush the data and update the flush length and
// the map.
key.flush();
- Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount,
@@ -575,7 +570,7 @@ public class TestBlockOutputStream {
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
- Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
+ Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
validateData(keyName, data1);
}
@@ -643,7 +638,6 @@ public class TestBlockOutputStream {
// Now do a flush. This will flush the data and update the flush length and
// the map.
key.flush();
- Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount,
@@ -679,7 +673,7 @@ public class TestBlockOutputStream {
metrics.getTotalOpCount());
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
- Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
+ Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
validateData(keyName, data1);
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
index 89a2af9..f228dad 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
@@ -234,7 +234,6 @@ public class TestBlockOutputStreamWithFailures {
// and one flush for partial chunk
key.flush();
- Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
.getIoException()) instanceof ContainerNotOpenException);
@@ -250,7 +249,7 @@ public class TestBlockOutputStreamWithFailures {
Assert
.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
- Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
+ Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount,
@@ -373,7 +372,6 @@ public class TestBlockOutputStreamWithFailures {
key.flush();
Assert.assertEquals(2, raftClient.getCommitInfoMap().size());
- Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
// now close the stream, It will update the ack length after watchForCommit
key.close();
Assert
@@ -384,7 +382,7 @@ public class TestBlockOutputStreamWithFailures {
Assert
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
- Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
+ Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount,
@@ -517,14 +515,13 @@ public class TestBlockOutputStreamWithFailures {
// Make sure the retryCount is reset after the exception is handled
Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
// now close the stream, It will update the ack length after watchForCommit
-
- Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
key.close();
Assert
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
Assert
.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount,
@@ -541,7 +538,7 @@ public class TestBlockOutputStreamWithFailures {
Assert
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
- Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
+ Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
validateData(keyName, data1);
}
@@ -640,7 +637,6 @@ public class TestBlockOutputStreamWithFailures {
// and one flush for partial chunk
key.flush();
- Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
.getIoException()) instanceof ContainerNotOpenException);
// Make sure the retryCount is reset after the exception is handled
@@ -656,6 +652,7 @@ public class TestBlockOutputStreamWithFailures {
Assert
.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount,
@@ -666,7 +663,7 @@ public class TestBlockOutputStreamWithFailures {
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 9,
metrics.getTotalOpCount());
- Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 0);
+ Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 2);
// Written the same data twice
String dataString = new String(data1, UTF_8);
validateData(keyName, dataString.concat(dataString).getBytes());
@@ -777,6 +774,7 @@ public class TestBlockOutputStreamWithFailures {
Assert
.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount,
@@ -787,7 +785,7 @@ public class TestBlockOutputStreamWithFailures {
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 9,
metrics.getTotalOpCount());
- Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 0);
+ Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 2);
// Written the same data twice
String dataString = new String(data1, UTF_8);
validateData(keyName, dataString.concat(dataString).getBytes());
@@ -913,7 +911,6 @@ public class TestBlockOutputStreamWithFailures {
Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
// commitInfoMap will remain intact as there is no server failure
Assert.assertEquals(1, raftClient.getCommitInfoMap().size());
- Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
// now close the stream, It will update the ack length after watchForCommit
key.close();
// make sure the bufferPool is empty
@@ -922,7 +919,7 @@ public class TestBlockOutputStreamWithFailures {
Assert
.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
- Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
+ Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount,
@@ -1049,7 +1046,6 @@ public class TestBlockOutputStreamWithFailures {
Assert.assertEquals(1, raftClient.getCommitInfoMap().size());
// Make sure the retryCount is reset after the exception is handled
Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
- Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
// now close the stream, It will update the ack length after watchForCommit
key.close();
Assert
@@ -1058,7 +1054,7 @@ public class TestBlockOutputStreamWithFailures {
Assert
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
- Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
+ Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount,
@@ -1075,7 +1071,6 @@ public class TestBlockOutputStreamWithFailures {
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 22,
metrics.getTotalOpCount());
- Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
// Written the same data twice
String dataString = new String(data1, UTF_8);
cluster.restartHddsDatanode(pipeline.getNodes().get(0), true);
@@ -1203,7 +1198,7 @@ public class TestBlockOutputStreamWithFailures {
Assert
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
- Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
+ Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount,
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
index 5cb6dbc..381cf14 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
@@ -19,12 +19,10 @@ package org.apache.hadoop.ozone.client.rpc;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.scm.XceiverClientManager;
-import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
@@ -68,7 +66,6 @@ public class TestOzoneClientRetriesOnException {
private String volumeName;
private String bucketName;
private String keyString;
- private XceiverClientManager xceiverClientManager;
/**
* Create a MiniDFSCluster for testing.
@@ -87,6 +84,8 @@ public class TestOzoneClientRetriesOnException {
conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
+ conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 2);
+ conf.set(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY, "1s");
conf.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, 3);
conf.setQuietMode(false);
cluster = MiniOzoneCluster.newBuilder(conf)
@@ -101,7 +100,6 @@ public class TestOzoneClientRetriesOnException {
//the easiest way to create an open container is creating a key
client = OzoneClientFactory.getClient(conf);
objectStore = client.getObjectStore();
- xceiverClientManager = new XceiverClientManager(conf);
keyString = UUID.randomUUID().toString();
volumeName = "testblockoutputstreamwithretries";
bucketName = volumeName;
@@ -154,9 +152,8 @@ public class TestOzoneClientRetriesOnException {
.getIoException()) instanceof GroupMismatchException);
Assert.assertTrue(keyOutputStream.getExcludeList().getPipelineIds()
.contains(pipeline.getId()));
- Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 2);
key.close();
- Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 0);
+ Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 2);
validateData(keyName, data1);
}
@@ -174,8 +171,13 @@ public class TestOzoneClientRetriesOnException {
byte[] data1 =
ContainerTestHelper.getFixedLengthString(keyString, dataLength)
.getBytes(UTF_8);
+ key.write(data1);
+
+ OutputStream stream = entries.get(0).getOutputStream();
+ Assert.assertTrue(stream instanceof BlockOutputStream);
+ BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+ List<PipelineID> pipelineList = new ArrayList<>();
long containerID;
- List<Long> containerList = new ArrayList<>();
for (BlockOutputStreamEntry entry : entries) {
containerID = entry.getBlockID().getContainerID();
ContainerInfo container =
@@ -184,40 +186,18 @@ public class TestOzoneClientRetriesOnException {
Pipeline pipeline =
cluster.getStorageContainerManager().getPipelineManager()
.getPipeline(container.getPipelineID());
- XceiverClientSpi xceiverClient =
- xceiverClientManager.acquireClient(pipeline);
- if (!containerList.contains(containerID)) {
- xceiverClient.sendCommand(ContainerTestHelper
- .getCreateContainerRequest(containerID, pipeline));
- }
- xceiverClientManager.releaseClient(xceiverClient, false);
+ pipelineList.add(pipeline.getId());
}
- key.write(data1);
- OutputStream stream = entries.get(0).getOutputStream();
- Assert.assertTrue(stream instanceof BlockOutputStream);
- BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
- ContainerTestHelper.waitForContainerClose(key, cluster);
+ ContainerTestHelper.waitForPipelineClose(key, cluster, false);
try {
key.write(data1);
- Assert.fail("Expected exception not thrown");
} catch (IOException ioe) {
Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
- .getIoException()) instanceof ContainerNotOpenException);
+ .getIoException()) instanceof GroupMismatchException);
Assert.assertTrue(ioe.getMessage().contains(
"Retry request failed. retries get failed due to exceeded maximum "
+ "allowed retries number: 3"));
}
- try {
- key.flush();
- Assert.fail("Expected exception not thrown");
- } catch (IOException ioe) {
- Assert.assertTrue(ioe.getMessage().contains("Stream is closed"));
- }
- try {
- key.close();
- } catch (IOException ioe) {
- Assert.fail("Expected should not be thrown");
- }
}
private OzoneOutputStream createKey(String keyName, ReplicationType type,
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
deleted file mode 100644
index 41e9da6..0000000
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
+++ /dev/null
@@ -1,511 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.ozone.client.rpc;
-
-import org.apache.hadoop.conf.StorageUnit;
-import org.apache.hadoop.hdds.client.ReplicationType;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.*;
-import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
-import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
-import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.client.ObjectStore;
-import org.apache.hadoop.ozone.client.OzoneClient;
-import org.apache.hadoop.ozone.client.OzoneClientFactory;
-import org.apache.hadoop.ozone.client.io.KeyOutputStream;
-import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
-import org.apache.hadoop.ozone.container.ContainerTestHelper;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.ratis.protocol.GroupMismatchException;
-import org.apache.ratis.protocol.RaftRetryFailureException;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
-
-/**
- * This class verifies the watchForCommit Handling by client.
- */
-public class TestWatchForCommit {
-
- private MiniOzoneCluster cluster;
- private OzoneConfiguration conf;
- private OzoneClient client;
- private ObjectStore objectStore;
- private String volumeName;
- private String bucketName;
- private String keyString;
- private int chunkSize;
- private int flushSize;
- private int maxFlushSize;
- private int blockSize;
- private StorageContainerLocationProtocolClientSideTranslatorPB
- storageContainerLocationClient;
- private static String containerOwner = "OZONE";
-
- /**
- * Create a MiniDFSCluster for testing.
- * <p>
- * Ozone is made active by setting OZONE_ENABLED = true
- *
- * @throws IOException
- */
- private void startCluster(OzoneConfiguration conf) throws Exception {
- chunkSize = 100;
- flushSize = 2 * chunkSize;
- maxFlushSize = 2 * flushSize;
- blockSize = 2 * maxFlushSize;
-
- conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
- conf.setTimeDuration(
- OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY,
- 1, TimeUnit.SECONDS);
-
- conf.setQuietMode(false);
- cluster = MiniOzoneCluster.newBuilder(conf)
- .setNumDatanodes(7)
- .setBlockSize(blockSize)
- .setChunkSize(chunkSize)
- .setStreamBufferFlushSize(flushSize)
- .setStreamBufferMaxSize(maxFlushSize)
- .setStreamBufferSizeUnit(StorageUnit.BYTES)
- .build();
- cluster.waitForClusterToBeReady();
- //the easiest way to create an open container is creating a key
- client = OzoneClientFactory.getClient(conf);
- objectStore = client.getObjectStore();
- keyString = UUID.randomUUID().toString();
- volumeName = "watchforcommithandlingtest";
- bucketName = volumeName;
- objectStore.createVolume(volumeName);
- objectStore.getVolume(volumeName).createBucket(bucketName);
- storageContainerLocationClient = cluster
- .getStorageContainerLocationClient();
- }
-
-
- /**
- * Shutdown MiniDFSCluster.
- */
- private void shutdown() {
- if (cluster != null) {
- cluster.shutdown();
- }
- }
-
- private String getKeyName() {
- return UUID.randomUUID().toString();
- }
-
- @Test
- public void testWatchForCommitWithKeyWrite() throws Exception {
- // in this case, watch request should fail with RaftRetryFailureException
- // and will be captured in keyOutputStream and the failover will happen
- // to a different block
- OzoneConfiguration conf = new OzoneConfiguration();
- conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 10,
- TimeUnit.SECONDS);
- conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 2);
- startCluster(conf);
- XceiverClientMetrics metrics =
- XceiverClientManager.getXceiverClientMetrics();
- long writeChunkCount = metrics.getContainerOpCountMetrics(
- ContainerProtos.Type.WriteChunk);
- long putBlockCount = metrics.getContainerOpCountMetrics(
- ContainerProtos.Type.PutBlock);
- long pendingWriteChunkCount = metrics.getContainerOpsMetrics(
- ContainerProtos.Type.WriteChunk);
- long pendingPutBlockCount = metrics.getContainerOpsMetrics(
- ContainerProtos.Type.PutBlock);
- long totalOpCount = metrics.getTotalOpCount();
- String keyName = getKeyName();
- OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
- int dataLength = maxFlushSize + 50;
- // write data more than 1 chunk
- byte[] data1 =
- ContainerTestHelper.getFixedLengthString(keyString, dataLength)
- .getBytes(UTF_8);
- key.write(data1);
- // since its hitting the full bufferCondition, it will call watchForCommit
- // and completes atleast putBlock for first flushSize worth of data
- Assert.assertTrue(
- metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
- <= pendingWriteChunkCount + 2);
- Assert.assertTrue(
- metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
- <= pendingPutBlockCount + 1);
- Assert.assertEquals(writeChunkCount + 4,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
- Assert.assertEquals(putBlockCount + 2,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
- Assert.assertEquals(totalOpCount + 6,
- metrics.getTotalOpCount());
- Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
- KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
-
- Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
- OutputStream stream = keyOutputStream.getStreamEntries().get(0)
- .getOutputStream();
- Assert.assertTrue(stream instanceof BlockOutputStream);
- BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
-
- // we have just written data more than flush Size(2 chunks), at this time
- // buffer pool will have 3 buffers allocated worth of chunk size
-
- Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
- // writtenDataLength as well flushedDataLength will be updated here
- Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
- Assert.assertEquals(maxFlushSize,
- blockOutputStream.getTotalDataFlushedLength());
-
- // since data equals to maxBufferSize is written, this will be a blocking
- // call and hence will wait for atleast flushSize worth of data to get
- // acked by all servers right here
- Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize);
-
- // watchForCommit will clean up atleast one entry from the map where each
- // entry corresponds to flushSize worth of data
- Assert.assertTrue(
- blockOutputStream.getCommitIndex2flushedDataMap().size() <= 1);
-
- // Now do a flush. This will flush the data and update the flush length and
- // the map.
- key.flush();
-
- Assert.assertEquals(pendingWriteChunkCount,
- metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
- Assert.assertEquals(pendingPutBlockCount,
- metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
- Assert.assertEquals(writeChunkCount + 5,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
- Assert.assertEquals(putBlockCount + 3,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
- Assert.assertEquals(totalOpCount + 8,
- metrics.getTotalOpCount());
-
- // Since the data in the buffer is already flushed, flush here will have
- // no impact on the counters and data structures
-
- Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
- Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
- Assert.assertEquals(dataLength,
- blockOutputStream.getTotalDataFlushedLength());
- // flush will make sure one more entry gets updated in the map
- Assert.assertTrue(
- blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2);
-
- XceiverClientRatis raftClient =
- (XceiverClientRatis) blockOutputStream.getXceiverClient();
- Assert.assertEquals(3, raftClient.getCommitInfoMap().size());
- Pipeline pipeline = raftClient.getPipeline();
- cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
- cluster.shutdownHddsDatanode(pipeline.getNodes().get(1));
- // again write data with more than max buffer limit. This will call
- // watchForCommit again. Since the commit will happen 2 way, the
- // commitInfoMap will get updated for servers which are alive
-
- // 4 writeChunks = maxFlushSize + 2 putBlocks will be discarded here
- // once exception is hit
- key.write(data1);
-
- // As a part of handling the exception, 4 failed writeChunks will be
- // rewritten plus one partial chunk plus two putBlocks for flushSize
- // and one flush for partial chunk
- key.flush();
- Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream
- .getIoException()) instanceof RaftRetryFailureException);
- // Make sure the retryCount is reset after the exception is handled
- Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
- // now close the stream, It will update the ack length after watchForCommit
- key.close();
- Assert
- .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
- Assert
- .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
- Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
- Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
- Assert.assertEquals(pendingWriteChunkCount,
- metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
- Assert.assertEquals(pendingPutBlockCount,
- metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
- Assert.assertEquals(writeChunkCount + 14,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
- Assert.assertEquals(putBlockCount + 8,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
- Assert.assertEquals(totalOpCount + 22,
- metrics.getTotalOpCount());
- Assert
- .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
- // make sure the bufferPool is empty
- Assert
- .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
- Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
- Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
- validateData(keyName, data1);
- shutdown();
- }
-
- @Test
- public void testWatchForCommitWithSmallerTimeoutValue() throws Exception {
- OzoneConfiguration conf = new OzoneConfiguration();
- conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 3,
- TimeUnit.SECONDS);
- conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10);
- startCluster(conf);
- XceiverClientManager clientManager = new XceiverClientManager(conf);
- ContainerWithPipeline container1 =
- storageContainerLocationClient.allocateContainer(
- HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE,
- containerOwner);
- XceiverClientSpi client = clientManager
- .acquireClient(container1.getPipeline());
- Assert.assertEquals(1, client.getRefcount());
- Assert.assertEquals(container1.getPipeline(),
- client.getPipeline());
- Pipeline pipeline = client.getPipeline();
- XceiverClientReply reply =
- client.sendCommandAsync(ContainerTestHelper.getCreateContainerRequest(
- container1.getContainerInfo().getContainerID(),
- client.getPipeline()));
- reply.getResponse().get();
- long index = reply.getLogIndex();
- cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
- cluster.shutdownHddsDatanode(pipeline.getNodes().get(1));
- try {
- // just watch for a lo index which in not updated in the commitInfo Map
- client.watchForCommit(index + 1, 3000);
- Assert.fail("expected exception not thrown");
- } catch (Exception e) {
- Assert.assertTrue(
- HddsClientUtils.checkForException(e) instanceof TimeoutException);
- }
- // After releasing the client, this connection should be closed
- // and any container operations should fail
- clientManager.releaseClient(client, false);
- shutdown();
- }
-
- @Test
- public void testWatchForCommitForRetryfailure() throws Exception {
- OzoneConfiguration conf = new OzoneConfiguration();
- conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20,
- TimeUnit.SECONDS);
- conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 3);
- startCluster(conf);
- XceiverClientManager clientManager = new XceiverClientManager(conf);
- ContainerWithPipeline container1 =
- storageContainerLocationClient.allocateContainer(
- HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE,
- containerOwner);
- XceiverClientSpi client = clientManager
- .acquireClient(container1.getPipeline());
- Assert.assertEquals(1, client.getRefcount());
- Assert.assertEquals(container1.getPipeline(),
- client.getPipeline());
- Pipeline pipeline = client.getPipeline();
- XceiverClientReply reply =
- client.sendCommandAsync(ContainerTestHelper.getCreateContainerRequest(
- container1.getContainerInfo().getContainerID(),
- client.getPipeline()));
- reply.getResponse().get();
- long index = reply.getLogIndex();
- cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
- cluster.shutdownHddsDatanode(pipeline.getNodes().get(1));
- // again write data with more than max buffer limit. This wi
- try {
- // just watch for a lo index which in not updated in the commitInfo Map
- client.watchForCommit(index + 1, 20000);
- Assert.fail("expected exception not thrown");
- } catch (Exception e) {
- Assert.assertTrue(HddsClientUtils
- .checkForException(e) instanceof RaftRetryFailureException);
- }
- clientManager.releaseClient(client, false);
- shutdown();
- }
-
- @Test
- public void test2WayCommitForRetryfailure() throws Exception {
- OzoneConfiguration conf = new OzoneConfiguration();
- conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20,
- TimeUnit.SECONDS);
- conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 3);
- startCluster(conf);
- GenericTestUtils.LogCapturer logCapturer =
- GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG);
- XceiverClientManager clientManager = new XceiverClientManager(conf);
-
- ContainerWithPipeline container1 =
- storageContainerLocationClient.allocateContainer(
- HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE,
- containerOwner);
- XceiverClientSpi client = clientManager
- .acquireClient(container1.getPipeline());
- Assert.assertEquals(1, client.getRefcount());
- Assert.assertEquals(container1.getPipeline(),
- client.getPipeline());
- Pipeline pipeline = client.getPipeline();
- XceiverClientRatis ratisClient = (XceiverClientRatis) client;
- XceiverClientReply reply =
- client.sendCommandAsync(ContainerTestHelper.getCreateContainerRequest(
- container1.getContainerInfo().getContainerID(),
- client.getPipeline()));
- reply.getResponse().get();
- Assert.assertEquals(3, ratisClient.getCommitInfoMap().size());
- cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
- reply = client.sendCommandAsync(ContainerTestHelper
- .getCloseContainer(pipeline,
- container1.getContainerInfo().getContainerID()));
- reply.getResponse().get();
- client.watchForCommit(reply.getLogIndex(), 20000);
-
- // commitInfo Map will be reduced to 2 here
- Assert.assertEquals(2, ratisClient.getCommitInfoMap().size());
- clientManager.releaseClient(client, false);
- Assert.assertTrue(logCapturer.getOutput().contains("3 way commit failed"));
- Assert.assertTrue(
- logCapturer.getOutput().contains("RaftRetryFailureException"));
- Assert
- .assertTrue(logCapturer.getOutput().contains("Committed by majority"));
- logCapturer.stopCapturing();
- shutdown();
- }
-
- @Test
- public void test2WayCommitForTimeoutException() throws Exception {
- OzoneConfiguration conf = new OzoneConfiguration();
- conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 3,
- TimeUnit.SECONDS);
- conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10);
- startCluster(conf);
- GenericTestUtils.LogCapturer logCapturer =
- GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG);
- XceiverClientManager clientManager = new XceiverClientManager(conf);
-
- ContainerWithPipeline container1 =
- storageContainerLocationClient.allocateContainer(
- HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE,
- containerOwner);
- XceiverClientSpi client = clientManager
- .acquireClient(container1.getPipeline());
- Assert.assertEquals(1, client.getRefcount());
- Assert.assertEquals(container1.getPipeline(),
- client.getPipeline());
- Pipeline pipeline = client.getPipeline();
- XceiverClientRatis ratisClient = (XceiverClientRatis) client;
- XceiverClientReply reply =
- client.sendCommandAsync(ContainerTestHelper.getCreateContainerRequest(
- container1.getContainerInfo().getContainerID(),
- client.getPipeline()));
- reply.getResponse().get();
- Assert.assertEquals(3, ratisClient.getCommitInfoMap().size());
- cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
- reply = client.sendCommandAsync(ContainerTestHelper
- .getCloseContainer(pipeline,
- container1.getContainerInfo().getContainerID()));
- reply.getResponse().get();
- client.watchForCommit(reply.getLogIndex(), 3000);
-
- // commitInfo Map will be reduced to 2 here
- Assert.assertEquals(2, ratisClient.getCommitInfoMap().size());
- clientManager.releaseClient(client, false);
- Assert.assertTrue(logCapturer.getOutput().contains("3 way commit failed"));
- Assert.assertTrue(logCapturer.getOutput().contains("TimeoutException"));
- Assert
- .assertTrue(logCapturer.getOutput().contains("Committed by majority"));
- logCapturer.stopCapturing();
- shutdown();
- }
-
- @Test
- public void testWatchForCommitForGroupMismatchException() throws Exception {
- OzoneConfiguration conf = new OzoneConfiguration();
- conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20,
- TimeUnit.SECONDS);
- conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20);
-
- // mark the node stale early so that pipeline gets destroyed quickly
- conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
- startCluster(conf);
- GenericTestUtils.LogCapturer logCapturer =
- GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG);
- XceiverClientManager clientManager = new XceiverClientManager(conf);
-
- ContainerWithPipeline container1 =
- storageContainerLocationClient.allocateContainer(
- HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE,
- containerOwner);
- XceiverClientSpi client = clientManager
- .acquireClient(container1.getPipeline());
- Assert.assertEquals(1, client.getRefcount());
- Assert.assertEquals(container1.getPipeline(),
- client.getPipeline());
- Pipeline pipeline = client.getPipeline();
- XceiverClientRatis ratisClient = (XceiverClientRatis) client;
- long containerId = container1.getContainerInfo().getContainerID();
- XceiverClientReply reply = client.sendCommandAsync(ContainerTestHelper
- .getCreateContainerRequest(containerId, client.getPipeline()));
- reply.getResponse().get();
- Assert.assertEquals(3, ratisClient.getCommitInfoMap().size());
- List<Pipeline> pipelineList = new ArrayList<>();
- pipelineList.add(pipeline);
- ContainerTestHelper.waitForPipelineClose(pipelineList, cluster);
- try {
- // just watch for a lo index which in not updated in the commitInfo Map
- //client.watchForCommit(reply.getLogIndex() + 1, 20000);
- reply = client.sendCommandAsync(ContainerTestHelper
- .getCreateContainerRequest(containerId, client.getPipeline()));
- reply.getResponse().get();
- Assert.fail("Expected exception not thrown");
- } catch(Exception e) {
- Assert.assertTrue(HddsClientUtils
- .checkForException(e) instanceof GroupMismatchException);
- }
- clientManager.releaseClient(client, false);
- shutdown();
- }
-
- private OzoneOutputStream createKey(String keyName, ReplicationType type,
- long size) throws Exception {
- return ContainerTestHelper
- .createKey(keyName, type, size, objectStore, volumeName, bucketName);
- }
-
- private void validateData(String keyName, byte[] data) throws Exception {
- ContainerTestHelper
- .validateData(keyName, data, objectStore, volumeName, bucketName);
- }
-}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
index a1fd17c..93807b4 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
@@ -727,9 +727,7 @@ public final class ContainerTestHelper {
keyOutputStream.getLocationInfoList();
List<Long> containerIdList = new ArrayList<>();
for (OmKeyLocationInfo info : locationInfoList) {
- long id = info.getContainerID();
- if (!containerIdList.contains(id))
- containerIdList.add(id);
+ containerIdList.add(info.getContainerID());
}
Assert.assertTrue(!containerIdList.isEmpty());
waitForContainerClose(cluster, containerIdList.toArray(new Long[0]));
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org