You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2019/03/25 10:20:36 UTC
[hadoop] branch ozone-0.4 updated: HDDS-1317. KeyOutputStream#write
throws ArrayIndexOutOfBoundsException when running RandomWrite MR examples.
Contributed by Shashikant Banerjee.
This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch ozone-0.4
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/ozone-0.4 by this push:
new eed623a HDDS-1317. KeyOutputStream#write throws ArrayIndexOutOfBoundsException when running RandomWrite MR examples. Contributed by Shashikant Banerjee.
eed623a is described below
commit eed623ad618d06784858f793d72ecc01126753ef
Author: Shashikant Banerjee <sh...@apache.org>
AuthorDate: Mon Mar 25 15:41:20 2019 +0530
HDDS-1317. KeyOutputStream#write throws ArrayIndexOutOfBoundsException when running RandomWrite MR examples. Contributed by Shashikant Banerjee.
---
.../hadoop/hdds/scm/XceiverClientMetrics.java | 20 +
.../apache/hadoop/hdds/scm/XceiverClientRatis.java | 90 +--
.../hadoop/hdds/scm/storage/BlockOutputStream.java | 199 ++++--
.../apache/hadoop/hdds/scm/storage/BufferPool.java | 16 +-
.../org/apache/hadoop/hdds/client/BlockID.java | 4 +-
.../ozone/client/io/BlockOutputStreamEntry.java | 6 +-
.../hadoop/ozone/client/io/KeyOutputStream.java | 59 +-
.../org/apache/hadoop/ozone/MiniOzoneCluster.java | 7 +
.../apache/hadoop/ozone/MiniOzoneClusterImpl.java | 12 +-
.../ozone/client/rpc/TestBlockOutputStream.java | 690 +++++++++++++++++++++
.../rpc/TestBlockOutputStreamWithFailures.java | 546 ++++++++++++++++
.../rpc/TestCloseContainerHandlingByClient.java | 11 +-
.../ozone/container/ContainerTestHelper.java | 17 +
.../commandhandler/TestBlockDeletion.java | 2 +-
14 files changed, 1543 insertions(+), 136 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java
index a430400..6c40921 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdds.scm;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.metrics2.MetricsSystem;
@@ -37,7 +38,9 @@ public class XceiverClientMetrics {
.getSimpleName();
private @Metric MutableCounterLong pendingOps;
+ private @Metric MutableCounterLong totalOps;
private MutableCounterLong[] pendingOpsArray;
+ private MutableCounterLong[] opsArray;
private MutableRate[] containerOpsLatency;
private MetricsRegistry registry;
@@ -46,12 +49,17 @@ public class XceiverClientMetrics {
this.registry = new MetricsRegistry(SOURCE_NAME);
this.pendingOpsArray = new MutableCounterLong[numEnumEntries];
+ this.opsArray = new MutableCounterLong[numEnumEntries];
this.containerOpsLatency = new MutableRate[numEnumEntries];
for (int i = 0; i < numEnumEntries; i++) {
pendingOpsArray[i] = registry.newCounter(
"numPending" + ContainerProtos.Type.forNumber(i + 1),
"number of pending" + ContainerProtos.Type.forNumber(i + 1) + " ops",
(long) 0);
+ opsArray[i] = registry
+ .newCounter("opCount" + ContainerProtos.Type.forNumber(i + 1),
+ "number of" + ContainerProtos.Type.forNumber(i + 1) + " ops",
+ (long) 0);
containerOpsLatency[i] = registry.newRate(
ContainerProtos.Type.forNumber(i + 1) + "Latency",
@@ -68,6 +76,8 @@ public class XceiverClientMetrics {
public void incrPendingContainerOpsMetrics(ContainerProtos.Type type) {
pendingOps.incr();
+ totalOps.incr();
+ opsArray[type.ordinal()].incr();
pendingOpsArray[type.ordinal()].incr();
}
@@ -85,6 +95,16 @@ public class XceiverClientMetrics {
return pendingOpsArray[type.ordinal()].value();
}
+ @VisibleForTesting
+ public long getTotalOpCount() {
+ return totalOps.value();
+ }
+
+ @VisibleForTesting
+ public long getContainerOpCountMetrics(ContainerProtos.Type type) {
+ return opsArray[type.ordinal()].value();
+ }
+
public void unRegister() {
MetricsSystem ms = DefaultMetricsSystem.instance();
ms.unregisterSource(SOURCE_NAME);
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
index 65241bf..a2e65e2 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -25,6 +26,7 @@ import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import io.opentracing.Scope;
import io.opentracing.util.GlobalTracer;
+import org.apache.hadoop.util.Time;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.RaftRetryFailureException;
@@ -101,6 +103,8 @@ public final class XceiverClientRatis extends XceiverClientSpi {
// create a separate RaftClient for watchForCommit API
private RaftClient watchClient;
+ private XceiverClientMetrics metrics;
+
/**
* Constructs a client.
*/
@@ -116,6 +120,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
watchClient = null;
this.tlsConfig = tlsConfig;
this.clientRequestTimeout = timeout;
+ metrics = XceiverClientManager.getXceiverClientMetrics();
}
private void updateCommitInfosMap(
@@ -199,6 +204,12 @@ public final class XceiverClientRatis extends XceiverClientSpi {
return Objects.requireNonNull(client.get(), "client is null");
}
+
+ @VisibleForTesting
+ public ConcurrentHashMap<UUID, Long> getCommitInfoMap() {
+ return commitInfoMap;
+ }
+
private CompletableFuture<RaftClientReply> sendRequestAsync(
ContainerCommandRequestProto request) {
try (Scope scope = GlobalTracer.get()
@@ -301,47 +312,52 @@ public final class XceiverClientRatis extends XceiverClientSpi {
public XceiverClientReply sendCommandAsync(
ContainerCommandRequestProto request) {
XceiverClientReply asyncReply = new XceiverClientReply(null);
+ long requestTime = Time.monotonicNowNanos();
CompletableFuture<RaftClientReply> raftClientReply =
sendRequestAsync(request);
+ metrics.incrPendingContainerOpsMetrics(request.getCmdType());
CompletableFuture<ContainerCommandResponseProto> containerCommandResponse =
- raftClientReply.whenComplete((reply, e) -> LOG.debug(
- "received reply {} for request: cmdType={} containerID={}"
- + " pipelineID={} traceID={} exception: {}", reply,
- request.getCmdType(), request.getContainerID(),
- request.getPipelineID(), request.getTraceID(), e))
- .thenApply(reply -> {
- try {
- // we need to handle RaftRetryFailure Exception
- RaftRetryFailureException raftRetryFailureException =
- reply.getRetryFailureException();
- if (raftRetryFailureException != null) {
- // in case of raft retry failure, the raft client is
- // not able to connect to the leader hence the pipeline
- // can not be used but this instance of RaftClient will close
- // and refreshed again. In case the client cannot connect to
- // leader, getClient call will fail.
+ raftClientReply.whenComplete((reply, e) -> {
+ LOG.debug("received reply {} for request: cmdType={} containerID={}"
+ + " pipelineID={} traceID={} exception: {}", reply,
+ request.getCmdType(), request.getContainerID(),
+ request.getPipelineID(), request.getTraceID(), e);
+ metrics.decrPendingContainerOpsMetrics(request.getCmdType());
+ metrics.addContainerOpsLatency(request.getCmdType(),
+ Time.monotonicNowNanos() - requestTime);
+ }).thenApply(reply -> {
+ try {
+ // we need to handle RaftRetryFailure Exception
+ RaftRetryFailureException raftRetryFailureException =
+ reply.getRetryFailureException();
+ if (raftRetryFailureException != null) {
+ // in case of raft retry failure, the raft client is
+ // not able to connect to the leader hence the pipeline
+ // can not be used but this instance of RaftClient will close
+ // and refreshed again. In case the client cannot connect to
+ // leader, getClient call will fail.
- // No need to set the failed Server ID here. Ozone client
- // will directly exclude this pipeline in next allocate block
- // to SCM as in this case, it is the raft client which is not
- // able to connect to leader in the pipeline, though the
- // pipeline can still be functional.
- throw new CompletionException(raftRetryFailureException);
- }
- ContainerCommandResponseProto response =
- ContainerCommandResponseProto
- .parseFrom(reply.getMessage().getContent());
- UUID serverId = RatisHelper.toDatanodeId(reply.getReplierId());
- if (response.getResult() == ContainerProtos.Result.SUCCESS) {
- updateCommitInfosMap(reply.getCommitInfos());
- }
- asyncReply.setLogIndex(reply.getLogIndex());
- addDatanodetoReply(serverId, asyncReply);
- return response;
- } catch (InvalidProtocolBufferException e) {
- throw new CompletionException(e);
- }
- });
+ // No need to set the failed Server ID here. Ozone client
+ // will directly exclude this pipeline in next allocate block
+ // to SCM as in this case, it is the raft client which is not
+ // able to connect to leader in the pipeline, though the
+ // pipeline can still be functional.
+ throw new CompletionException(raftRetryFailureException);
+ }
+ ContainerCommandResponseProto response =
+ ContainerCommandResponseProto
+ .parseFrom(reply.getMessage().getContent());
+ UUID serverId = RatisHelper.toDatanodeId(reply.getReplierId());
+ if (response.getResult() == ContainerProtos.Result.SUCCESS) {
+ updateCommitInfosMap(reply.getCommitInfos());
+ }
+ asyncReply.setLogIndex(reply.getLogIndex());
+ addDatanodetoReply(serverId, asyncReply);
+ return response;
+ } catch (InvalidProtocolBufferException e) {
+ throw new CompletionException(e);
+ }
+ });
asyncReply.setResponse(containerCommandResponse);
return asyncReply;
}
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index 13913ee..cfbb6ae 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdds.scm.storage;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -45,7 +46,15 @@ import java.util.Collections;
import java.util.UUID;
import java.util.List;
import java.util.ArrayList;
-import java.util.concurrent.*;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls
@@ -104,15 +113,25 @@ public class BlockOutputStream extends OutputStream {
// by all servers
private long totalAckDataLength;
+ // List containing buffers for which the putBlock call will
+ // update the length in the datanodes. This list will just maintain
+ // references to the buffers in the BufferPool which will be cleared
+ // when the watchForCommit acknowledges a putBlock logIndex has been
+ // committed on all datanodes. This list will be a place holder for buffers
+ // which got written between successive putBlock calls.
+ private List<ByteBuffer> bufferList;
+
// future Map to hold up all putBlock futures
private ConcurrentHashMap<Long,
CompletableFuture<ContainerProtos.ContainerCommandResponseProto>>
futureMap;
- // map containing mapping for putBlock logIndex to to flushedDataLength Map.
// The map should maintain the keys (logIndexes) in order so that while
// removing we always end up updating incremented data flushed length.
- private ConcurrentSkipListMap<Long, Long> commitIndex2flushedDataMap;
+ // Also, corresponding to the logIndex, the corresponding list of buffers will
+ // be released from the buffer pool.
+ private ConcurrentSkipListMap<Long, List<ByteBuffer>>
+ commitIndex2flushedDataMap;
private List<DatanodeDetails> failedServers;
@@ -167,13 +186,15 @@ public class BlockOutputStream extends OutputStream {
totalDataFlushedLength = 0;
writtenDataLength = 0;
failedServers = Collections.emptyList();
+ bufferList = null;
}
+
public BlockID getBlockID() {
return blockID;
}
- public long getTotalSuccessfulFlushedData() {
+ public long getTotalAckDataLength() {
return totalAckDataLength;
}
@@ -185,6 +206,31 @@ public class BlockOutputStream extends OutputStream {
return failedServers;
}
+ @VisibleForTesting
+ public XceiverClientSpi getXceiverClient() {
+ return xceiverClient;
+ }
+
+ @VisibleForTesting
+ public long getTotalDataFlushedLength() {
+ return totalDataFlushedLength;
+ }
+
+ @VisibleForTesting
+ public BufferPool getBufferPool() {
+ return bufferPool;
+ }
+
+ @VisibleForTesting
+ public IOException getIoException() {
+ return ioException;
+ }
+
+ @VisibleForTesting
+ public Map<Long, List<ByteBuffer>> getCommitIndex2flushedDataMap() {
+ return commitIndex2flushedDataMap;
+ }
+
@Override
public void write(int b) throws IOException {
checkOpen();
@@ -206,9 +252,9 @@ public class BlockOutputStream extends OutputStream {
if (len == 0) {
return;
}
+
while (len > 0) {
int writeLen;
-
// Allocate a buffer if needed. The buffer will be allocated only
// once as needed and will be reused again for multiple blockOutputStream
// entries.
@@ -224,8 +270,8 @@ public class BlockOutputStream extends OutputStream {
len -= writeLen;
writtenDataLength += writeLen;
if (shouldFlush()) {
- totalDataFlushedLength += streamBufferFlushSize;
- handlePartialFlush();
+ updateFlushLength();
+ executePutBlock();
}
// Data in the bufferPool can not exceed streamBufferMaxSize
if (isBufferPoolFull()) {
@@ -235,7 +281,11 @@ public class BlockOutputStream extends OutputStream {
}
private boolean shouldFlush() {
- return writtenDataLength % streamBufferFlushSize == 0;
+ return bufferPool.computeBufferData() % streamBufferFlushSize == 0;
+ }
+
+ private void updateFlushLength() {
+ totalDataFlushedLength += writtenDataLength - totalDataFlushedLength;
}
private boolean isBufferPoolFull() {
@@ -264,17 +314,17 @@ public class BlockOutputStream extends OutputStream {
len -= writeLen;
count++;
writtenDataLength += writeLen;
- if (shouldFlush()) {
+ // we should not call isBufferFull/shouldFlush here.
+ // The buffer might already be full as whole data is already cached in
+ // the buffer. We should just validate
+ // if we wrote data of size streamBufferMaxSize/streamBufferFlushSize to
+ // call for handling full buffer/flush buffer condition.
+ if (writtenDataLength % streamBufferFlushSize == 0) {
// reset the position to zero as now we will be reading the
// next buffer in the list
- totalDataFlushedLength += streamBufferFlushSize;
- handlePartialFlush();
+ updateFlushLength();
+ executePutBlock();
}
-
- // we should not call isBufferFull here. The buffer might already be full
- // as whole data is already cached in the buffer. We should just validate
- // if we wrote data of size streamBufferMaxSize to call for handling
- // full buffer condition.
if (writtenDataLength == streamBufferMaxSize) {
handleFullBuffer();
}
@@ -289,25 +339,22 @@ public class BlockOutputStream extends OutputStream {
Preconditions.checkArgument(!commitIndex2flushedDataMap.isEmpty());
for (long index : indexes) {
Preconditions.checkState(commitIndex2flushedDataMap.containsKey(index));
- long length = commitIndex2flushedDataMap.remove(index);
-
- // totalAckDataLength replicated yet should always be less than equal to
- // the current length being returned from commitIndex2flushedDataMap.
- // The below precondition would ensure commitIndex2flushedDataMap entries
- // are removed in order of the insertion to the map.
- Preconditions.checkArgument(totalAckDataLength < length);
- totalAckDataLength = length;
+ List<ByteBuffer> buffers = commitIndex2flushedDataMap.remove(index);
+ long length = buffers.stream().mapToLong(value -> {
+ int pos = value.position();
+ Preconditions.checkArgument(pos <= chunkSize);
+ return pos;
+ }).sum();
+ // The total acknowledged (replicated) data length is incremented by the
+ // length of the buffers just removed from commitIndex2flushedDataMap.
+ totalAckDataLength += length;
LOG.debug("Total data successfully replicated: " + totalAckDataLength);
futureMap.remove(totalAckDataLength);
// Flush has been committed to required servers successful.
- // just release the current buffer from the buffer pool.
-
- // every entry removed from the putBlock future Map signifies
- // streamBufferFlushSize/chunkSize no of chunks successfully committed.
- // Release the buffers from the buffer pool to be reused again.
- int chunkCount = (int) (streamBufferFlushSize / chunkSize);
- for (int i = 0; i < chunkCount; i++) {
- bufferPool.releaseBuffer();
+ // just release the current buffer from the buffer pool corresponding
+ // to the buffers that have been committed with the putBlock call.
+ for (ByteBuffer byteBuffer : buffers) {
+ bufferPool.releaseBuffer(byteBuffer);
}
}
}
@@ -325,9 +372,10 @@ public class BlockOutputStream extends OutputStream {
waitOnFlushFutures();
}
} catch (InterruptedException | ExecutionException e) {
- adjustBuffersOnException();
- throw new IOException(
+ ioException = new IOException(
"Unexpected Storage Container Exception: " + e.toString(), e);
+ adjustBuffersOnException();
+ throw ioException;
}
if (!commitIndex2flushedDataMap.isEmpty()) {
watchForCommit(
@@ -389,10 +437,14 @@ public class BlockOutputStream extends OutputStream {
}
private CompletableFuture<ContainerProtos.
- ContainerCommandResponseProto> handlePartialFlush()
+ ContainerCommandResponseProto> executePutBlock()
throws IOException {
checkOpen();
long flushPos = totalDataFlushedLength;
+ Preconditions.checkNotNull(bufferList);
+ List<ByteBuffer> byteBufferList = bufferList;
+ bufferList = null;
+ Preconditions.checkNotNull(byteBufferList);
String requestId =
traceID + ContainerProtos.Type.PutBlock + chunkIndex + blockID;
CompletableFuture<ContainerProtos.
@@ -410,17 +462,22 @@ public class BlockOutputStream extends OutputStream {
}
// if the ioException is not set, putBlock is successful
if (ioException == null) {
- LOG.debug(
- "Adding index " + asyncReply.getLogIndex() + " commitMap size "
- + commitIndex2flushedDataMap.size());
BlockID responseBlockID = BlockID.getFromProtobuf(
e.getPutBlock().getCommittedBlockLength().getBlockID());
Preconditions.checkState(blockID.getContainerBlockID()
.equals(responseBlockID.getContainerBlockID()));
// updates the bcsId of the block
blockID = responseBlockID;
+ LOG.debug(
+ "Adding index " + asyncReply.getLogIndex() + " commitMap size "
+ + commitIndex2flushedDataMap.size() + " flushLength "
+ + flushPos + " numBuffers " + byteBufferList.size()
+ + " blockID " + blockID + " bufferPool size" + bufferPool
+ .getSize() + " currentBufferIndex " + bufferPool
+ .getCurrentBufferIndex());
// for standalone protocol, logIndex will always be 0.
- commitIndex2flushedDataMap.put(asyncReply.getLogIndex(), flushPos);
+ commitIndex2flushedDataMap
+ .put(asyncReply.getLogIndex(), byteBufferList);
}
return e;
}, responseExecutor).exceptionally(e -> {
@@ -446,9 +503,12 @@ public class BlockOutputStream extends OutputStream {
try {
handleFlush();
} catch (InterruptedException | ExecutionException e) {
- adjustBuffersOnException();
- throw new IOException(
+ // just set the exception here as well in order to maintain sanctity of
+ // ioException field
+ ioException = new IOException(
"Unexpected Storage Container Exception: " + e.toString(), e);
+ adjustBuffersOnException();
+ throw ioException;
}
}
}
@@ -456,6 +516,15 @@ public class BlockOutputStream extends OutputStream {
private void writeChunk(ByteBuffer buffer)
throws IOException {
+ // This data in the buffer will be pushed to datanode and a reference will
+ // be added to the bufferList. Once putBlock gets executed, this list will
+ // be marked null. Hence, during first writeChunk call after every putBlock
+ // call or during the first call to writeChunk here, the list will be null.
+
+ if (bufferList == null) {
+ bufferList = new ArrayList<>();
+ }
+ bufferList.add(buffer);
// Please note : We are not flipping the slice when we write since
// the slices are pointing the currentBuffer start and end as needed for
// the chunk write. Also please note, Duplicate does not create a
@@ -472,20 +541,36 @@ public class BlockOutputStream extends OutputStream {
checkOpen();
// flush the last chunk data residing on the currentBuffer
if (totalDataFlushedLength < writtenDataLength) {
- ByteBuffer currentBuffer = bufferPool.getBuffer();
- int pos = currentBuffer.position();
- writeChunk(currentBuffer);
- totalDataFlushedLength += pos;
- handlePartialFlush();
+ ByteBuffer currentBuffer = bufferPool.getCurrentBuffer();
+ Preconditions.checkArgument(currentBuffer.position() > 0);
+ if (currentBuffer.position() != chunkSize) {
+ writeChunk(currentBuffer);
+ }
+ // This can be a partially filled chunk. Since we are flushing the buffer
+ // here, we just limit this buffer to the current position, so that the
+ // next write will happen in a new buffer.
+ updateFlushLength();
+ executePutBlock();
}
waitOnFlushFutures();
+ if (!commitIndex2flushedDataMap.isEmpty()) {
+ // wait for the last commit index in the commitIndex2flushedDataMap
+ // to get committed to all or majority of nodes in case timeout
+ // happens.
+ long lastIndex =
+ commitIndex2flushedDataMap.keySet().stream().mapToLong(v -> v)
+ .max().getAsLong();
+ LOG.debug(
+ "waiting for last flush Index " + lastIndex + " to catch up");
+ watchForCommit(lastIndex);
+ }
+
// just check again if the exception is hit while waiting for the
// futures to ensure flush has indeed succeeded
// irrespective of whether the commitIndex2flushedDataMap is empty
// or not, ensure there is no exception set
checkOpen();
-
}
@Override
@@ -494,21 +579,11 @@ public class BlockOutputStream extends OutputStream {
&& bufferPool != null && bufferPool.getSize() > 0) {
try {
handleFlush();
- if (!commitIndex2flushedDataMap.isEmpty()) {
- // wait for the last commit index in the commitIndex2flushedDataMap
- // to get committed to all or majority of nodes in case timeout
- // happens.
- long lastIndex =
- commitIndex2flushedDataMap.keySet().stream().mapToLong(v -> v)
- .max().getAsLong();
- LOG.debug(
- "waiting for last flush Index " + lastIndex + " to catch up");
- watchForCommit(lastIndex);
- }
} catch (InterruptedException | ExecutionException e) {
- adjustBuffersOnException();
- throw new IOException(
+ ioException = new IOException(
"Unexpected Storage Container Exception: " + e.toString(), e);
+ adjustBuffersOnException();
+ throw ioException;
} finally {
cleanup(false);
}
@@ -564,6 +639,10 @@ public class BlockOutputStream extends OutputStream {
futureMap.clear();
}
futureMap = null;
+ if (bufferList != null) {
+ bufferList.clear();
+ }
+ bufferList = null;
if (commitIndex2flushedDataMap != null) {
commitIndex2flushedDataMap.clear();
}
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java
index 541e6bd..17788c7 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java
@@ -41,7 +41,7 @@ public class BufferPool {
currentBufferIndex = -1;
}
- public ByteBuffer getBuffer() {
+ public ByteBuffer getCurrentBuffer() {
return currentBufferIndex == -1 ? null : bufferList.get(currentBufferIndex);
}
@@ -56,7 +56,7 @@ public class BufferPool {
*
*/
public ByteBuffer allocateBufferIfNeeded() {
- ByteBuffer buffer = getBuffer();
+ ByteBuffer buffer = getCurrentBuffer();
if (buffer != null && buffer.hasRemaining()) {
return buffer;
}
@@ -74,11 +74,14 @@ public class BufferPool {
return buffer;
}
- public void releaseBuffer() {
+ public void releaseBuffer(ByteBuffer byteBuffer) {
// always remove from head of the list and append at last
ByteBuffer buffer = bufferList.remove(0);
+ // Ensure the buffer to be removed is always at the head of the list.
+ Preconditions.checkArgument(buffer.equals(byteBuffer));
buffer.clear();
bufferList.add(buffer);
+ Preconditions.checkArgument(currentBufferIndex >= 0);
currentBufferIndex--;
}
@@ -90,6 +93,7 @@ public class BufferPool {
public void checkBufferPoolEmpty() {
Preconditions.checkArgument(computeBufferData() == 0);
}
+
public long computeBufferData() {
return bufferList.stream().mapToInt(value -> value.position())
.sum();
@@ -99,8 +103,12 @@ public class BufferPool {
return bufferList.size();
}
- ByteBuffer getBuffer(int index) {
+ public ByteBuffer getBuffer(int index) {
return bufferList.get(index);
}
+ int getCurrentBufferIndex() {
+ return currentBufferIndex;
+ }
+
}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/BlockID.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/BlockID.java
index a49f8ae..07aa536 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/BlockID.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/BlockID.java
@@ -74,8 +74,8 @@ public class BlockID {
@Override
public String toString() {
- return new StringBuffer().append(getContainerBlockID().toString())
- .append(" bcId: ")
+ return new StringBuilder().append(getContainerBlockID().toString())
+ .append(" bcsId: ")
.append(blockCommitSequenceId)
.toString();
}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
index b6de8ab..fb700da 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.client.io;
import java.io.IOException;
import java.io.OutputStream;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
@@ -148,11 +149,11 @@ public final class BlockOutputStreamEntry extends OutputStream {
}
}
- long getTotalSuccessfulFlushedData() throws IOException {
+ long getTotalAckDataLength() {
if (outputStream != null) {
BlockOutputStream out = (BlockOutputStream) this.outputStream;
blockID = out.getBlockID();
- return out.getTotalSuccessfulFlushedData();
+ return out.getTotalAckDataLength();
} else {
// For a pre allocated block for which no write has been initiated,
// the OutputStream will be null here.
@@ -295,6 +296,7 @@ public final class BlockOutputStreamEntry extends OutputStream {
}
}
+ @VisibleForTesting
public OutputStream getOutputStream() {
return outputStream;
}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 3bd572d..78f03d2 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -64,6 +64,13 @@ import java.util.concurrent.TimeoutException;
*/
public class KeyOutputStream extends OutputStream {
+ /**
+ * Defines stream action while calling handleFlushOrClose.
+ */
+ enum StreamAction {
+ FLUSH, CLOSE, FULL
+ }
+
public static final Logger LOG =
LoggerFactory.getLogger(KeyOutputStream.class);
@@ -326,8 +333,7 @@ public class KeyOutputStream extends OutputStream {
}
if (current.getRemaining() <= 0) {
// since the current block is already written close the stream.
- handleFlushOrClose(true);
- currentStreamIndex += 1;
+ handleFlushOrClose(StreamAction.FULL);
}
len -= writeLen;
off += writeLen;
@@ -393,19 +399,21 @@ public class KeyOutputStream extends OutputStream {
boolean retryFailure = checkForRetryFailure(exception);
boolean closedContainerException = false;
if (!retryFailure) {
- closedContainerException = checkIfContainerIsClosed(exception);
+ closedContainerException = checkIfContainerIsClosed(t);
}
PipelineID pipelineId = null;
long totalSuccessfulFlushedData =
- streamEntry.getTotalSuccessfulFlushedData();
+ streamEntry.getTotalAckDataLength();
//set the correct length for the current stream
streamEntry.setCurrentPosition(totalSuccessfulFlushedData);
long bufferedDataLen = computeBufferData();
- LOG.warn("Encountered exception {}", exception);
- LOG.info(
- "The last committed block length is {}, uncommitted data length is {}",
+ LOG.warn("Encountered exception {}. The last committed block length is {}, "
+ + "uncommitted data length is {}", exception,
totalSuccessfulFlushedData, bufferedDataLen);
Preconditions.checkArgument(bufferedDataLen <= streamBufferMaxSize);
+ Preconditions.checkArgument(
+ streamEntry.getWrittenDataLength() - totalSuccessfulFlushedData
+ == bufferedDataLen);
long containerId = streamEntry.getBlockID().getContainerID();
Collection<DatanodeDetails> failedServers = streamEntry.getFailedServers();
Preconditions.checkNotNull(failedServers);
@@ -498,7 +506,7 @@ public class KeyOutputStream extends OutputStream {
return t instanceof ContainerNotOpenException;
}
- private Throwable checkForException(IOException ioe) throws IOException {
+ public Throwable checkForException(IOException ioe) throws IOException {
Throwable t = ioe.getCause();
while (t != null) {
for (Class<? extends Exception> cls : OzoneClientUtils
@@ -513,7 +521,7 @@ public class KeyOutputStream extends OutputStream {
}
private long getKeyLength() {
- return streamEntries.parallelStream().mapToLong(e -> e.getCurrentPosition())
+ return streamEntries.stream().mapToLong(e -> e.getCurrentPosition())
.sum();
}
@@ -535,16 +543,24 @@ public class KeyOutputStream extends OutputStream {
@Override
public void flush() throws IOException {
checkNotClosed();
- handleFlushOrClose(false);
+ handleFlushOrClose(StreamAction.FLUSH);
}
/**
- * Close or Flush the latest outputStream.
- * @param close Flag which decides whether to call close or flush on the
+ * Close or Flush the latest outputStream depending upon the action.
+ * This function gets called when, while a write is going on, the current
+ * stream gets full or an explicit flush or close request is made by the
+ * client. When the stream gets full and we try to close the stream, we might
+ * end up hitting an exception. In the exception handling path, we write the
+ * data residing in the buffer pool to a new Block. In such cases, when the
+ * data gets written to the new stream, it will be at max half full, so we
+ * should just write the data and not close the stream as the block won't be
+ * completely full.
+ * @param op Flag which decides whether to call close or flush on the
* outputStream.
* @throws IOException In case, flush or close fails with exception.
*/
- private void handleFlushOrClose(boolean close) throws IOException {
+ private void handleFlushOrClose(StreamAction op) throws IOException {
if (streamEntries.size() == 0) {
return;
}
@@ -561,10 +577,21 @@ public class KeyOutputStream extends OutputStream {
if (failedServers != null && !failedServers.isEmpty()) {
excludeList.addDatanodes(failedServers);
}
- if (close) {
+ switch (op) {
+ case CLOSE:
entry.close();
- } else {
+ break;
+ case FULL:
+ if (entry.getRemaining() == 0) {
+ entry.close();
+ currentStreamIndex++;
+ }
+ break;
+ case FLUSH:
entry.flush();
+ break;
+ default:
+ throw new IOException("Invalid Operation");
}
} catch (IOException ioe) {
handleException(entry, streamIndex, ioe);
@@ -587,7 +614,7 @@ public class KeyOutputStream extends OutputStream {
}
closed = true;
try {
- handleFlushOrClose(true);
+ handleFlushOrClose(StreamAction.CLOSE);
if (keyArgs != null) {
// in test, this could be null
removeEmptyBlocks();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index 7b65c46..e94f0ac 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.ozone;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
@@ -249,6 +250,7 @@ public interface MiniOzoneCluster {
protected Optional<Long> streamBufferFlushSize = Optional.empty();
protected Optional<Long> streamBufferMaxSize = Optional.empty();
protected Optional<Long> blockSize = Optional.empty();
+ protected Optional<StorageUnit> streamBufferSizeUnit = Optional.empty();
// Use relative smaller number of handlers for testing
protected int numOfOmHandlers = 20;
protected int numOfScmHandlers = 20;
@@ -434,6 +436,11 @@ public interface MiniOzoneCluster {
return this;
}
+ public Builder setStreamBufferSizeUnit(StorageUnit unit) {
+ this.streamBufferSizeUnit = Optional.of(unit);
+ return this;
+ }
+
public Builder setOMServiceId(String serviceId) {
this.omServiceId = serviceId;
return this;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index 521a4f1..e746f33 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -446,14 +446,18 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
if (!blockSize.isPresent()) {
blockSize = Optional.of(2 * streamBufferMaxSize.get());
}
+
+ if (!streamBufferSizeUnit.isPresent()) {
+ streamBufferSizeUnit = Optional.of(StorageUnit.MB);
+ }
conf.setStorageSize(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY,
- chunkSize.get(), StorageUnit.MB);
+ chunkSize.get(), streamBufferSizeUnit.get());
conf.setStorageSize(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE,
- streamBufferFlushSize.get(), StorageUnit.MB);
+ streamBufferFlushSize.get(), streamBufferSizeUnit.get());
conf.setStorageSize(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE,
- streamBufferMaxSize.get(), StorageUnit.MB);
+ streamBufferMaxSize.get(), streamBufferSizeUnit.get());
conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, blockSize.get(),
- StorageUnit.MB);
+ streamBufferSizeUnit.get());
configureTrace();
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
new file mode 100644
index 0000000..32bef12
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
@@ -0,0 +1,690 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.client.rpc;
+
+import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.KeyOutputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+
+/**
+ * Tests BlockOutputStream class.
+ */
+public class TestBlockOutputStream {
+ private static MiniOzoneCluster cluster;
+ private static OzoneConfiguration conf = new OzoneConfiguration();
+ private static OzoneClient client;
+ private static ObjectStore objectStore;
+ private static int chunkSize;
+ private static int flushSize;
+ private static int maxFlushSize;
+ private static int blockSize;
+ private static String volumeName;
+ private static String bucketName;
+ private static String keyString;
+
+ /**
+ * Create a MiniOzoneCluster for testing.
+ * <p>
+ * Ozone is made active by setting OZONE_ENABLED = true
+ *
+ * @throws IOException
+ */
+ @BeforeClass
+ public static void init() throws Exception {
+ chunkSize = 100;
+ flushSize = 2 * chunkSize;
+ maxFlushSize = 2 * flushSize;
+ blockSize = 2 * maxFlushSize;
+ conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
+ conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
+ conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+ conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
+ conf.setQuietMode(false);
+ conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
+ StorageUnit.MB);
+ cluster = MiniOzoneCluster.newBuilder(conf)
+ .setNumDatanodes(7)
+ .setBlockSize(blockSize)
+ .setChunkSize(chunkSize)
+ .setStreamBufferFlushSize(flushSize)
+ .setStreamBufferMaxSize(maxFlushSize)
+ .setStreamBufferSizeUnit(StorageUnit.BYTES)
+ .build();
+ cluster.waitForClusterToBeReady();
+ //the easiest way to create an open container is creating a key
+ client = OzoneClientFactory.getClient(conf);
+ objectStore = client.getObjectStore();
+ keyString = UUID.randomUUID().toString();
+ volumeName = "testblockoutputstream";
+ bucketName = volumeName;
+ objectStore.createVolume(volumeName);
+ objectStore.getVolume(volumeName).createBucket(bucketName);
+ }
+
+ private String getKeyName() {
+ return UUID.randomUUID().toString();
+ }
+
+ /**
+ * Shutdown MiniOzoneCluster.
+ */
+ @AfterClass
+ public static void shutdown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testBufferCaching() throws Exception {
+ XceiverClientMetrics metrics =
+ XceiverClientManager.getXceiverClientMetrics();
+ long writeChunkCount = metrics.getContainerOpCountMetrics(
+ ContainerProtos.Type.WriteChunk);
+ long putBlockCount = metrics.getContainerOpCountMetrics(
+ ContainerProtos.Type.PutBlock);
+ long pendingWriteChunkCount = metrics.getContainerOpsMetrics(
+ ContainerProtos.Type.WriteChunk);
+ long pendingPutBlockCount = metrics.getContainerOpsMetrics(
+ ContainerProtos.Type.PutBlock);
+ long totalOpCount = metrics.getTotalOpCount();
+ String keyName = getKeyName();
+ OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
+ int dataLength = 50;
+ byte[] data1 =
+ ContainerTestHelper.getFixedLengthString(keyString, dataLength)
+ .getBytes(UTF_8);
+ key.write(data1);
+ Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+ KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
+
+ Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
+ OutputStream stream = keyOutputStream.getStreamEntries().get(0)
+ .getOutputStream();
+ Assert.assertTrue(stream instanceof BlockOutputStream);
+ BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+
+ // we have just written data less than a chunk size, the data will just sit
+ // in the buffer, with only one buffer being allocated in the buffer pool
+
+ Assert.assertEquals(1, blockOutputStream.getBufferPool().getSize());
+ //Just the writtenDataLength will be updated here
+ Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+ // no data will be flushed till now
+ Assert.assertEquals(0, blockOutputStream.getTotalDataFlushedLength());
+ Assert.assertEquals(0, blockOutputStream.getTotalAckDataLength());
+ Assert.assertEquals(pendingWriteChunkCount,
+ XceiverClientManager.getXceiverClientMetrics()
+ .getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(pendingPutBlockCount,
+ XceiverClientManager.getXceiverClientMetrics()
+ .getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+
+ // commitIndex2FlushedData Map will be empty here
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
+
+ // Now do a flush. This will flush the data and update the flush length and
+ // the map.
+ key.flush();
+
+ // flush is a sync call, all pending operations will complete
+ Assert.assertEquals(pendingWriteChunkCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(pendingPutBlockCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+ // we have just written data less than a chunk size, the data will just sit
+ // in the buffer, with only one buffer being allocated in the buffer pool
+
+ Assert.assertEquals(1, blockOutputStream.getBufferPool().getSize());
+ Assert.assertEquals(0,
+ blockOutputStream.getBufferPool().getBuffer(0).position());
+ Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+ Assert.assertEquals(dataLength,
+ blockOutputStream.getTotalDataFlushedLength());
+ Assert.assertEquals(0,
+ blockOutputStream.getCommitIndex2flushedDataMap().size());
+
+ // flush ensures watchForCommit updates the total length acknowledged
+ Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+
+ // now close the stream, It will update the ack length after watchForCommit
+ key.close();
+
+ Assert.assertEquals(pendingWriteChunkCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(pendingPutBlockCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(writeChunkCount + 1,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(putBlockCount + 1,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(totalOpCount + 2,
+ metrics.getTotalOpCount());
+
+ // make sure the bufferPool is empty
+ Assert
+ .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
+ Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+ Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
+ validateData(keyName, data1);
+ }
+
+ @Test
+ public void testFlushChunk() throws Exception {
+ XceiverClientMetrics metrics =
+ XceiverClientManager.getXceiverClientMetrics();
+ long writeChunkCount = metrics.getContainerOpCountMetrics(
+ ContainerProtos.Type.WriteChunk);
+ long putBlockCount = metrics.getContainerOpCountMetrics(
+ ContainerProtos.Type.PutBlock);
+ long pendingWriteChunkCount = metrics.getContainerOpsMetrics(
+ ContainerProtos.Type.WriteChunk);
+ long pendingPutBlockCount = metrics.getContainerOpsMetrics(
+ ContainerProtos.Type.PutBlock);
+ long totalOpCount = metrics.getTotalOpCount();
+ String keyName = getKeyName();
+ OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
+ int dataLength = flushSize;
+ // write data equal to 2 chunks
+ byte[] data1 =
+ ContainerTestHelper.getFixedLengthString(keyString, dataLength)
+ .getBytes(UTF_8);
+ key.write(data1);
+ Assert.assertEquals(pendingWriteChunkCount + 2,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(pendingPutBlockCount + 1,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+ KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
+
+ Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
+ OutputStream stream = keyOutputStream.getStreamEntries().get(0)
+ .getOutputStream();
+ Assert.assertTrue(stream instanceof BlockOutputStream);
+ BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+
+ // we have just written data equal flush Size = 2 chunks, at this time
+ // buffer pool will have 2 buffers allocated worth of chunk size
+
+ Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize());
+ // writtenDataLength as well flushedDataLength will be updated here
+ Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+ Assert.assertEquals(dataLength,
+ blockOutputStream.getTotalDataFlushedLength());
+ Assert.assertEquals(0, blockOutputStream.getTotalAckDataLength());
+
+ Assert.assertEquals(0,
+ blockOutputStream.getCommitIndex2flushedDataMap().size());
+
+ // Now do a flush. This will flush the data and update the flush length and
+ // the map.
+ key.flush();
+ // flush is a sync call, all pending operations will complete
+ Assert.assertEquals(pendingWriteChunkCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(pendingPutBlockCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+
+ // Since the data in the buffer is already flushed, flush here will have
+ // no impact on the counters and data structures
+
+ Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize());
+ Assert
+ .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
+ Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+ Assert.assertEquals(dataLength,
+ blockOutputStream.getTotalDataFlushedLength());
+ Assert.assertEquals(0,
+ blockOutputStream.getCommitIndex2flushedDataMap().size());
+
+ // flush ensures watchForCommit updates the total length acknowledged
+ Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+ // now close the stream, It will update the ack length after watchForCommit
+ key.close();
+ Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+ // make sure the bufferPool is empty
+ Assert
+ .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
+ Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+ Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertEquals(pendingWriteChunkCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(pendingPutBlockCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(writeChunkCount + 2,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(putBlockCount + 1,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(totalOpCount + 3,
+ metrics.getTotalOpCount());
+ Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
+ validateData(keyName, data1);
+ }
+
+ @Test
+ public void testMultiChunkWrite() throws Exception {
+ XceiverClientMetrics metrics =
+ XceiverClientManager.getXceiverClientMetrics();
+ long writeChunkCount = metrics.getContainerOpCountMetrics(
+ ContainerProtos.Type.WriteChunk);
+ long putBlockCount = metrics.getContainerOpCountMetrics(
+ ContainerProtos.Type.PutBlock);
+ long pendingWriteChunkCount = metrics.getContainerOpsMetrics(
+ ContainerProtos.Type.WriteChunk);
+ long pendingPutBlockCount = metrics.getContainerOpsMetrics(
+ ContainerProtos.Type.PutBlock);
+ long totalOpCount = metrics.getTotalOpCount();
+ String keyName = getKeyName();
+ OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
+ int dataLength = chunkSize + 50;
+ // write data more than 1 chunk
+ byte[] data1 =
+ ContainerTestHelper.getFixedLengthString(keyString, dataLength)
+ .getBytes(UTF_8);
+ key.write(data1);
+ Assert.assertEquals(pendingWriteChunkCount + 1,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(pendingPutBlockCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+ KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
+
+ Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
+ OutputStream stream = keyOutputStream.getStreamEntries().get(0)
+ .getOutputStream();
+ Assert.assertTrue(stream instanceof BlockOutputStream);
+ BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+
+ // we have just written more than 1 chunk but less than flushSize of data;
+ // buffer pool will have 2 buffers allocated worth of chunk size
+
+ Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize());
+ // only the writtenDataLength will be updated here
+ Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+ // since data written is still less than flushLength, flushLength will
+ // still be 0.
+ Assert.assertEquals(0,
+ blockOutputStream.getTotalDataFlushedLength());
+ Assert.assertEquals(0, blockOutputStream.getTotalAckDataLength());
+
+ Assert.assertEquals(0,
+ blockOutputStream.getCommitIndex2flushedDataMap().size());
+
+ // Now do a flush. This will flush the data and update the flush length and
+ // the map.
+ key.flush();
+ Assert.assertEquals(writeChunkCount + 2,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(putBlockCount + 1,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(pendingWriteChunkCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(pendingPutBlockCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+
+ Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize());
+ Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+ Assert.assertEquals(dataLength,
+ blockOutputStream.getTotalDataFlushedLength());
+ Assert.assertEquals(0,
+ blockOutputStream.getCommitIndex2flushedDataMap().size());
+
+ // flush ensures watchForCommit updates the total length acknowledged
+ Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+
+ // now close the stream, It will update the ack length after watchForCommit
+ key.close();
+ Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+ // make sure the bufferPool is empty
+ Assert
+ .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
+ Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+ Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertEquals(pendingWriteChunkCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(pendingPutBlockCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(writeChunkCount + 2,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(putBlockCount + 1,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(totalOpCount + 3,
+ metrics.getTotalOpCount());
+ validateData(keyName, data1);
+ }
+
+ @Test
+ public void testMultiChunkWrite2() throws Exception {
+ XceiverClientMetrics metrics =
+ XceiverClientManager.getXceiverClientMetrics();
+ long writeChunkCount = metrics.getContainerOpCountMetrics(
+ ContainerProtos.Type.WriteChunk);
+ long putBlockCount = metrics.getContainerOpCountMetrics(
+ ContainerProtos.Type.PutBlock);
+ long pendingWriteChunkCount = metrics.getContainerOpsMetrics(
+ ContainerProtos.Type.WriteChunk);
+ long pendingPutBlockCount = metrics.getContainerOpsMetrics(
+ ContainerProtos.Type.PutBlock);
+ long totalOpCount = metrics.getTotalOpCount();
+ String keyName = getKeyName();
+ OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
+ int dataLength = flushSize + 50;
+ // write data more than flush size (2 chunks)
+ byte[] data1 =
+ ContainerTestHelper.getFixedLengthString(keyString, dataLength)
+ .getBytes(UTF_8);
+ key.write(data1);
+ Assert.assertEquals(pendingWriteChunkCount + 2,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(pendingPutBlockCount + 1,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+ KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
+
+ Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
+ OutputStream stream = keyOutputStream.getStreamEntries().get(0)
+ .getOutputStream();
+ Assert.assertTrue(stream instanceof BlockOutputStream);
+ BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+
+ // we have just written data more than flush Size(2 chunks), at this time
+ // buffer pool will have 3 buffers allocated worth of chunk size
+
+ Assert.assertEquals(3, blockOutputStream.getBufferPool().getSize());
+ // writtenDataLength as well flushedDataLength will be updated here
+ Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+ Assert.assertEquals(flushSize,
+ blockOutputStream.getTotalDataFlushedLength());
+ Assert.assertEquals(0, blockOutputStream.getTotalAckDataLength());
+
+ Assert.assertEquals(0,
+ blockOutputStream.getCommitIndex2flushedDataMap().size());
+
+ Assert.assertEquals(flushSize,
+ blockOutputStream.getTotalDataFlushedLength());
+ Assert.assertEquals(0,
+ blockOutputStream.getCommitIndex2flushedDataMap().size());
+
+ Assert.assertEquals(0, blockOutputStream.getTotalAckDataLength());
+ key.close();
+ Assert.assertEquals(pendingWriteChunkCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(pendingPutBlockCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(writeChunkCount + 3,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(putBlockCount + 2,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(totalOpCount + 5,
+ metrics.getTotalOpCount());
+ Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+ // make sure the bufferPool is empty
+ Assert
+ .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
+ Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+ Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
+ validateData(keyName, data1);
+ }
+
+ @Test
+ public void testFullBufferCondition() throws Exception {
+ XceiverClientMetrics metrics =
+ XceiverClientManager.getXceiverClientMetrics();
+ long writeChunkCount = metrics.getContainerOpCountMetrics(
+ ContainerProtos.Type.WriteChunk);
+ long putBlockCount = metrics.getContainerOpCountMetrics(
+ ContainerProtos.Type.PutBlock);
+ long pendingWriteChunkCount = metrics.getContainerOpsMetrics(
+ ContainerProtos.Type.WriteChunk);
+ long pendingPutBlockCount = metrics.getContainerOpsMetrics(
+ ContainerProtos.Type.PutBlock);
+ long totalOpCount = metrics.getTotalOpCount();
+ String keyName = getKeyName();
+ OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
+ int dataLength = maxFlushSize;
+ // write data equal to the max buffer size (4 chunks)
+ byte[] data1 =
+ ContainerTestHelper.getFixedLengthString(keyString, dataLength)
+ .getBytes(UTF_8);
+ key.write(data1);
+
+ // since it hits the full buffer condition, it will call watchForCommit
+ // and complete at least the putBlock for the first flushSize worth of data
+ Assert.assertTrue(
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
+ <= pendingWriteChunkCount + 2);
+ Assert.assertTrue(
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
+ <= pendingPutBlockCount + 1);
+ Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+ KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
+
+ Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
+ OutputStream stream = keyOutputStream.getStreamEntries().get(0)
+ .getOutputStream();
+ Assert.assertTrue(stream instanceof BlockOutputStream);
+ BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+
+
+ Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
+ // writtenDataLength as well flushedDataLength will be updated here
+ Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+ Assert.assertEquals(maxFlushSize,
+ blockOutputStream.getTotalDataFlushedLength());
+
+ // since data equals to maxBufferSize is written, this will be a blocking
+ // call and hence will wait for atleast flushSize worth of data to get
+ // ack'd by all servers right here
+ Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize);
+
+ // watchForCommit will clean up atleast one entry from the map where each
+ // entry corresponds to flushSize worth of data
+
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().size() <= 1);
+
+ // Now do a flush. This will flush the data and update the flush length and
+ // the map.
+ key.flush();
+ Assert.assertEquals(pendingWriteChunkCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(pendingPutBlockCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+
+ // Since the data in the buffer is already flushed, flush here will have
+ // no impact on the counters and data structures
+
+ Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
+ Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+ Assert.assertEquals(dataLength,
+ blockOutputStream.getTotalDataFlushedLength());
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().size() <= 1);
+
+ // now close the stream, It will update the ack length after watchForCommit
+ key.close();
+ Assert.assertEquals(pendingWriteChunkCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(pendingPutBlockCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(writeChunkCount + 4,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(putBlockCount + 2,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(totalOpCount + 6,
+ metrics.getTotalOpCount());
+ Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+ // make sure the bufferPool is empty
+ Assert
+ .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
+ Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+ Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
+ validateData(keyName, data1);
+ }
+
+ @Test
+ public void testWriteWithExceedingMaxBufferLimit() throws Exception {
+ XceiverClientMetrics metrics =
+ XceiverClientManager.getXceiverClientMetrics();
+ long writeChunkCount = metrics.getContainerOpCountMetrics(
+ ContainerProtos.Type.WriteChunk);
+ long putBlockCount = metrics.getContainerOpCountMetrics(
+ ContainerProtos.Type.PutBlock);
+ long pendingWriteChunkCount = metrics.getContainerOpsMetrics(
+ ContainerProtos.Type.WriteChunk);
+ long pendingPutBlockCount = metrics.getContainerOpsMetrics(
+ ContainerProtos.Type.PutBlock);
+ long totalOpCount = metrics.getTotalOpCount();
+ String keyName = getKeyName();
+ OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
+ int dataLength = maxFlushSize + 50;
+ // write data more than the max buffer size (4 chunks)
+ byte[] data1 =
+ ContainerTestHelper.getFixedLengthString(keyString, dataLength)
+ .getBytes(UTF_8);
+ key.write(data1);
+ Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+ KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
+
+ // since it hits the full buffer condition, it will call watchForCommit
+ // and complete at least the putBlock for the first flushSize worth of data
+ Assert.assertTrue(
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
+ <= pendingWriteChunkCount + 2);
+ Assert.assertTrue(
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
+ <= pendingPutBlockCount + 1);
+ Assert.assertEquals(writeChunkCount + 4,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(putBlockCount + 2,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(totalOpCount + 6,
+ metrics.getTotalOpCount());
+ Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
+ OutputStream stream = keyOutputStream.getStreamEntries().get(0)
+ .getOutputStream();
+ Assert.assertTrue(stream instanceof BlockOutputStream);
+ BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+
+ Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
+ // writtenDataLength as well flushedDataLength will be updated here
+ Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+ Assert.assertEquals(maxFlushSize,
+ blockOutputStream.getTotalDataFlushedLength());
+
+ // since data equals to maxBufferSize is written, this will be a blocking
+ // call and hence will wait for atleast flushSize worth of data to get
+ // ack'd by all servers right here
+ Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize);
+
+ // watchForCommit will clean up atleast one entry from the map where each
+ // entry corresponds to flushSize worth of data
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().size() <= 1);
+
+ // Now do a flush. This will flush the data and update the flush length and
+ // the map.
+ key.flush();
+ Assert.assertEquals(pendingWriteChunkCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(pendingPutBlockCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+
+ // Since the data in the buffer is already flushed, flush here will have
+ // no impact on the counters and data structures
+
+ Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
+ Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+ Assert.assertEquals(dataLength,
+ blockOutputStream.getTotalDataFlushedLength());
+ // flush will make sure one more entry gets updated in the map
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2);
+
+ // now close the stream, It will update the ack length after watchForCommit
+ key.close();
+ Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+ // make sure the bufferPool is empty
+ Assert
+ .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
+ Assert.assertEquals(pendingWriteChunkCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(pendingPutBlockCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(writeChunkCount + 5,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(putBlockCount + 3,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(totalOpCount + 8,
+ metrics.getTotalOpCount());
+ Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+ Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
+ validateData(keyName, data1);
+ }
+
+ private OzoneOutputStream createKey(String keyName, ReplicationType type,
+ long size) throws Exception {
+ return ContainerTestHelper
+ .createKey(keyName, type, size, objectStore, volumeName, bucketName);
+ }
+ private void validateData(String keyName, byte[] data) throws Exception {
+ ContainerTestHelper
+ .validateData(keyName, data, objectStore, volumeName, bucketName);
+ }
+
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
new file mode 100644
index 0000000..671f16c
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
@@ -0,0 +1,546 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.client.rpc;
+
+import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
+import org.apache.hadoop.hdds.scm.XceiverClientRatis;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.KeyOutputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.ratis.protocol.AlreadyClosedException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+
+/**
+ * Tests failure detection and handling in BlockOutputStream Class.
+ */
+public class TestBlockOutputStreamWithFailures {
+
+ private static MiniOzoneCluster cluster;
+ private OzoneConfiguration conf = new OzoneConfiguration();
+ private OzoneClient client;
+ private ObjectStore objectStore;
+ private int chunkSize;
+ private int flushSize;
+ private int maxFlushSize;
+ private int blockSize;
+ private String volumeName;
+ private String bucketName;
+ private String keyString;
+
+ /**
+ * Starts a seven-datanode MiniOzoneCluster with very small chunk, flush,
+ * buffer and block sizes, then creates the client, the volume and the bucket
+ * used by the tests.
+ * <p>
+ * The sizes are derived from each other: chunkSize = 100, flushSize = 200,
+ * maxFlushSize = 400 and blockSize = 800, with the stream buffer size unit
+ * set to bytes.
+ *
+ * @throws Exception if the cluster, client, volume or bucket set-up fails
+ */
+ @Before
+ public void init() throws Exception {
+ // each size is twice the previous one, so a few hundred bytes of test data
+ // already cross chunk, flush and max-buffer boundaries
+ chunkSize = 100;
+ flushSize = 2 * chunkSize;
+ maxFlushSize = 2 * flushSize;
+ blockSize = 2 * maxFlushSize;
+ conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
+ conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
+ conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+ conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
+ conf.setQuietMode(false);
+ // NOTE(review): the block size is configured here as 4 MB and again through
+ // setBlockSize(blockSize) on the builder below; presumably the builder value
+ // is the effective one - confirm
+ conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
+ StorageUnit.MB);
+ cluster = MiniOzoneCluster.newBuilder(conf)
+ .setNumDatanodes(7)
+ .setBlockSize(blockSize)
+ .setChunkSize(chunkSize)
+ .setStreamBufferFlushSize(flushSize)
+ .setStreamBufferMaxSize(maxFlushSize)
+ .setStreamBufferSizeUnit(StorageUnit.BYTES)
+ .build();
+ cluster.waitForClusterToBeReady();
+ // the easiest way to create an open container is creating a key
+ client = OzoneClientFactory.getClient(conf);
+ objectStore = client.getObjectStore();
+ // seed string from which the tests build their fixed-length payloads
+ keyString = UUID.randomUUID().toString();
+ // volume and bucket deliberately share one name
+ volumeName = "testblockoutputstream";
+ bucketName = volumeName;
+ objectStore.createVolume(volumeName);
+ objectStore.getVolume(volumeName).createBucket(bucketName);
+ }
+
+ private String getKeyName() {
+ return UUID.randomUUID().toString();
+ }
+
+ /**
+ * Closes the Ozone client created in {@link #init()} and shuts down the
+ * MiniOzoneCluster.
+ *
+ * @throws IOException if closing the client fails
+ */
+ @After
+ public void shutdown() throws IOException {
+ try {
+ // init() creates a new client for every test; it was never closed before
+ if (client != null) {
+ client.close();
+ }
+ } finally {
+ // stop the cluster even when closing the client failed
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ /**
+ * Writes {@code maxFlushSize + 50} bytes and flushes them, waits for the
+ * key's containers to be closed, then writes the same data again. The block
+ * stream is expected to record a {@link ContainerNotOpenException}, the key
+ * stream is expected to end up with two stream entries, and the key must
+ * contain the data twice.
+ */
+ @Test
+ public void testWatchForCommitWithCloseContainerException() throws Exception {
+ XceiverClientMetrics metrics =
+ XceiverClientManager.getXceiverClientMetrics();
+ // record the current counter values; the assertions below check deltas
+ long writeChunkCount = metrics.getContainerOpCountMetrics(
+ ContainerProtos.Type.WriteChunk);
+ long putBlockCount = metrics.getContainerOpCountMetrics(
+ ContainerProtos.Type.PutBlock);
+ long pendingWriteChunkCount = metrics.getContainerOpsMetrics(
+ ContainerProtos.Type.WriteChunk);
+ long pendingPutBlockCount = metrics.getContainerOpsMetrics(
+ ContainerProtos.Type.PutBlock);
+ long totalOpCount = metrics.getTotalOpCount();
+ String keyName = getKeyName();
+ OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
+ // maxFlushSize + 50 bytes: four full chunks plus one partial chunk
+ int dataLength = maxFlushSize + 50;
+ byte[] data1 =
+ ContainerTestHelper.getFixedLengthString(keyString, dataLength)
+ .getBytes(UTF_8);
+ key.write(data1);
+
+ // since the write hits the full-buffer condition, it calls watchForCommit
+ // and completes at least the putBlock for the first flushSize worth of data
+ Assert.assertTrue(
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
+ <= pendingWriteChunkCount + 2);
+ Assert.assertTrue(
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
+ <= pendingPutBlockCount + 1);
+ // 4 writeChunks (full chunks) + 2 putBlocks (one per flushSize) so far
+ Assert.assertEquals(writeChunkCount + 4,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(putBlockCount + 2,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(totalOpCount + 6,
+ metrics.getTotalOpCount());
+
+ Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+ KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
+
+ // assertEquals reports the actual size on failure
+ Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
+ OutputStream stream = keyOutputStream.getStreamEntries().get(0)
+ .getOutputStream();
+ Assert.assertTrue(stream instanceof BlockOutputStream);
+ BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+
+ // more than maxFlushSize has been written, so the buffer pool is expected
+ // to hold 4 buffers of chunk size each
+ Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
+ // everything handed to write() counts as written ...
+ Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+ // ... but only the first maxFlushSize bytes have been flushed so far
+ Assert.assertEquals(maxFlushSize,
+ blockOutputStream.getTotalDataFlushedLength());
+
+ // since data equal to the max buffer size was written, the write blocked
+ // until at least flushSize worth of data was ack'd by all servers
+ Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize);
+
+ // watchForCommit cleans up at least one entry from the map, where each
+ // entry corresponds to flushSize worth of data
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().size() <= 1);
+
+ // Now do a flush. This flushes the partial chunk and updates the flush
+ // length and the map.
+ key.flush();
+
+ // flush is a sync call, so no operation is pending afterwards
+ Assert.assertEquals(pendingWriteChunkCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(pendingPutBlockCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+ // one more writeChunk (partial chunk) and one more putBlock
+ Assert.assertEquals(writeChunkCount + 5,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(putBlockCount + 3,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(totalOpCount + 8,
+ metrics.getTotalOpCount());
+
+ // the flush changes neither the number of allocated buffers nor the
+ // written length; only the flushed length catches up
+ Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
+ Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+ Assert.assertEquals(dataLength,
+ blockOutputStream.getTotalDataFlushedLength());
+ // the flush may add one more entry to the map
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2);
+
+ XceiverClientRatis raftClient =
+ (XceiverClientRatis) blockOutputStream.getXceiverClient();
+ Assert.assertEquals(3, raftClient.getCommitInfoMap().size());
+ // Close the containers on the Datanode and write more data
+ ContainerTestHelper.waitForContainerClose(key, cluster);
+ // 4 writeChunks (= maxFlushSize) + 2 putBlocks will be discarded here
+ // once the exception is hit
+ key.write(data1);
+
+ // As part of handling the exception, the 4 failed writeChunks are
+ // rewritten plus one partial chunk, plus two putBlocks for flushSize
+ // and one flush for the partial chunk
+ key.flush();
+
+ Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
+ .getIoException()) instanceof ContainerNotOpenException);
+
+ // commitInfoMap will remain intact as there is no server failure
+ Assert.assertEquals(3, raftClient.getCommitInfoMap().size());
+ // now close the stream, it will update the ack length after watchForCommit
+ key.close();
+ // make sure the bufferPool is empty
+ Assert
+ .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
+ Assert
+ .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+ Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
+ Assert.assertEquals(pendingWriteChunkCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(pendingPutBlockCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+ // 5 (first write) + 4 (discarded) + 5 (rewritten) writeChunks and
+ // 3 + 2 + 3 putBlocks
+ Assert.assertEquals(writeChunkCount + 14,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(putBlockCount + 8,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(totalOpCount + 22,
+ metrics.getTotalOpCount());
+ // Written the same data twice. Encode with the charset used to build and
+ // decode data1 instead of the platform default.
+ String dataString = new String(data1, UTF_8);
+ validateData(keyName, dataString.concat(dataString).getBytes(UTF_8));
+ }
+
+ @Test
+ public void testWatchForCommitDatanodeFailure() throws Exception {
+ XceiverClientMetrics metrics =
+ XceiverClientManager.getXceiverClientMetrics();
+ long writeChunkCount = metrics.getContainerOpCountMetrics(
+ ContainerProtos.Type.WriteChunk);
+ long putBlockCount = metrics.getContainerOpCountMetrics(
+ ContainerProtos.Type.PutBlock);
+ long pendingWriteChunkCount = metrics.getContainerOpsMetrics(
+ ContainerProtos.Type.WriteChunk);
+ long pendingPutBlockCount = metrics.getContainerOpsMetrics(
+ ContainerProtos.Type.PutBlock);
+ long totalOpCount = metrics.getTotalOpCount();
+ String keyName = getKeyName();
+ OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
+ int dataLength = maxFlushSize + 50;
+ // write data more than 1 chunk
+ byte[] data1 =
+ ContainerTestHelper.getFixedLengthString(keyString, dataLength)
+ .getBytes(UTF_8);
+ key.write(data1);
+ // since its hitting the full bufferCondition, it will call watchForCommit
+ // and completes at least putBlock for first flushSize worth of data
+ Assert.assertTrue(
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
+ <= pendingWriteChunkCount + 2);
+ Assert.assertTrue(
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
+ <= pendingPutBlockCount + 1);
+ Assert.assertEquals(writeChunkCount + 4,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(putBlockCount + 2,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(totalOpCount + 6,
+ metrics.getTotalOpCount());
+ Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+ KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
+
+ Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
+ OutputStream stream = keyOutputStream.getStreamEntries().get(0)
+ .getOutputStream();
+ Assert.assertTrue(stream instanceof BlockOutputStream);
+ BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+
+ // we have just written data more than flush Size(2 chunks), at this time
+ // buffer pool will have 3 buffers allocated worth of chunk size
+
+ Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
+ // writtenDataLength as well flushedDataLength will be updated here
+ Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+ // since data written is still less than flushLength, flushLength will
+ // still be 0.
+ Assert.assertEquals(maxFlushSize,
+ blockOutputStream.getTotalDataFlushedLength());
+
+ // since data equals to maxBufferSize is written, this will be a blocking
+ // call and hence will wait for atleast flushSize worth of data to get
+ // ack'd by all servers right here
+ Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize);
+
+ // watchForCommit will clean up atleast flushSize worth of data buffer
+ // where each entry corresponds to flushSize worth of data
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2);
+
+ // Now do a flush. This will flush the data and update the flush length and
+ // the map.
+ key.flush();
+
+ Assert.assertEquals(pendingWriteChunkCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(pendingPutBlockCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(writeChunkCount + 5,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(putBlockCount + 3,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(totalOpCount + 8,
+ metrics.getTotalOpCount());
+
+ // Since the data in the buffer is already flushed, flush here will have
+ // no impact on the counters and data structures
+
+ Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
+ Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+ Assert.assertEquals(dataLength,
+ blockOutputStream.getTotalDataFlushedLength());
+ // flush will make sure one more entry gets updated in the map
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().size() == 0);
+
+ XceiverClientRatis raftClient =
+ (XceiverClientRatis) blockOutputStream.getXceiverClient();
+ Assert.assertEquals(3, raftClient.getCommitInfoMap().size());
+ Pipeline pipeline = raftClient.getPipeline();
+ cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
+
+ // again write data with more than max buffer limit. This will call
+ // watchForCommit again. Since the commit will happen 2 way, the
+ // commitInfoMap will get updated for servers which are alive
+ key.write(data1);
+
+ key.flush();
+ Assert.assertEquals(2, raftClient.getCommitInfoMap().size());
+
+ // now close the stream, It will update the ack length after watchForCommit
+ key.close();
+ Assert
+ .assertEquals(blockSize, blockOutputStream.getTotalAckDataLength());
+ // make sure the bufferPool is empty
+ Assert
+ .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
+ Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
+ Assert.assertEquals(pendingWriteChunkCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(pendingPutBlockCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+
+ // in total, there are 8 full write chunks + 2 partial chunks written
+ Assert.assertEquals(writeChunkCount + 10,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+ // 4 flushes at flushSize boundaries + 2 flush for partial chunks
+ Assert.assertEquals(putBlockCount + 6,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(totalOpCount + 16,
+ metrics.getTotalOpCount());
+ // Written the same data twice
+ String dataString = new String(data1, UTF_8);
+ validateData(keyName, dataString.concat(dataString).getBytes());
+ }
+
+ /**
+ * Writes {@code maxFlushSize + 50} bytes and flushes them, shuts down two of
+ * the pipeline's datanodes, then writes the same data again. The block
+ * stream is expected to record an {@link AlreadyClosedException} and the key
+ * stream is expected to end up with two stream entries.
+ */
+ @Test
+ public void test2DatanodesFailure() throws Exception {
+ XceiverClientMetrics metrics =
+ XceiverClientManager.getXceiverClientMetrics();
+ // record the current counter values; the assertions below check deltas
+ long writeChunkCount = metrics.getContainerOpCountMetrics(
+ ContainerProtos.Type.WriteChunk);
+ long putBlockCount = metrics.getContainerOpCountMetrics(
+ ContainerProtos.Type.PutBlock);
+ long pendingWriteChunkCount = metrics.getContainerOpsMetrics(
+ ContainerProtos.Type.WriteChunk);
+ long pendingPutBlockCount = metrics.getContainerOpsMetrics(
+ ContainerProtos.Type.PutBlock);
+ long totalOpCount = metrics.getTotalOpCount();
+ String keyName = getKeyName();
+ OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
+ int dataLength = maxFlushSize + 50;
+ // maxFlushSize + 50 bytes: four full chunks plus one partial chunk
+ byte[] data1 =
+ ContainerTestHelper.getFixedLengthString(keyString, dataLength)
+ .getBytes(UTF_8);
+ key.write(data1);
+ // since the write hits the full-buffer condition, it calls watchForCommit
+ // and completes at least the putBlock for the first flushSize worth of data
+ Assert.assertTrue(
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
+ <= pendingWriteChunkCount + 2);
+ Assert.assertTrue(
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
+ <= pendingPutBlockCount + 1);
+ // 4 writeChunks (full chunks) + 2 putBlocks (one per flushSize) so far
+ Assert.assertEquals(writeChunkCount + 4,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(putBlockCount + 2,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(totalOpCount + 6,
+ metrics.getTotalOpCount());
+ Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+ KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
+
+ Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
+ OutputStream stream = keyOutputStream.getStreamEntries().get(0)
+ .getOutputStream();
+ Assert.assertTrue(stream instanceof BlockOutputStream);
+ BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+
+ // more than maxFlushSize has been written, so the buffer pool is expected
+ // to hold 4 buffers of chunk size each
+
+ Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
+ // everything handed to write() counts as written ...
+ Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+ // ... but only the first maxFlushSize bytes have been flushed so far
+ Assert.assertEquals(maxFlushSize,
+ blockOutputStream.getTotalDataFlushedLength());
+
+ // since data equal to the max buffer size was written, the write blocked
+ // until at least flushSize worth of data was acked by all servers
+ Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize);
+
+ // watchForCommit cleans up at least one entry from the map, where each
+ // entry corresponds to flushSize worth of data
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().size() <= 1);
+
+ // Now do a flush. This flushes the partial chunk and updates the flush
+ // length and the map.
+ key.flush();
+
+ // flush is a sync call, so no operation is pending afterwards
+ Assert.assertEquals(pendingWriteChunkCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(pendingPutBlockCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+ // one more writeChunk (partial chunk) and one more putBlock
+ Assert.assertEquals(writeChunkCount + 5,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(putBlockCount + 3,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(totalOpCount + 8,
+ metrics.getTotalOpCount());
+
+ // the flush changes neither the number of allocated buffers nor the
+ // written length; only the flushed length catches up
+
+ Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
+ Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+ Assert.assertEquals(dataLength,
+ blockOutputStream.getTotalDataFlushedLength());
+ // the flush may add one more entry to the map
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2);
+
+ XceiverClientRatis raftClient =
+ (XceiverClientRatis) blockOutputStream.getXceiverClient();
+ Assert.assertEquals(3, raftClient.getCommitInfoMap().size());
+ Pipeline pipeline = raftClient.getPipeline();
+ // take down two of the pipeline's datanodes
+ cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
+ cluster.shutdownHddsDatanode(pipeline.getNodes().get(1));
+ // write the same data again; with two datanodes down the write is
+ // expected to run into an exception (see the AlreadyClosedException
+ // assertion below)
+
+ // 4 writeChunks (= maxFlushSize) + 2 putBlocks will be discarded here
+ // once the exception is hit
+ key.write(data1);
+
+ // As part of handling the exception, the 4 failed writeChunks are
+ // rewritten plus one partial chunk, plus two putBlocks for flushSize
+ // and one flush for the partial chunk
+ key.flush();
+ Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
+ .getIoException()) instanceof AlreadyClosedException);
+ // now close the stream, it will update the ack length after watchForCommit
+ key.close();
+ Assert
+ .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
+ Assert
+ .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+ Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
+ Assert.assertEquals(pendingWriteChunkCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(pendingPutBlockCount,
+ metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+ // 5 (first write) + 4 (discarded) + 5 (rewritten) writeChunks and
+ // 3 + 2 + 3 putBlocks
+ Assert.assertEquals(writeChunkCount + 14,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+ Assert.assertEquals(putBlockCount + 8,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+ Assert.assertEquals(totalOpCount + 22,
+ metrics.getTotalOpCount());
+ // NOTE(review): the four assertions below repeat, unchanged, the checks
+ // made right after key.close() above
+ Assert
+ .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+ // make sure the bufferPool is empty
+ Assert
+ .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
+ Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
+ // NOTE(review): data1 was written twice above, but only one copy is passed
+ // here, whereas the other tests in this class pass the concatenation of
+ // both copies - confirm this is intended
+ validateData(keyName, data1);
+ }
+
+ private OzoneOutputStream createKey(String keyName, ReplicationType type,
+ long size) throws Exception {
+ return ContainerTestHelper
+ .createKey(keyName, type, size, objectStore, volumeName, bucketName);
+ }
+
+ /**
+ * Passes the given key name and byte array on to
+ * {@link ContainerTestHelper#validateData}, together with this test's object
+ * store, volume name and bucket name.
+ */
+ private void validateData(String keyName, byte[] data) throws Exception {
+ ContainerTestHelper.validateData(
+ keyName, data, objectStore, volumeName, bucketName);
+ }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
index 3124e77..8be1ccc 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
@@ -338,17 +338,8 @@ public class TestCloseContainerHandlingByClient {
private void waitForContainerClose(OzoneOutputStream outputStream)
throws Exception {
- KeyOutputStream keyOutputStream =
- (KeyOutputStream) outputStream.getOutputStream();
- List<OmKeyLocationInfo> locationInfoList =
- keyOutputStream.getLocationInfoList();
- List<Long> containerIdList = new ArrayList<>();
- for (OmKeyLocationInfo info : locationInfoList) {
- containerIdList.add(info.getContainerID());
- }
- Assert.assertTrue(!containerIdList.isEmpty());
ContainerTestHelper
- .waitForContainerClose(cluster, containerIdList.toArray(new Long[0]));
+ .waitForContainerClose(outputStream, cluster);
}
@Ignore // test needs to be fixed after close container is handled for
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
index 0b97a00..f08a07f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.common.Checksum;
@@ -65,6 +66,7 @@ import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.security.token.Token;
import com.google.common.base.Preconditions;
@@ -719,6 +721,21 @@ public final class ContainerTestHelper {
return String.format("%1$" + length + "s", string);
}
+ /**
+ * Collects the container ID of every location reported by the
+ * KeyOutputStream behind {@code outputStream} and hands them to
+ * {@link #waitForContainerClose(MiniOzoneCluster, Long...)}.
+ *
+ * @param outputStream stream whose wrapped stream is cast to KeyOutputStream
+ * @param cluster cluster passed through to the vararg overload
+ * @throws Exception whatever the vararg overload throws; an assertion fails
+ * first if no container ID was collected
+ */
+ public static void waitForContainerClose(OzoneOutputStream outputStream,
+ MiniOzoneCluster cluster) throws Exception {
+ final KeyOutputStream keyStream =
+ (KeyOutputStream) outputStream.getOutputStream();
+ final List<Long> ids = new ArrayList<>();
+ for (OmKeyLocationInfo location : keyStream.getLocationInfoList()) {
+ ids.add(location.getContainerID());
+ }
+ // there is nothing to wait for when the key has no locations yet
+ Assert.assertFalse(ids.isEmpty());
+ final Long[] idArray = ids.toArray(new Long[0]);
+ waitForContainerClose(cluster, idArray);
+ }
+
public static void waitForContainerClose(MiniOzoneCluster cluster,
Long... containerIdList)
throws ContainerNotFoundException, PipelineNotFoundException,
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
index bb4e676..14db90d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
@@ -134,7 +134,7 @@ public class TestBlockDeletion {
String keyName = UUID.randomUUID().toString();
OzoneOutputStream out = bucket.createKey(keyName, value.getBytes().length,
- ReplicationType.STAND_ALONE, ReplicationFactor.ONE, new HashMap<>());
+ ReplicationType.RATIS, ReplicationFactor.ONE, new HashMap<>());
for (int i = 0; i < 100; i++) {
out.write(value.getBytes());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org