You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sz...@apache.org on 2022/03/24 12:41:32 UTC
[ozone] 11/31: HDDS-5674.[Ozone-Streaming] Handle client retries on exception (#2701)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git
commit ea34f61832b9aca0cd666aed8be8552489665d4f
Author: Sadanand Shenoy <sa...@gmail.com>
AuthorDate: Thu Oct 21 12:57:30 2021 +0530
HDDS-5674.[Ozone-Streaming] Handle client retries on exception (#2701)
---
.../hdds/scm/storage/BlockDataStreamOutput.java | 62 ++++++++++++---
.../hadoop/hdds/scm/storage/StreamBuffer.java | 46 +++++++++++
.../hdds/scm/storage/StreamCommitWatcher.java | 93 ++++++++++++++++++----
.../client/io/BlockDataStreamOutputEntry.java | 33 +++++++-
.../client/io/BlockDataStreamOutputEntryPool.java | 14 +++-
.../ozone/client/io/KeyDataStreamOutput.java | 12 ++-
.../client/rpc/TestBlockDataStreamOutput.java | 30 +++++++
.../apache/hadoop/ozone/container/TestHelper.java | 20 +++++
8 files changed, 279 insertions(+), 31 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
index 2ae0ba7..aada48e 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -92,6 +92,11 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
private int chunkIndex;
private final AtomicLong chunkOffset = new AtomicLong();
+
+ // Similar to 'BufferPool' but this list maintains only references
+ // to the ByteBuffers.
+ private List<StreamBuffer> bufferList;
+
// The IOException will be set by response handling thread in case there is an
// exception received in the response. If the exception is set, the next
// request will fail upfront.
@@ -133,7 +138,8 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
XceiverClientFactory xceiverClientManager,
Pipeline pipeline,
OzoneClientConfig config,
- Token<? extends TokenIdentifier> token
+ Token<? extends TokenIdentifier> token,
+ List<StreamBuffer> bufferList
) throws IOException {
this.xceiverClientFactory = xceiverClientManager;
this.config = config;
@@ -148,7 +154,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
// Alternatively, stream setup can be delayed till the first chunk write.
this.out = setupStream(pipeline);
this.token = token;
-
+ this.bufferList = bufferList;
flushPeriod = (int) (config.getStreamBufferFlushSize() / config
.getStreamBufferSize());
@@ -159,7 +165,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
// A single thread executor handle the responses of async requests
responseExecutor = Executors.newSingleThreadExecutor();
- commitWatcher = new StreamCommitWatcher(xceiverClient);
+ commitWatcher = new StreamCommitWatcher(xceiverClient, bufferList);
totalDataFlushedLength = 0;
writtenDataLength = 0;
failedServers = new ArrayList<>(0);
@@ -251,8 +257,11 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
if (len == 0) {
return;
}
- writeChunkToContainer(
- (ByteBuffer) b.asReadOnlyBuffer().position(off).limit(off + len));
+
+ final StreamBuffer buf = new StreamBuffer(b, off, len);
+ bufferList.add(buf);
+
+ writeChunkToContainer(buf.duplicate());
writtenDataLength += len;
}
@@ -261,6 +270,10 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
totalDataFlushedLength = writtenDataLength;
}
+ @VisibleForTesting
+ public long getTotalDataFlushedLength() {
+ return totalDataFlushedLength;
+ }
/**
* Will be called on the retryPath in case closedContainerException/
* TimeoutException.
@@ -268,8 +281,27 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
* @throws IOException if error occurred
*/
- // TODO: We need add new retry policy without depend on bufferPool.
public void writeOnRetry(long len) throws IOException {
+ if (len == 0) {
+ return;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Retrying write length {} for blockID {}", len, blockID);
+ }
+ int count = 0;
+ while (len > 0) {
+ final StreamBuffer buf = bufferList.get(count);
+ final long writeLen = Math.min(buf.length(), len);
+ final ByteBuffer duplicated = buf.duplicate();
+ if (writeLen != buf.length()) {
+ duplicated.limit(Math.toIntExact(len));
+ }
+ writeChunkToContainer(duplicated);
+ len -= writeLen;
+ count++;
+ writtenDataLength += writeLen;
+ }
+
}
@@ -314,6 +346,14 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
boolean force) throws IOException {
checkOpen();
long flushPos = totalDataFlushedLength;
+ final List<StreamBuffer> byteBufferList;
+ if (!force) {
+ Preconditions.checkNotNull(bufferList);
+ byteBufferList = bufferList;
+ Preconditions.checkNotNull(byteBufferList);
+ } else {
+ byteBufferList = null;
+ }
flush();
if (close) {
dataStreamCloseReply = out.closeAsync();
@@ -344,12 +384,12 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Adding index " + asyncReply.getLogIndex() + " commitMap size "
- + commitWatcher.getCommitInfoSetSize() + " flushLength "
+ + commitWatcher.getCommitInfoMapSize() + " flushLength "
+ flushPos + " blockID " + blockID);
}
// for standalone protocol, logIndex will always be 0.
- commitWatcher.updateCommitInfoSet(
- asyncReply.getLogIndex());
+ commitWatcher
+ .updateCommitInfoMap(asyncReply.getLogIndex(), byteBufferList);
}
return e;
}, responseExecutor).exceptionally(e -> {
@@ -589,4 +629,8 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
setIoException(ex);
throw getIoException();
}
+
+ public long getTotalAckDataLength() {
+ return commitWatcher.getTotalAckDataLength();
+ }
}
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java
new file mode 100644
index 0000000..f36019e
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Used for streaming write.
+ */
+public class StreamBuffer {
+ private final ByteBuffer buffer;
+
+ public StreamBuffer(ByteBuffer buffer) {
+ this.buffer = buffer.asReadOnlyBuffer();
+ }
+
+ public StreamBuffer(ByteBuffer buffer, int offset, int length) {
+ this((ByteBuffer) buffer.asReadOnlyBuffer().position(offset)
+ .limit(offset + length));
+ }
+
+ public ByteBuffer duplicate() {
+ return buffer.duplicate();
+ }
+
+ public int length() {
+ return buffer.limit() - buffer.position();
+ }
+
+}
\ No newline at end of file
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
index c187ffe..3a59d07 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
@@ -24,6 +24,7 @@
*/
package org.apache.hadoop.hdds.scm.storage;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
@@ -31,13 +32,16 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.Set;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
/**
* This class executes watchForCommit on ratis pipeline and releases
@@ -48,7 +52,12 @@ public class StreamCommitWatcher {
private static final Logger LOG =
LoggerFactory.getLogger(StreamCommitWatcher.class);
- private Set<Long> commitIndexSet;
+ private Map<Long, List<StreamBuffer>> commitIndexMap;
+ private List<StreamBuffer> bufferList;
+
+ // total data which has been successfully flushed and acknowledged
+ // by all servers
+ private long totalAckDataLength;
// future Map to hold up all putBlock futures
private ConcurrentHashMap<Long,
@@ -57,18 +66,22 @@ public class StreamCommitWatcher {
private XceiverClientSpi xceiverClient;
- public StreamCommitWatcher(XceiverClientSpi xceiverClient) {
+ public StreamCommitWatcher(XceiverClientSpi xceiverClient,
+ List<StreamBuffer> bufferList) {
this.xceiverClient = xceiverClient;
- commitIndexSet = new ConcurrentSkipListSet();
+ commitIndexMap = new ConcurrentSkipListMap<>();
futureMap = new ConcurrentHashMap<>();
+ this.bufferList = bufferList;
+ totalAckDataLength = 0;
}
- public void updateCommitInfoSet(long index) {
- commitIndexSet.add(index);
+ public void updateCommitInfoMap(long index, List<StreamBuffer> buffers) {
+ commitIndexMap.computeIfAbsent(index, k -> new LinkedList<>())
+ .addAll(buffers);
}
- int getCommitInfoSetSize() {
- return commitIndexSet.size();
+ int getCommitInfoMapSize() {
+ return commitIndexMap.size();
}
/**
@@ -78,12 +91,12 @@ public class StreamCommitWatcher {
* @throws IOException in case watchForCommit fails
*/
public XceiverClientReply streamWatchOnFirstIndex() throws IOException {
- if (!commitIndexSet.isEmpty()) {
+ if (!commitIndexMap.isEmpty()) {
// wait for the first commit index in the commitIndex2flushedDataMap
// to get committed to all or majority of nodes in case timeout
// happens.
long index =
- commitIndexSet.stream().mapToLong(v -> v).min()
+ commitIndexMap.keySet().stream().mapToLong(v -> v).min()
.getAsLong();
if (LOG.isDebugEnabled()) {
LOG.debug("waiting for first index {} to catch up", index);
@@ -102,12 +115,12 @@ public class StreamCommitWatcher {
*/
public XceiverClientReply streamWatchOnLastIndex()
throws IOException {
- if (!commitIndexSet.isEmpty()) {
+ if (!commitIndexMap.isEmpty()) {
// wait for the commit index in the commitIndex2flushedDataMap
// to get committed to all or majority of nodes in case timeout
// happens.
long index =
- commitIndexSet.stream().mapToLong(v -> v).max()
+ commitIndexMap.keySet().stream().mapToLong(v -> v).max()
.getAsLong();
if (LOG.isDebugEnabled()) {
LOG.debug("waiting for last flush Index {} to catch up", index);
@@ -127,9 +140,16 @@ public class StreamCommitWatcher {
*/
public XceiverClientReply streamWatchForCommit(long commitIndex)
throws IOException {
+ final long index;
try {
XceiverClientReply reply =
xceiverClient.watchForCommit(commitIndex);
+ if (reply == null) {
+ index = 0;
+ } else {
+ index = reply.getLogIndex();
+ }
+ adjustBuffers(index);
return reply;
} catch (InterruptedException e) {
// Re-interrupt the thread while catching InterruptedException
@@ -140,11 +160,52 @@ public class StreamCommitWatcher {
}
}
+ void releaseBuffersOnException() {
+ adjustBuffers(xceiverClient.getReplicatedMinCommitIndex());
+ }
+
+ private void adjustBuffers(long commitIndex) {
+ List<Long> keyList = commitIndexMap.keySet().stream()
+ .filter(p -> p <= commitIndex).collect(Collectors.toList());
+ if (!keyList.isEmpty()) {
+ releaseBuffers(keyList);
+ }
+ }
+
+ private long releaseBuffers(List<Long> indexes) {
+ Preconditions.checkArgument(!commitIndexMap.isEmpty());
+ for (long index : indexes) {
+ Preconditions.checkState(commitIndexMap.containsKey(index));
+ final List<StreamBuffer> buffers = commitIndexMap.remove(index);
+ final long length =
+ buffers.stream().mapToLong(StreamBuffer::length).sum();
+ totalAckDataLength += length;
+ // clear the future object from the future Map
+ final CompletableFuture<ContainerCommandResponseProto> remove =
+ futureMap.remove(totalAckDataLength);
+ if (remove == null) {
+ LOG.error("Couldn't find required future for " + totalAckDataLength);
+ for (Long key : futureMap.keySet()) {
+ LOG.error("Existing acknowledged data: " + key);
+ }
+ }
+ for (StreamBuffer byteBuffer : buffers) {
+ bufferList.remove(byteBuffer);
+ }
+ }
+ return totalAckDataLength;
+ }
+
+ public long getTotalAckDataLength() {
+ return totalAckDataLength;
+ }
+
private IOException getIOExceptionForWatchForCommit(long commitIndex,
Exception e) {
LOG.warn("watchForCommit failed for index {}", commitIndex, e);
IOException ioException = new IOException(
"Unexpected Storage Container Exception: " + e.toString(), e);
+ releaseBuffersOnException();
return ioException;
}
@@ -155,12 +216,12 @@ public class StreamCommitWatcher {
}
public void cleanup() {
- if (commitIndexSet != null) {
- commitIndexSet.clear();
+ if (commitIndexMap != null) {
+ commitIndexMap.clear();
}
if (futureMap != null) {
futureMap.clear();
}
- commitIndexSet = null;
+ commitIndexMap = null;
}
}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java
index f0c3a43..2cd5630 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput;
import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
+import org.apache.hadoop.hdds.scm.storage.StreamBuffer;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;
@@ -32,6 +33,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
+import java.util.List;
/**
* Helper class used inside {@link BlockDataStreamOutput}.
@@ -50,6 +52,7 @@ public final class BlockDataStreamOutputEntry
// the current position of this stream 0 <= currentPosition < length
private long currentPosition;
private final Token<OzoneBlockTokenIdentifier> token;
+ private List<StreamBuffer> bufferList;
@SuppressWarnings({"parameternumber", "squid:S00107"})
private BlockDataStreamOutputEntry(
@@ -58,7 +61,8 @@ public final class BlockDataStreamOutputEntry
Pipeline pipeline,
long length,
Token<OzoneBlockTokenIdentifier> token,
- OzoneClientConfig config
+ OzoneClientConfig config,
+ List<StreamBuffer> bufferList
) {
this.config = config;
this.byteBufferStreamOutput = null;
@@ -69,6 +73,7 @@ public final class BlockDataStreamOutputEntry
this.token = token;
this.length = length;
this.currentPosition = 0;
+ this.bufferList = bufferList;
}
long getLength() {
@@ -92,8 +97,8 @@ public final class BlockDataStreamOutputEntry
private void checkStream() throws IOException {
if (this.byteBufferStreamOutput == null) {
this.byteBufferStreamOutput =
- new BlockDataStreamOutput(blockID, xceiverClientManager,
- pipeline, config, token);
+ new BlockDataStreamOutput(blockID, xceiverClientManager, pipeline,
+ config, token, bufferList);
}
}
@@ -151,6 +156,20 @@ public final class BlockDataStreamOutputEntry
}
}
+ long getTotalAckDataLength() {
+ if (byteBufferStreamOutput != null) {
+ BlockDataStreamOutput out =
+ (BlockDataStreamOutput) this.byteBufferStreamOutput;
+ blockID = out.getBlockID();
+ return out.getTotalAckDataLength();
+ } else {
+ // For a pre allocated block for which no write has been initiated,
+ // the OutputStream will be null here.
+ // In such cases, the default blockCommitSequenceId will be 0
+ return 0;
+ }
+ }
+
void cleanup(boolean invalidateClient) throws IOException {
checkStream();
BlockDataStreamOutput out =
@@ -180,6 +199,7 @@ public final class BlockDataStreamOutputEntry
private long length;
private Token<OzoneBlockTokenIdentifier> token;
private OzoneClientConfig config;
+ private List<StreamBuffer> bufferList;
public Builder setBlockID(BlockID bID) {
this.blockID = bID;
@@ -219,13 +239,18 @@ public final class BlockDataStreamOutputEntry
return this;
}
+ public Builder setBufferList(List<StreamBuffer> bList) {
+ this.bufferList = bList;
+ return this;
+ }
+
public BlockDataStreamOutputEntry build() {
return new BlockDataStreamOutputEntry(blockID,
key,
xceiverClientManager,
pipeline,
length,
- token, config);
+ token, config, bufferList);
}
}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
index 4bc55de..e49b0b7 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.StreamBuffer;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
@@ -59,6 +60,7 @@ public class BlockDataStreamOutputEntryPool {
private OmMultipartCommitUploadPartInfo commitUploadPartInfo;
private final long openID;
private final ExcludeList excludeList;
+ private List<StreamBuffer> bufferList;
@SuppressWarnings({"parameternumber", "squid:S00107"})
public BlockDataStreamOutputEntryPool(
@@ -83,6 +85,7 @@ public class BlockDataStreamOutputEntryPool {
this.requestID = requestId;
this.openID = openID;
this.excludeList = new ExcludeList();
+ this.bufferList = new ArrayList<>();
}
/**
@@ -142,7 +145,8 @@ public class BlockDataStreamOutputEntryPool {
.setPipeline(subKeyInfo.getPipeline())
.setConfig(config)
.setLength(subKeyInfo.getLength())
- .setToken(subKeyInfo.getToken());
+ .setToken(subKeyInfo.getToken())
+ .setBufferList(bufferList);
streamEntries.add(builder.build());
}
@@ -301,4 +305,12 @@ public class BlockDataStreamOutputEntryPool {
boolean isEmpty() {
return streamEntries.isEmpty();
}
+
+ long computeBufferData() {
+ long totalDataLen =0;
+ for (StreamBuffer b : bufferList){
+ totalDataLen += b.length();
+ }
+ return totalDataLen;
+ }
}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
index 9bba89d..2540e42 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
@@ -278,11 +278,14 @@ public class KeyDataStreamOutput implements ByteBufferStreamOutput {
}
Pipeline pipeline = streamEntry.getPipeline();
PipelineID pipelineId = pipeline.getId();
-
+ long totalSuccessfulFlushedData = streamEntry.getTotalAckDataLength();
+ //set the correct length for the current stream
+ streamEntry.setCurrentPosition(totalSuccessfulFlushedData);
long containerId = streamEntry.getBlockID().getContainerID();
Collection<DatanodeDetails> failedServers = streamEntry.getFailedServers();
Preconditions.checkNotNull(failedServers);
ExcludeList excludeList = blockDataStreamOutputEntryPool.getExcludeList();
+ long bufferedDataLen = blockDataStreamOutputEntryPool.computeBufferData();
if (!failedServers.isEmpty()) {
excludeList.addDatanodes(failedServers);
}
@@ -316,6 +319,13 @@ public class KeyDataStreamOutput implements ByteBufferStreamOutput {
blockDataStreamOutputEntryPool
.discardPreallocatedBlocks(-1, pipelineId);
}
+ if (bufferedDataLen > 0) {
+ // If the data is still cached in the underlying stream, we need to
+ // allocate new block and write this data in the datanode.
+ handleRetry(exception, bufferedDataLen);
+ // reset the retryCount after handling the exception
+ retryCount = 0;
+ }
}
private void markStreamClosed() {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
index d3b2d22..05a1019 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
@@ -21,15 +21,19 @@ import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput;
+import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.KeyDataStreamOutput;
import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.TestHelper;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
@@ -127,21 +131,25 @@ public class TestBlockDataStreamOutput {
@Test
public void testHalfChunkWrite() throws Exception {
testWrite(chunkSize / 2);
+ testWriteWithFailure(chunkSize/2);
}
@Test
public void testSingleChunkWrite() throws Exception {
testWrite(chunkSize);
+ testWriteWithFailure(chunkSize);
}
@Test
public void testMultiChunkWrite() throws Exception {
testWrite(chunkSize + 50);
+ testWriteWithFailure(chunkSize + 50);
}
@Test
public void testMultiBlockWrite() throws Exception {
testWrite(blockSize + 50);
+ testWriteWithFailure(blockSize + 50);
}
private void testWrite(int dataLength) throws Exception {
@@ -156,6 +164,28 @@ public class TestBlockDataStreamOutput {
key.close();
validateData(keyName, data);
}
+
+ private void testWriteWithFailure(int dataLength) throws Exception {
+ String keyName = getKeyName();
+ OzoneDataStreamOutput key = createKey(
+ keyName, ReplicationType.RATIS, 0);
+ byte[] data =
+ ContainerTestHelper.getFixedLengthString(keyString, dataLength)
+ .getBytes(UTF_8);
+ ByteBuffer b = ByteBuffer.wrap(data);
+ key.write(b);
+ KeyDataStreamOutput keyDataStreamOutput =
+ (KeyDataStreamOutput) key.getByteBufStreamOutput();
+ ByteBufferStreamOutput stream =
+ keyDataStreamOutput.getStreamEntries().get(0).getByteBufStreamOutput();
+ Assert.assertTrue(stream instanceof BlockDataStreamOutput);
+ TestHelper.waitForContainerClose(key, cluster);
+ key.write(b);
+ key.close();
+ String dataString = new String(data, UTF_8);
+ validateData(keyName, dataString.concat(dataString).getBytes(UTF_8));
+ }
+
private OzoneDataStreamOutput createKey(String keyName, ReplicationType type,
long size) throws Exception {
return TestHelper.createStreamKey(
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
index 2beaf02..2808525 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
@@ -46,7 +46,9 @@ import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.io.BlockDataStreamOutputEntry;
import org.apache.hadoop.ozone.client.io.BlockOutputStreamEntry;
+import org.apache.hadoop.ozone.client.io.KeyDataStreamOutput;
import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
@@ -198,6 +200,24 @@ public final class TestHelper {
waitForContainerClose(cluster, containerIdList.toArray(new Long[0]));
}
+
+ public static void waitForContainerClose(OzoneDataStreamOutput outputStream,
+ MiniOzoneCluster cluster) throws Exception {
+ KeyDataStreamOutput keyOutputStream =
+ (KeyDataStreamOutput) outputStream.getByteBufStreamOutput();
+ List<BlockDataStreamOutputEntry> streamEntryList =
+ keyOutputStream.getStreamEntries();
+ List<Long> containerIdList = new ArrayList<>();
+ for (BlockDataStreamOutputEntry entry : streamEntryList) {
+ long id = entry.getBlockID().getContainerID();
+ if (!containerIdList.contains(id)) {
+ containerIdList.add(id);
+ }
+ }
+ Assert.assertTrue(!containerIdList.isEmpty());
+ waitForContainerClose(cluster, containerIdList.toArray(new Long[0]));
+ }
+
public static void waitForPipelineClose(OzoneOutputStream outputStream,
MiniOzoneCluster cluster, boolean waitForContainerCreation)
throws Exception {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org