You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sz...@apache.org on 2022/06/09 03:05:23 UTC
[ozone] 06/36: HDDS-5599. [Ozone-Streaming]drop BufferPool and ChunkBuffer to avoid buffer copying (#2557)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git
commit 948e3455dbc8e4c736f8ba650d44c51f79659753
Author: micah zhao <mi...@tencent.com>
AuthorDate: Wed Aug 25 23:39:10 2021 +0800
HDDS-5599. [Ozone-Streaming]drop BufferPool and ChunkBuffer to avoid buffer copying (#2557)
---
.../hdds/scm/storage/BlockDataStreamOutput.java | 290 +++------------------
.../hdds/scm/storage/StreamCommitWatcher.java | 166 ++++++++++++
.../client/io/BlockDataStreamOutputEntry.java | 33 +--
.../client/io/BlockDataStreamOutputEntryPool.java | 20 --
.../ozone/client/io/KeyDataStreamOutput.java | 29 +--
5 files changed, 211 insertions(+), 327 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
index f658df1af9..39ec2f9219 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -36,21 +36,18 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerExcep
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChecksumData;
-import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.apache.hadoop.ozone.common.OzoneChecksumException;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.ratis.client.api.DataStreamOutput;
import org.apache.ratis.io.StandardWriteOption;
import org.apache.ratis.protocol.DataStreamReply;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
@@ -93,7 +90,6 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
private int chunkIndex;
private final AtomicLong chunkOffset = new AtomicLong();
- private final BufferPool bufferPool;
// The IOException will be set by response handling thread in case there is an
// exception received in the response. If the exception is set, the next
// request will fail upfront.
@@ -106,28 +102,16 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
// effective data write attempted so far for the block
private long writtenDataLength;
- // List containing buffers for which the putBlock call will
- // update the length in the datanodes. This list will just maintain
- // references to the buffers in the BufferPool which will be cleared
- // when the watchForCommit acknowledges a putBlock logIndex has been
- // committed on all datanodes. This list will be a place holder for buffers
- // which got written between successive putBlock calls.
- private List<ChunkBuffer> bufferList;
-
// This object will maintain the commitIndexes and byteBufferList in order
// Also, corresponding to the logIndex, the corresponding list of buffers will
// be released from the buffer pool.
- private final CommitWatcher commitWatcher;
+ private final StreamCommitWatcher commitWatcher;
private final List<DatanodeDetails> failedServers;
private final Checksum checksum;
//number of buffers used before doing a flush/putBlock.
private int flushPeriod;
- //bytes remaining to write in the current buffer.
- private int currentBufferRemaining;
- //current buffer allocated to write
- private ChunkBuffer currentBuffer;
private final Token<? extends TokenIdentifier> token;
private final DataStreamOutput out;
private CompletableFuture<DataStreamReply> dataStreamCloseReply;
@@ -141,13 +125,11 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
* @param blockID block ID
* @param xceiverClientManager client manager that controls client
* @param pipeline pipeline where block will be written
- * @param bufferPool pool of buffers
*/
public BlockDataStreamOutput(
BlockID blockID,
XceiverClientFactory xceiverClientManager,
Pipeline pipeline,
- BufferPool bufferPool,
OzoneClientConfig config,
Token<? extends TokenIdentifier> token
) throws IOException {
@@ -163,11 +145,8 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
(XceiverClientRatis)xceiverClientManager.acquireClient(pipeline);
// Alternatively, stream setup can be delayed till the first chunk write.
this.out = setupStream();
- this.bufferPool = bufferPool;
this.token = token;
- //number of buffers used before doing a flush
- refreshCurrentBuffer(bufferPool);
flushPeriod = (int) (config.getStreamBufferFlushSize() / config
.getStreamBufferSize());
@@ -178,8 +157,7 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
// A single thread executor handle the responses of async requests
responseExecutor = Executors.newSingleThreadExecutor();
- commitWatcher = new CommitWatcher(bufferPool, xceiverClient);
- bufferList = null;
+ commitWatcher = new StreamCommitWatcher(xceiverClient);
totalDataFlushedLength = 0;
writtenDataLength = 0;
failedServers = new ArrayList<>(0);
@@ -209,20 +187,10 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
.stream(message.getContent().asReadOnlyByteBuffer());
}
- private void refreshCurrentBuffer(BufferPool pool) {
- currentBuffer = pool.getCurrentBuffer();
- currentBufferRemaining =
- currentBuffer != null ? currentBuffer.remaining() : 0;
- }
-
public BlockID getBlockID() {
return blockID.get();
}
- public long getTotalAckDataLength() {
- return commitWatcher.getTotalAckDataLength();
- }
-
public long getWrittenDataLength() {
return writtenDataLength;
}
@@ -236,82 +204,29 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
return xceiverClient;
}
- @VisibleForTesting
- public long getTotalDataFlushedLength() {
- return totalDataFlushedLength;
- }
-
- @VisibleForTesting
- public BufferPool getBufferPool() {
- return bufferPool;
- }
-
public IOException getIoException() {
return ioException.get();
}
- @VisibleForTesting
- public Map<Long, List<ChunkBuffer>> getCommitIndex2flushedDataMap() {
- return commitWatcher.getCommitIndex2flushedDataMap();
- }
-
@Override
- public void write(ByteBuf b) throws IOException {
+ public void write(ByteBuf buf) throws IOException {
checkOpen();
- if (b == null) {
+ if (buf == null) {
throw new NullPointerException();
}
- int off = b.readerIndex();
- int len = b.readableBytes();
-
- while (len > 0) {
- allocateNewBufferIfNeeded();
- final int writeLen = Math.min(currentBufferRemaining, len);
- // TODO: avoid buffer copy here
- currentBuffer.put(b.nioBuffer(off, writeLen));
- currentBufferRemaining -= writeLen;
- writeChunkIfNeeded();
- off += writeLen;
- len -= writeLen;
- writtenDataLength += writeLen;
- doFlushOrWatchIfNeeded();
- }
- }
-
- private void writeChunkIfNeeded() throws IOException {
- if (currentBufferRemaining == 0) {
- writeChunk(currentBuffer);
- }
- }
-
- private void doFlushOrWatchIfNeeded() throws IOException {
- if (currentBufferRemaining == 0) {
- if (bufferPool.getNumberOfUsedBuffers() % flushPeriod == 0) {
- updateFlushLength();
- executePutBlock(false, false);
- }
- // Data in the bufferPool can not exceed streamBufferMaxSize
- if (bufferPool.getNumberOfUsedBuffers() == bufferPool.getCapacity()) {
- handleFullBuffer();
- }
+ final int len = buf.readableBytes();
+ if (len == 0) {
+ return;
}
- }
+ writeChunkToContainer(buf);
- private void allocateNewBufferIfNeeded() {
- if (currentBufferRemaining == 0) {
- currentBuffer = bufferPool.allocateBuffer(config.getBufferIncrement());
- currentBufferRemaining = currentBuffer.remaining();
- }
+ writtenDataLength += len;
}
private void updateFlushLength() {
totalDataFlushedLength = writtenDataLength;
}
- private boolean isBufferPoolFull() {
- return bufferPool.computeBufferData() == config.getStreamBufferMaxSize();
- }
-
/**
* Will be called on the retryPath in case closedContainerException/
* TimeoutException.
@@ -319,70 +234,9 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
* @throws IOException if error occurred
*/
- // In this case, the data is already cached in the currentBuffer.
+  // TODO: add a new retry policy that does not depend on bufferPool.
public void writeOnRetry(long len) throws IOException {
- if (len == 0) {
- return;
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Retrying write length {} for blockID {}", len, blockID);
- }
- Preconditions.checkArgument(len <= config.getStreamBufferMaxSize());
- int count = 0;
- while (len > 0) {
- ChunkBuffer buffer = bufferPool.getBuffer(count);
- long writeLen = Math.min(buffer.position(), len);
- if (!buffer.hasRemaining()) {
- writeChunk(buffer);
- }
- len -= writeLen;
- count++;
- writtenDataLength += writeLen;
- // we should not call isBufferFull/shouldFlush here.
- // The buffer might already be full as whole data is already cached in
- // the buffer. We should just validate
- // if we wrote data of size streamBufferMaxSize/streamBufferFlushSize to
- // call for handling full buffer/flush buffer condition.
- if (writtenDataLength % config.getStreamBufferFlushSize() == 0) {
- // reset the position to zero as now we will be reading the
- // next buffer in the list
- updateFlushLength();
- executePutBlock(false, false);
- }
- if (writtenDataLength == config.getStreamBufferMaxSize()) {
- handleFullBuffer();
- }
- }
- }
- /**
- * This is a blocking call. It will wait for the flush till the commit index
- * at the head of the commitIndex2flushedDataMap gets replicated to all or
- * majority.
- * @throws IOException
- */
- private void handleFullBuffer() throws IOException {
- try {
- checkOpen();
- if (!commitWatcher.getFutureMap().isEmpty()) {
- waitOnFlushFutures();
- }
- } catch (ExecutionException e) {
- handleExecutionException(e);
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- handleInterruptedException(ex, true);
- }
- watchForCommit(true);
- }
-
-
- // It may happen that once the exception is encountered , we still might
- // have successfully flushed up to a certain index. Make sure the buffers
- // only contain data which have not been sufficiently replicated
- private void adjustBuffersOnException() {
- commitWatcher.releaseBuffersOnException();
- refreshCurrentBuffer(bufferPool);
}
/**
@@ -397,7 +251,8 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
checkOpen();
try {
XceiverClientReply reply = bufferFull ?
- commitWatcher.watchOnFirstIndex() : commitWatcher.watchOnLastIndex();
+ commitWatcher.streamWatchOnFirstIndex() :
+ commitWatcher.streamWatchOnLastIndex();
if (reply != null) {
List<DatanodeDetails> dnList = reply.getDatanodes();
if (!dnList.isEmpty()) {
@@ -412,7 +267,6 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
setIoException(ioe);
throw getIoException();
}
- refreshCurrentBuffer(bufferPool);
}
@@ -426,22 +280,7 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
boolean force) throws IOException {
checkOpen();
long flushPos = totalDataFlushedLength;
- final List<ChunkBuffer> byteBufferList;
- if (!force) {
- Preconditions.checkNotNull(bufferList);
- byteBufferList = bufferList;
- bufferList = null;
- Preconditions.checkNotNull(byteBufferList);
- } else {
- byteBufferList = null;
- }
-
- try {
- CompletableFuture.allOf(futures.toArray(EMPTY_FUTURE_ARRAY)).get();
- } catch (Exception e) {
- LOG.warn("Failed to write all chunks through stream: " + e);
- throw new IOException(e);
- }
+ flush();
if (close) {
dataStreamCloseReply = out.closeAsync();
}
@@ -471,15 +310,12 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Adding index " + asyncReply.getLogIndex() + " commitMap size "
- + commitWatcher.getCommitInfoMapSize() + " flushLength "
- + flushPos + " numBuffers " + byteBufferList.size()
- + " blockID " + blockID + " bufferPool size" + bufferPool
- .getSize() + " currentBufferIndex " + bufferPool
- .getCurrentBufferIndex());
+ + commitWatcher.getCommitInfoSetSize() + " flushLength "
+ + flushPos + " blockID " + blockID);
}
// for standalone protocol, logIndex will always be 0.
- commitWatcher
- .updateCommitInfoMap(asyncReply.getLogIndex(), byteBufferList);
+ commitWatcher.updateCommitInfoSet(
+ asyncReply.getLogIndex());
}
return e;
}, responseExecutor).exceptionally(e -> {
@@ -503,36 +339,12 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
@Override
public void flush() throws IOException {
- if (xceiverClientFactory != null && xceiverClient != null
- && bufferPool != null && bufferPool.getSize() > 0
- && (!config.isStreamBufferFlushDelay() ||
- writtenDataLength - totalDataFlushedLength
- >= config.getStreamBufferSize())) {
- try {
- handleFlush(false);
- } catch (ExecutionException e) {
- // just set the exception here as well in order to maintain sanctity of
- // ioException field
- handleExecutionException(e);
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- handleInterruptedException(ex, true);
- }
- }
- }
-
- private void writeChunk(ChunkBuffer buffer)
- throws IOException {
- // This data in the buffer will be pushed to datanode and a reference will
- // be added to the bufferList. Once putBlock gets executed, this list will
- // be marked null. Hence, during first writeChunk call after every putBlock
- // call or during the first call to writeChunk here, the list will be null.
-
- if (bufferList == null) {
- bufferList = new ArrayList<>();
+ try {
+ CompletableFuture.allOf(futures.toArray(EMPTY_FUTURE_ARRAY)).get();
+ } catch (Exception e) {
+ LOG.warn("Failed to write all chunks through stream: " + e);
+ throw new IOException(e);
}
- bufferList.add(buffer);
- writeChunkToContainer(buffer.duplicate(0, buffer.position()));
}
/**
@@ -543,11 +355,6 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
checkOpen();
// flush the last chunk data residing on the currentBuffer
if (totalDataFlushedLength < writtenDataLength) {
- refreshCurrentBuffer(bufferPool);
- Preconditions.checkArgument(currentBuffer.position() > 0);
- if (currentBuffer.hasRemaining()) {
- writeChunk(currentBuffer);
- }
// This can be a partially filled chunk. Since we are flushing the buffer
// here, we just limit this buffer to the current position. So that next
// write will happen in new buffer
@@ -570,8 +377,7 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
@Override
public void close() throws IOException {
- if (xceiverClientFactory != null && xceiverClient != null
- && bufferPool != null && bufferPool.getSize() > 0) {
+ if (xceiverClientFactory != null && xceiverClient != null) {
try {
handleFlush(true);
dataStreamCloseReply.get();
@@ -583,10 +389,6 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
} finally {
cleanup(false);
}
- // TODO: Turn the below buffer empty check on when Standalone pipeline
- // is removed in the write path in tests
- // Preconditions.checkArgument(buffer.position() == 0);
- // bufferPool.checkBufferPoolEmpty();
}
}
@@ -638,10 +440,6 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
xceiverClientFactory = null;
xceiverClient = null;
commitWatcher.cleanup();
- if (bufferList != null) {
- bufferList.clear();
- }
- bufferList = null;
responseExecutor.shutdown();
}
@@ -655,7 +453,6 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
if (isClosed()) {
throw new IOException("BlockDataStreamOutput has been closed.");
} else if (getIoException() != null) {
- adjustBuffersOnException();
throw getIoException();
}
}
@@ -683,12 +480,11 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
* @throws OzoneChecksumException if there is an error while computing
* checksum
*/
- private void writeChunkToContainer(ChunkBuffer chunk) throws IOException {
- int effectiveChunkSize = chunk.remaining();
+ private void writeChunkToContainer(ByteBuf buf)
+ throws IOException {
+ ChecksumData checksumData = checksum.computeChecksum(buf.nioBuffer());
+ int effectiveChunkSize = buf.readableBytes();
final long offset = chunkOffset.getAndAdd(effectiveChunkSize);
- final ByteString data = chunk.toByteString(
- bufferPool.byteStringConversion());
- ChecksumData checksumData = checksum.computeChecksum(chunk);
ChunkInfo chunkInfo = ChunkInfo.newBuilder()
.setChunkName(blockID.get().getLocalID() + "_chunk_" + ++chunkIndex)
.setOffset(offset)
@@ -703,21 +499,22 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
CompletableFuture<DataStreamReply> future =
(needSync(offset + effectiveChunkSize) ?
- out.writeAsync(data.asReadOnlyByteBuffer(), StandardWriteOption.SYNC) :
- out.writeAsync(data.asReadOnlyByteBuffer()))
- .whenCompleteAsync((r, e) -> {
- if (e != null || !r.isSuccess()) {
- if (e == null) {
- e = new IOException("result is not success");
- }
- String msg = "Failed to write chunk " + chunkInfo.getChunkName() +
- " " + "into block " + blockID;
- LOG.debug("{}, exception: {}", msg, e.getLocalizedMessage());
- CompletionException ce = new CompletionException(msg, e);
- setIoException(ce);
- throw ce;
- }
- }, responseExecutor);
+ out.writeAsync(buf.nioBuffer(), StandardWriteOption.SYNC) :
+ out.writeAsync(buf.nioBuffer()))
+ .whenCompleteAsync((r, e) -> {
+ if (e != null || !r.isSuccess()) {
+ if (e == null) {
+ e = new IOException("result is not success");
+ }
+ String msg =
+ "Failed to write chunk " + chunkInfo.getChunkName() +
+ " " + "into block " + blockID;
+ LOG.debug("{}, exception: {}", msg, e.getLocalizedMessage());
+ CompletionException ce = new CompletionException(msg, e);
+ setIoException(ce);
+ throw ce;
+ }
+ }, responseExecutor);
futures.add(future);
containerBlockData.addChunks(chunkInfo);
@@ -754,7 +551,6 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
*/
private void handleExecutionException(Exception ex) throws IOException {
setIoException(ex);
- adjustBuffersOnException();
throw getIoException();
}
}
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
new file mode 100644
index 0000000000..c187ffe902
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This class maintains the set of putBlock log indexes to be watched for
+ * successful replication in the datanodes of a given pipeline during an
+ * ozone streaming write. Unlike {@code CommitWatcher}, it holds no user data
+ * buffers, so nothing is released back to a {@code BufferPool}.
+ */
+package org.apache.hadoop.hdds.scm.storage;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.scm.XceiverClientReply;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * This class executes watchForCommit on ratis pipeline for streaming writes.
+ * It only tracks putBlock log indexes and does not hold or release buffers.
+ */
+public class StreamCommitWatcher {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StreamCommitWatcher.class);
+
+  private final Set<Long> commitIndexSet;
+
+  // putBlock futures; nothing in this class adds to it (see getFutureMap)
+  private final ConcurrentHashMap<Long,
+      CompletableFuture<ContainerCommandResponseProto>>
+      futureMap;
+
+  private final XceiverClientSpi xceiverClient;
+
+  public StreamCommitWatcher(XceiverClientSpi xceiverClient) {
+    this.xceiverClient = xceiverClient;
+    commitIndexSet = new ConcurrentSkipListSet<>();
+    futureMap = new ConcurrentHashMap<>();
+  }
+
+  public void updateCommitInfoSet(long index) {
+    commitIndexSet.add(index); // record a putBlock log index to watch
+  }
+
+  int getCommitInfoSetSize() {
+    return commitIndexSet.size();
+  }
+
+  /**
+   * Calls watchForCommit on the smallest recorded log index.
+   * Returns null when no index has been recorded yet.
+   * @return {@link XceiverClientReply} reply from raft client, or null
+   * @throws IOException in case watchForCommit fails
+   */
+  public XceiverClientReply streamWatchOnFirstIndex() throws IOException {
+    if (!commitIndexSet.isEmpty()) {
+      // Wait for the smallest recorded index to get committed to all or
+      // majority of nodes (majority in case a timeout happens). Indexes are
+      // only removed by cleanup(), so this may re-watch a committed index.
+      long index =
+          commitIndexSet.stream().mapToLong(v -> v).min()
+              .getAsLong();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("waiting for first index {} to catch up", index);
+      }
+      return streamWatchForCommit(index);
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Calls watchForCommit on the largest recorded log index.
+   * Returns null when no index has been recorded yet.
+   * @return {@link XceiverClientReply} reply from raft client, or null
+   * @throws IOException in case watchForCommit fails
+   */
+  public XceiverClientReply streamWatchOnLastIndex()
+      throws IOException {
+    if (!commitIndexSet.isEmpty()) {
+      // Wait for the largest recorded index to get committed to all or
+      // majority of nodes (majority in case a timeout happens).
+      // Indexes are only removed by cleanup().
+      long index =
+          commitIndexSet.stream().mapToLong(v -> v).max()
+              .getAsLong();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("waiting for last flush Index {} to catch up", index);
+      }
+      return streamWatchForCommit(index);
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Calls the watchForCommit API of the Ratis client for the given index.
+   * This streaming variant does not release any buffers afterwards.
+   * @param commitIndex log index to watch for
+   * @return reply returned by the Ratis client for the watch request
+   * @throws IOException if the watch is interrupted, times out or fails
+   */
+  public XceiverClientReply streamWatchForCommit(long commitIndex)
+      throws IOException {
+    try {
+      XceiverClientReply reply =
+          xceiverClient.watchForCommit(commitIndex);
+      return reply;
+    } catch (InterruptedException e) {
+      // Re-interrupt the thread while catching InterruptedException
+      Thread.currentThread().interrupt();
+      throw getIOExceptionForWatchForCommit(commitIndex, e);
+    } catch (TimeoutException | ExecutionException e) {
+      throw getIOExceptionForWatchForCommit(commitIndex, e);
+    }
+  }
+
+  private IOException getIOExceptionForWatchForCommit(long commitIndex,
+      Exception e) {
+    LOG.warn("watchForCommit failed for index {}", commitIndex, e);
+    // Keep the original exception as the cause for callers and logs.
+    return new IOException(
+        "Unexpected Storage Container Exception: " + e.toString(), e);
+  }
+
+  public ConcurrentMap<Long,
+      CompletableFuture<
+          ContainerCommandResponseProto>> getFutureMap() {
+    return futureMap;
+  }
+
+  public void cleanup() {
+    // The collections are final and cleared rather than set to null, so a
+    // call arriving after cleanup() (e.g. updateCommitInfoSet() from a
+    // putBlock callback still queued on the caller's response executor,
+    // or getCommitInfoSetSize() in a debug log) no longer throws a
+    // NullPointerException.
+    commitIndexSet.clear();
+    futureMap.clear();
+  }
+}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java
index 6954742601..98907bf8af 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput;
-import org.apache.hadoop.hdds.scm.storage.BufferPool;
import org.apache.hadoop.hdds.scm.storage.ByteBufStreamOutput;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;
@@ -52,15 +51,12 @@ public final class BlockDataStreamOutputEntry
private long currentPosition;
private final Token<OzoneBlockTokenIdentifier> token;
- private BufferPool bufferPool;
-
@SuppressWarnings({"parameternumber", "squid:S00107"})
private BlockDataStreamOutputEntry(
BlockID blockID, String key,
XceiverClientFactory xceiverClientManager,
Pipeline pipeline,
long length,
- BufferPool bufferPool,
Token<OzoneBlockTokenIdentifier> token,
OzoneClientConfig config
) {
@@ -73,7 +69,6 @@ public final class BlockDataStreamOutputEntry
this.token = token;
this.length = length;
this.currentPosition = 0;
- this.bufferPool = bufferPool;
}
long getLength() {
@@ -98,7 +93,7 @@ public final class BlockDataStreamOutputEntry
if (this.byteBufStreamOutput == null) {
this.byteBufStreamOutput =
new BlockDataStreamOutput(blockID, xceiverClientManager,
- pipeline, bufferPool, config, token);
+ pipeline, config, token);
}
}
@@ -135,20 +130,6 @@ public final class BlockDataStreamOutputEntry
return false;
}
- long getTotalAckDataLength() {
- if (byteBufStreamOutput != null) {
- BlockDataStreamOutput out =
- (BlockDataStreamOutput) this.byteBufStreamOutput;
- blockID = out.getBlockID();
- return out.getTotalAckDataLength();
- } else {
- // For a pre allocated block for which no write has been initiated,
- // the ByteBufStreamOutput will be null here.
- // In such cases, the default blockCommitSequenceId will be 0
- return 0;
- }
- }
-
Collection<DatanodeDetails> getFailedServers() {
if (byteBufStreamOutput != null) {
BlockDataStreamOutput out =
@@ -198,7 +179,6 @@ public final class BlockDataStreamOutputEntry
private XceiverClientFactory xceiverClientManager;
private Pipeline pipeline;
private long length;
- private BufferPool bufferPool;
private Token<OzoneBlockTokenIdentifier> token;
private OzoneClientConfig config;
@@ -230,12 +210,6 @@ public final class BlockDataStreamOutputEntry
return this;
}
-
- public Builder setBufferPool(BufferPool pool) {
- this.bufferPool = pool;
- return this;
- }
-
public Builder setConfig(OzoneClientConfig clientConfig) {
this.config = clientConfig;
return this;
@@ -252,7 +226,6 @@ public final class BlockDataStreamOutputEntry
xceiverClientManager,
pipeline,
length,
- bufferPool,
token, config);
}
}
@@ -282,10 +255,6 @@ public final class BlockDataStreamOutputEntry
return currentPosition;
}
- public BufferPool getBufferPool() {
- return bufferPool;
- }
-
public void setCurrentPosition(long curPosition) {
this.currentPosition = curPosition;
}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
index 94c505f2af..4bc55de262 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
@@ -22,12 +22,10 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.scm.ByteStringConversion;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
-import org.apache.hadoop.hdds.scm.storage.BufferPool;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
@@ -58,7 +56,6 @@ public class BlockDataStreamOutputEntryPool {
private final OmKeyArgs keyArgs;
private final XceiverClientFactory xceiverClientFactory;
private final String requestID;
- private final BufferPool bufferPool;
private OmMultipartCommitUploadPartInfo commitUploadPartInfo;
private final long openID;
private final ExcludeList excludeList;
@@ -86,13 +83,6 @@ public class BlockDataStreamOutputEntryPool {
this.requestID = requestId;
this.openID = openID;
this.excludeList = new ExcludeList();
-
- this.bufferPool =
- new BufferPool(config.getStreamBufferSize(),
- (int) (config.getStreamBufferMaxSize() / config
- .getStreamBufferSize()),
- ByteStringConversion
- .createByteBufferConversion(unsafeByteBufferConversion));
}
/**
@@ -114,8 +104,6 @@ public class BlockDataStreamOutputEntryPool {
config.setStreamBufferFlushDelay(false);
requestID = null;
int chunkSize = 0;
- bufferPool = new BufferPool(chunkSize, 1);
-
currentStreamIndex = 0;
openID = -1;
excludeList = new ExcludeList();
@@ -154,7 +142,6 @@ public class BlockDataStreamOutputEntryPool {
.setPipeline(subKeyInfo.getPipeline())
.setConfig(config)
.setLength(subKeyInfo.getLength())
- .setBufferPool(bufferPool)
.setToken(subKeyInfo.getToken());
streamEntries.add(builder.build());
}
@@ -293,17 +280,10 @@ public class BlockDataStreamOutputEntryPool {
return streamEntries.get(currentStreamIndex);
}
- long computeBufferData() {
- return bufferPool.computeBufferData();
- }
-
void cleanup() {
if (excludeList != null) {
excludeList.clear();
}
- if (bufferPool != null) {
- bufferPool.clearBufferPool();
- }
if (streamEntries != null) {
streamEntries.clear();
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
index a9be11667c..c37f9cd51d 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
@@ -279,27 +279,7 @@ public class KeyDataStreamOutput implements ByteBufStreamOutput {
}
Pipeline pipeline = streamEntry.getPipeline();
PipelineID pipelineId = pipeline.getId();
- long totalSuccessfulFlushedData = streamEntry.getTotalAckDataLength();
- //set the correct length for the current stream
- streamEntry.setCurrentPosition(totalSuccessfulFlushedData);
- long bufferedDataLen = blockDataStreamOutputEntryPool.computeBufferData();
- if (containerExclusionException) {
- LOG.debug(
- "Encountered exception {}. The last committed block length is {}, "
- + "uncommitted data length is {} retry count {}", exception,
- totalSuccessfulFlushedData, bufferedDataLen, retryCount);
- } else {
- LOG.warn(
- "Encountered exception {} on the pipeline {}. "
- + "The last committed block length is {}, "
- + "uncommitted data length is {} retry count {}", exception,
- pipeline, totalSuccessfulFlushedData, bufferedDataLen, retryCount);
- }
- Preconditions.checkArgument(
- bufferedDataLen <= config.getStreamBufferMaxSize());
- Preconditions.checkArgument(
- offset - blockDataStreamOutputEntryPool.getKeyLength() ==
- bufferedDataLen);
+
long containerId = streamEntry.getBlockID().getContainerID();
Collection<DatanodeDetails> failedServers = streamEntry.getFailedServers();
Preconditions.checkNotNull(failedServers);
@@ -337,13 +317,6 @@ public class KeyDataStreamOutput implements ByteBufStreamOutput {
blockDataStreamOutputEntryPool
.discardPreallocatedBlocks(-1, pipelineId);
}
- if (bufferedDataLen > 0) {
- // If the data is still cached in the underlying stream, we need to
- // allocate new block and write this data in the datanode.
- handleRetry(exception, bufferedDataLen);
- // reset the retryCount after handling the exception
- retryCount = 0;
- }
}
private void markStreamClosed() {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org