Posted to commits@ozone.apache.org by sz...@apache.org on 2022/06/09 03:05:37 UTC
[ozone] 20/36: HDDS-6130. [Ozone-Streaming] releaseBuffers may log “Couldn't find the required future” (#2939)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git
commit a794067581b222341ad9bf917683eefd2bfbc71a
Author: micah zhao <mi...@tencent.com>
AuthorDate: Fri Dec 24 01:18:48 2021 +0800
HDDS-6130. [Ozone-Streaming] releaseBuffers may log “Couldn't find the required future” (#2939)
---
.../hdds/scm/storage/BlockDataStreamOutput.java | 92 ++++++++++------------
.../hdds/scm/storage/StreamCommitWatcher.java | 28 -------
2 files changed, 42 insertions(+), 78 deletions(-)
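
The patch drops the per-offset future map from StreamCommitWatcher and instead accumulates every putBlock response into a single CompletableFuture chain held in an AtomicReference on BlockDataStreamOutput: each new response is folded in with thenCombine, and the flush/close path waits on that one accumulated future. What follows is a minimal, self-contained Java sketch of the chaining pattern only; the names PutBlockChain, sendPutBlockAsync and waitForAll are illustrative stand-ins, not the Ozone API shown in the diff below.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;

public class PutBlockChain {

  // Starts out already completed so the first thenCombine has something to chain onto.
  private final AtomicReference<CompletableFuture<String>> putBlockFuture =
      new AtomicReference<>(CompletableFuture.completedFuture(null));

  // Stand-in for the asynchronous putBlock call; yields one response per invocation.
  private CompletableFuture<String> sendPutBlockAsync(int index) {
    return CompletableFuture.supplyAsync(() -> "response-" + index);
  }

  public void executePutBlock(int index) {
    CompletableFuture<String> flushFuture = sendPutBlockAsync(index);
    // Fold the new response into everything submitted so far; the accumulated
    // future completes only after the previous chain and this response both do.
    putBlockFuture.updateAndGet(
        previous -> previous.thenCombine(flushFuture, (prev, current) -> current));
  }

  // Flush/close path: wait on the single accumulated future instead of
  // collecting per-offset futures out of a map.
  public String waitForAll() throws InterruptedException, ExecutionException {
    return putBlockFuture.get().get();
  }

  public static void main(String[] args) throws Exception {
    PutBlockChain chain = new PutBlockChain();
    for (int i = 0; i < 3; i++) {
      chain.executePutBlock(i);
    }
    System.out.println("last response: " + chain.waitForAll());
  }
}

Because the chain is built as responses arrive, the flush path in the diff below reduces to putBlockFuture.get().get(), replacing the removed waitOnFlushFutures().
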
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
index 9fb1340527..84968b68d5 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
@@ -115,6 +116,9 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
// Also, corresponding to the logIndex, the corresponding list of buffers will
// be released from the buffer pool.
private final StreamCommitWatcher commitWatcher;
+ private final AtomicReference<CompletableFuture<
+ ContainerCommandResponseProto>> putBlockFuture
+ = new AtomicReference<>(CompletableFuture.completedFuture(null));
private final List<DatanodeDetails> failedServers;
private final Checksum checksum;
@@ -381,8 +385,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
* @param force true if no data was written since most recent putBlock and
* stream is being closed
*/
- private CompletableFuture<ContainerProtos.
- ContainerCommandResponseProto> executePutBlock(boolean close,
+ private void executePutBlock(boolean close,
boolean force) throws IOException {
checkOpen();
long flushPos = totalDataFlushedLength;
@@ -399,56 +402,54 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
dataStreamCloseReply = out.closeAsync();
}
- CompletableFuture<ContainerProtos.
- ContainerCommandResponseProto> flushFuture = null;
try {
BlockData blockData = containerBlockData.build();
XceiverClientReply asyncReply =
putBlockAsync(xceiverClient, blockData, close, token);
- CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
- asyncReply.getResponse();
- flushFuture = future.thenApplyAsync(e -> {
- try {
- validateResponse(e);
- } catch (IOException sce) {
- throw new CompletionException(sce);
- }
- // if the ioException is not set, putBlock is successful
- if (getIoException() == null && !force) {
- BlockID responseBlockID = BlockID.getFromProtobuf(
- e.getPutBlock().getCommittedBlockLength().getBlockID());
- Preconditions.checkState(blockID.get().getContainerBlockID()
- .equals(responseBlockID.getContainerBlockID()));
- // updates the bcsId of the block
- blockID.set(responseBlockID);
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Adding index " + asyncReply.getLogIndex() + " commitMap size "
+ final CompletableFuture<ContainerCommandResponseProto> flushFuture
+ = asyncReply.getResponse().thenApplyAsync(e -> {
+ try {
+ validateResponse(e);
+ } catch (IOException sce) {
+ throw new CompletionException(sce);
+ }
+ // if the ioException is not set, putBlock is successful
+ if (getIoException() == null && !force) {
+ BlockID responseBlockID = BlockID.getFromProtobuf(
+ e.getPutBlock().getCommittedBlockLength().getBlockID());
+ Preconditions.checkState(blockID.get().getContainerBlockID()
+ .equals(responseBlockID.getContainerBlockID()));
+ // updates the bcsId of the block
+ blockID.set(responseBlockID);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding index " + asyncReply.getLogIndex() +
+ " commitMap size "
+ commitWatcher.getCommitInfoMapSize() + " flushLength "
+ flushPos + " blockID " + blockID);
- }
- // for standalone protocol, logIndex will always be 0.
- commitWatcher
- .updateCommitInfoMap(asyncReply.getLogIndex(), byteBufferList);
- }
- return e;
- }, responseExecutor).exceptionally(e -> {
- if (LOG.isDebugEnabled()) {
- LOG.debug("putBlock failed for blockID {} with exception {}",
- blockID, e.getLocalizedMessage());
- }
- CompletionException ce = new CompletionException(e);
- setIoException(ce);
- throw ce;
- });
+ }
+ // for standalone protocol, logIndex will always be 0.
+ commitWatcher
+ .updateCommitInfoMap(asyncReply.getLogIndex(),
+ byteBufferList);
+ }
+ return e;
+ }, responseExecutor).exceptionally(e -> {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("putBlock failed for blockID {} with exception {}",
+ blockID, e.getLocalizedMessage());
+ }
+ CompletionException ce = new CompletionException(e);
+ setIoException(ce);
+ throw ce;
+ });
+ putBlockFuture.updateAndGet(f -> f.thenCombine(flushFuture,
+ (previous, current) -> current));
} catch (IOException | ExecutionException e) {
throw new IOException(EXCEPTION_MSG + e.toString(), e);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
handleInterruptedException(ex, false);
}
- commitWatcher.getFutureMap().put(flushPos, flushFuture);
- return flushFuture;
}
@Override
@@ -484,7 +485,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
// data since latest flush - we need to send the "EOF" flag
executePutBlock(true, true);
}
- waitOnFlushFutures();
+ putBlockFuture.get().get();
watchForCommit(false);
// just check again if the exception is hit while waiting for the
// futures to ensure flush has indeed succeeded
@@ -512,15 +513,6 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
}
}
- private void waitOnFlushFutures()
- throws InterruptedException, ExecutionException {
- CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
- commitWatcher.getFutureMap().values().toArray(
- new CompletableFuture[commitWatcher.getFutureMap().size()]));
- // wait for all the transactions to complete
- combinedFuture.get();
- }
-
private void validateResponse(
ContainerProtos.ContainerCommandResponseProto responseProto)
throws IOException {
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
index 9ae604e951..1820416d32 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
@@ -25,7 +25,6 @@
package org.apache.hadoop.hdds.scm.storage;
import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.slf4j.Logger;
@@ -35,9 +34,6 @@ import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
@@ -59,18 +55,12 @@ public class StreamCommitWatcher {
// by all servers
private long totalAckDataLength;
- // future Map to hold up all putBlock futures
- private ConcurrentHashMap<Long,
- CompletableFuture<ContainerCommandResponseProto>>
- futureMap;
-
private XceiverClientSpi xceiverClient;
public StreamCommitWatcher(XceiverClientSpi xceiverClient,
List<StreamBuffer> bufferList) {
this.xceiverClient = xceiverClient;
commitIndexMap = new ConcurrentSkipListMap<>();
- futureMap = new ConcurrentHashMap<>();
this.bufferList = bufferList;
totalAckDataLength = 0;
}
@@ -180,15 +170,6 @@ public class StreamCommitWatcher {
final long length =
buffers.stream().mapToLong(StreamBuffer::position).sum();
totalAckDataLength += length;
- // clear the future object from the future Map
- final CompletableFuture<ContainerCommandResponseProto> remove =
- futureMap.remove(totalAckDataLength);
- if (remove == null) {
- LOG.error("Couldn't find required future for " + totalAckDataLength);
- for (Long key : futureMap.keySet()) {
- LOG.error("Existing acknowledged data: " + key);
- }
- }
for (StreamBuffer byteBuffer : buffers) {
bufferList.remove(byteBuffer);
}
@@ -209,19 +190,10 @@ public class StreamCommitWatcher {
return ioException;
}
- public ConcurrentMap<Long,
- CompletableFuture<
- ContainerCommandResponseProto>> getFutureMap() {
- return futureMap;
- }
-
public void cleanup() {
if (commitIndexMap != null) {
commitIndexMap.clear();
}
- if (futureMap != null) {
- futureMap.clear();
- }
commitIndexMap = null;
}
}
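
The StreamCommitWatcher half of the patch removes the lookup that produced the message in the title: futures were stored under the flush position at putBlock time but removed using the running totalAckDataLength when buffers were released, and whenever those two values did not coincide, remove() returned null and "Couldn't find required future" was logged. Below is a small, purely illustrative Java sketch of that failure mode; the byte counts are made up, and in the real code the keys come from totalDataFlushedLength and the sizes of the acknowledged buffers.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class FutureMapMismatch {
  public static void main(String[] args) {
    ConcurrentMap<Long, CompletableFuture<Void>> futureMap = new ConcurrentHashMap<>();

    // Two putBlock futures recorded under their flush positions (bytes flushed so far).
    futureMap.put(4L * 1024 * 1024, CompletableFuture.completedFuture(null));
    futureMap.put(8L * 1024 * 1024, CompletableFuture.completedFuture(null));

    // The release path sums the positions of the acknowledged buffers; if that
    // running total does not land exactly on a recorded flush position, the
    // lookup misses and the old code logged an error.
    long totalAckDataLength = 6L * 1024 * 1024;
    CompletableFuture<Void> removed = futureMap.remove(totalAckDataLength);
    if (removed == null) {
      System.out.println("Couldn't find required future for " + totalAckDataLength);
    }
  }
}

With the single chained putBlockFuture there is no keyed lookup left to miss, which is why both the error path and the futureMap itself can be deleted.
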
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org