Posted to commits@ozone.apache.org by sz...@apache.org on 2022/05/06 15:45:11 UTC

[ozone] 06/35: HDDS-5599. [Ozone-Streaming] Drop BufferPool and ChunkBuffer to avoid buffer copying (#2557)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 0790843af01b0e439c0f39c80f1c478cc68632c7
Author: micah zhao <mi...@tencent.com>
AuthorDate: Wed Aug 25 23:39:10 2021 +0800

    HDDS-5599. [Ozone-Streaming] Drop BufferPool and ChunkBuffer to avoid buffer copying (#2557)
---
 .../hdds/scm/storage/BlockDataStreamOutput.java    | 290 +++------------------
 .../hdds/scm/storage/StreamCommitWatcher.java      | 166 ++++++++++++
 .../client/io/BlockDataStreamOutputEntry.java      |  33 +--
 .../client/io/BlockDataStreamOutputEntryPool.java  |  20 --
 .../ozone/client/io/KeyDataStreamOutput.java       |  29 +--
 5 files changed, 211 insertions(+), 327 deletions(-)
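
At its core this change replaces a copy with a hand-off: the old write path staged every caller buffer in pooled ChunkBuffers and converted them to a ByteString before sending, while the new path passes the caller's Netty ByteBuf, via its backing NIO buffer, straight to the Ratis DataStreamOutput. A minimal sketch of the two paths (illustrative only, not part of the commit; the class and method names are invented for contrast):

    import io.netty.buffer.ByteBuf;
    import java.nio.ByteBuffer;
    import java.util.concurrent.CompletableFuture;
    import org.apache.ratis.client.api.DataStreamOutput;
    import org.apache.ratis.protocol.DataStreamReply;

    final class WritePathSketch {
      // Old path (before this commit): stage the bytes in a pooled buffer,
      // paying one copy per chunk before anything reaches the network.
      static void copyingWrite(ByteBuf b, ByteBuffer pooledChunk) {
        pooledChunk.put(b.nioBuffer()); // the copy this commit eliminates
      }

      // New path (after this commit): hand the caller's bytes to Ratis directly.
      static CompletableFuture<DataStreamReply> zeroCopyWrite(
          DataStreamOutput out, ByteBuf b) {
        return out.writeAsync(b.nioBuffer()); // no intermediate ChunkBuffer
      }
    }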

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
index f658df1af9..39ec2f9219 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -36,21 +36,18 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerExcep
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.ozone.common.ChecksumData;
-import org.apache.hadoop.ozone.common.ChunkBuffer;
 import org.apache.hadoop.ozone.common.OzoneChecksumException;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.ratis.client.api.DataStreamOutput;
 import org.apache.ratis.io.StandardWriteOption;
 import org.apache.ratis.protocol.DataStreamReply;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
@@ -93,7 +90,6 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
 
   private int chunkIndex;
   private final AtomicLong chunkOffset = new AtomicLong();
-  private final BufferPool bufferPool;
   // The IOException will be set by response handling thread in case there is an
   // exception received in the response. If the exception is set, the next
   // request will fail upfront.
@@ -106,28 +102,16 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
   // effective data write attempted so far for the block
   private long writtenDataLength;
 
-  // List containing buffers for which the putBlock call will
-  // update the length in the datanodes. This list will just maintain
-  // references to the buffers in the BufferPool which will be cleared
-  // when the watchForCommit acknowledges a putBlock logIndex has been
-  // committed on all datanodes. This list will be a  place holder for buffers
-  // which got written between successive putBlock calls.
-  private List<ChunkBuffer> bufferList;
-
   // This object will maintain the commitIndexes and byteBufferList in order
   // Also, corresponding to the logIndex, the corresponding list of buffers will
   // be released from the buffer pool.
-  private final CommitWatcher commitWatcher;
+  private final StreamCommitWatcher commitWatcher;
 
   private final List<DatanodeDetails> failedServers;
   private final Checksum checksum;
 
   //number of buffers used before doing a flush/putBlock.
   private int flushPeriod;
-  //bytes remaining to write in the current buffer.
-  private int currentBufferRemaining;
-  //current buffer allocated to write
-  private ChunkBuffer currentBuffer;
   private final Token<? extends TokenIdentifier> token;
   private final DataStreamOutput out;
   private CompletableFuture<DataStreamReply> dataStreamCloseReply;
@@ -141,13 +125,11 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
    * @param blockID              block ID
    * @param xceiverClientManager client manager that controls client
    * @param pipeline             pipeline where block will be written
-   * @param bufferPool           pool of buffers
    */
   public BlockDataStreamOutput(
       BlockID blockID,
       XceiverClientFactory xceiverClientManager,
       Pipeline pipeline,
-      BufferPool bufferPool,
       OzoneClientConfig config,
       Token<? extends TokenIdentifier> token
   ) throws IOException {
@@ -163,11 +145,8 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
         (XceiverClientRatis)xceiverClientManager.acquireClient(pipeline);
     // Alternatively, stream setup can be delayed till the first chunk write.
     this.out = setupStream();
-    this.bufferPool = bufferPool;
     this.token = token;
 
-    //number of buffers used before doing a flush
-    refreshCurrentBuffer(bufferPool);
     flushPeriod = (int) (config.getStreamBufferFlushSize() / config
         .getStreamBufferSize());
 
@@ -178,8 +157,7 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
 
     // A single thread executor handles the responses of async requests
     responseExecutor = Executors.newSingleThreadExecutor();
-    commitWatcher = new CommitWatcher(bufferPool, xceiverClient);
-    bufferList = null;
+    commitWatcher = new StreamCommitWatcher(xceiverClient);
     totalDataFlushedLength = 0;
     writtenDataLength = 0;
     failedServers = new ArrayList<>(0);
@@ -209,20 +187,10 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
         .stream(message.getContent().asReadOnlyByteBuffer());
   }
 
-  private void refreshCurrentBuffer(BufferPool pool) {
-    currentBuffer = pool.getCurrentBuffer();
-    currentBufferRemaining =
-        currentBuffer != null ? currentBuffer.remaining() : 0;
-  }
-
   public BlockID getBlockID() {
     return blockID.get();
   }
 
-  public long getTotalAckDataLength() {
-    return commitWatcher.getTotalAckDataLength();
-  }
-
   public long getWrittenDataLength() {
     return writtenDataLength;
   }
@@ -236,82 +204,29 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
     return xceiverClient;
   }
 
-  @VisibleForTesting
-  public long getTotalDataFlushedLength() {
-    return totalDataFlushedLength;
-  }
-
-  @VisibleForTesting
-  public BufferPool getBufferPool() {
-    return bufferPool;
-  }
-
   public IOException getIoException() {
     return ioException.get();
   }
 
-  @VisibleForTesting
-  public Map<Long, List<ChunkBuffer>> getCommitIndex2flushedDataMap() {
-    return commitWatcher.getCommitIndex2flushedDataMap();
-  }
-
   @Override
-  public void write(ByteBuf b) throws IOException {
+  public void write(ByteBuf buf) throws IOException {
     checkOpen();
-    if (b == null) {
+    if (buf == null) {
       throw new NullPointerException();
     }
-    int off = b.readerIndex();
-    int len = b.readableBytes();
-
-    while (len > 0) {
-      allocateNewBufferIfNeeded();
-      final int writeLen = Math.min(currentBufferRemaining, len);
-      // TODO: avoid buffer copy here
-      currentBuffer.put(b.nioBuffer(off, writeLen));
-      currentBufferRemaining -= writeLen;
-      writeChunkIfNeeded();
-      off += writeLen;
-      len -= writeLen;
-      writtenDataLength += writeLen;
-      doFlushOrWatchIfNeeded();
-    }
-  }
-
-  private void writeChunkIfNeeded() throws IOException {
-    if (currentBufferRemaining == 0) {
-      writeChunk(currentBuffer);
-    }
-  }
-
-  private void doFlushOrWatchIfNeeded() throws IOException {
-    if (currentBufferRemaining == 0) {
-      if (bufferPool.getNumberOfUsedBuffers() % flushPeriod == 0) {
-        updateFlushLength();
-        executePutBlock(false, false);
-      }
-      // Data in the bufferPool can not exceed streamBufferMaxSize
-      if (bufferPool.getNumberOfUsedBuffers() == bufferPool.getCapacity()) {
-        handleFullBuffer();
-      }
+    final int len = buf.readableBytes();
+    if (len == 0) {
+      return;
     }
-  }
+    writeChunkToContainer(buf);
 
-  private void allocateNewBufferIfNeeded() {
-    if (currentBufferRemaining == 0) {
-      currentBuffer = bufferPool.allocateBuffer(config.getBufferIncrement());
-      currentBufferRemaining = currentBuffer.remaining();
-    }
+    writtenDataLength += len;
   }
 
   private void updateFlushLength() {
     totalDataFlushedLength = writtenDataLength;
   }
 
-  private boolean isBufferPoolFull() {
-    return bufferPool.computeBufferData() == config.getStreamBufferMaxSize();
-  }
-
   /**
    * Will be called on the retryPath in case closedContainerException/
    * TimeoutException.
@@ -319,70 +234,9 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
    * @throws IOException if error occurred
    */
 
-  // In this case, the data is already cached in the currentBuffer.
+  // TODO: We need to add a new retry policy that does not depend on the BufferPool.
   public void writeOnRetry(long len) throws IOException {
-    if (len == 0) {
-      return;
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Retrying write length {} for blockID {}", len, blockID);
-    }
-    Preconditions.checkArgument(len <= config.getStreamBufferMaxSize());
-    int count = 0;
-    while (len > 0) {
-      ChunkBuffer buffer = bufferPool.getBuffer(count);
-      long writeLen = Math.min(buffer.position(), len);
-      if (!buffer.hasRemaining()) {
-        writeChunk(buffer);
-      }
-      len -= writeLen;
-      count++;
-      writtenDataLength += writeLen;
-      // we should not call isBufferFull/shouldFlush here.
-      // The buffer might already be full as whole data is already cached in
-      // the buffer. We should just validate
-      // if we wrote data of size streamBufferMaxSize/streamBufferFlushSize to
-      // call for handling full buffer/flush buffer condition.
-      if (writtenDataLength % config.getStreamBufferFlushSize() == 0) {
-        // reset the position to zero as now we will be reading the
-        // next buffer in the list
-        updateFlushLength();
-        executePutBlock(false, false);
-      }
-      if (writtenDataLength == config.getStreamBufferMaxSize()) {
-        handleFullBuffer();
-      }
-    }
-  }
 
-  /**
-   * This is a blocking call. It will wait for the flush till the commit index
-   * at the head of the commitIndex2flushedDataMap gets replicated to all or
-   * majority.
-   * @throws IOException
-   */
-  private void handleFullBuffer() throws IOException {
-    try {
-      checkOpen();
-      if (!commitWatcher.getFutureMap().isEmpty()) {
-        waitOnFlushFutures();
-      }
-    } catch (ExecutionException e) {
-      handleExecutionException(e);
-    } catch (InterruptedException ex) {
-      Thread.currentThread().interrupt();
-      handleInterruptedException(ex, true);
-    }
-    watchForCommit(true);
-  }
-
-
-  // It may happen that once the exception is encountered , we still might
-  // have successfully flushed up to a certain index. Make sure the buffers
-  // only contain data which have not been sufficiently replicated
-  private void adjustBuffersOnException() {
-    commitWatcher.releaseBuffersOnException();
-    refreshCurrentBuffer(bufferPool);
   }
 
   /**
@@ -397,7 +251,8 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
     checkOpen();
     try {
       XceiverClientReply reply = bufferFull ?
-          commitWatcher.watchOnFirstIndex() : commitWatcher.watchOnLastIndex();
+          commitWatcher.streamWatchOnFirstIndex() :
+          commitWatcher.streamWatchOnLastIndex();
       if (reply != null) {
         List<DatanodeDetails> dnList = reply.getDatanodes();
         if (!dnList.isEmpty()) {
@@ -412,7 +267,6 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
       setIoException(ioe);
       throw getIoException();
     }
-    refreshCurrentBuffer(bufferPool);
 
   }
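
The choice between the two watch calls above encodes the back-pressure policy: waiting on the lowest outstanding index is the cheapest way to unblock a stalled writer, while flush and close paths wait on the highest index to drain everything. A hedged sketch of the calling pattern, assuming a StreamCommitWatcher as introduced later in this patch:

    import java.io.IOException;
    import java.util.Collections;
    import java.util.List;
    import org.apache.hadoop.hdds.protocol.DatanodeDetails;
    import org.apache.hadoop.hdds.scm.XceiverClientReply;
    import org.apache.hadoop.hdds.scm.storage.StreamCommitWatcher;

    final class WatchSketch {
      // Returns the datanodes that failed to catch up; the caller above adds
      // them to its failedServers list before rethrowing.
      static List<DatanodeDetails> watch(StreamCommitWatcher watcher,
          boolean bufferFull) throws IOException {
        XceiverClientReply reply = bufferFull
            ? watcher.streamWatchOnFirstIndex()   // oldest outstanding index
            : watcher.streamWatchOnLastIndex();   // newest index: full drain
        return reply == null ? Collections.emptyList() : reply.getDatanodes();
      }
    }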
 
@@ -426,22 +280,7 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
       boolean force) throws IOException {
     checkOpen();
     long flushPos = totalDataFlushedLength;
-    final List<ChunkBuffer> byteBufferList;
-    if (!force) {
-      Preconditions.checkNotNull(bufferList);
-      byteBufferList = bufferList;
-      bufferList = null;
-      Preconditions.checkNotNull(byteBufferList);
-    } else {
-      byteBufferList = null;
-    }
-
-    try {
-      CompletableFuture.allOf(futures.toArray(EMPTY_FUTURE_ARRAY)).get();
-    } catch (Exception e) {
-      LOG.warn("Failed to write all chunks through stream: " + e);
-      throw new IOException(e);
-    }
+    flush();
     if (close) {
       dataStreamCloseReply = out.closeAsync();
     }
@@ -471,15 +310,12 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
           if (LOG.isDebugEnabled()) {
             LOG.debug(
                 "Adding index " + asyncReply.getLogIndex() + " commitMap size "
-                    + commitWatcher.getCommitInfoMapSize() + " flushLength "
-                    + flushPos + " numBuffers " + byteBufferList.size()
-                    + " blockID " + blockID + " bufferPool size" + bufferPool
-                    .getSize() + " currentBufferIndex " + bufferPool
-                    .getCurrentBufferIndex());
+                    + commitWatcher.getCommitInfoSetSize() + " flushLength "
+                    + flushPos + " blockID " + blockID);
           }
           // for standalone protocol, logIndex will always be 0.
-          commitWatcher
-              .updateCommitInfoMap(asyncReply.getLogIndex(), byteBufferList);
+          commitWatcher.updateCommitInfoSet(
+              asyncReply.getLogIndex());
         }
         return e;
       }, responseExecutor).exceptionally(e -> {
@@ -503,36 +339,12 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
 
   @Override
   public void flush() throws IOException {
-    if (xceiverClientFactory != null && xceiverClient != null
-        && bufferPool != null && bufferPool.getSize() > 0
-        && (!config.isStreamBufferFlushDelay() ||
-            writtenDataLength - totalDataFlushedLength
-                >= config.getStreamBufferSize())) {
-      try {
-        handleFlush(false);
-      } catch (ExecutionException e) {
-        // just set the exception here as well in order to maintain sanctity of
-        // ioException field
-        handleExecutionException(e);
-      } catch (InterruptedException ex) {
-        Thread.currentThread().interrupt();
-        handleInterruptedException(ex, true);
-      }
-    }
-  }
-
-  private void writeChunk(ChunkBuffer buffer)
-      throws IOException {
-    // This data in the buffer will be pushed to datanode and a reference will
-    // be added to the bufferList. Once putBlock gets executed, this list will
-    // be marked null. Hence, during first writeChunk call after every putBlock
-    // call or during the first call to writeChunk here, the list will be null.
-
-    if (bufferList == null) {
-      bufferList = new ArrayList<>();
+    try {
+      CompletableFuture.allOf(futures.toArray(EMPTY_FUTURE_ARRAY)).get();
+    } catch (Exception e) {
+      LOG.warn("Failed to write all chunks through stream", e);
+      throw new IOException(e);
     }
-    bufferList.add(buffer);
-    writeChunkToContainer(buffer.duplicate(0, buffer.position()));
   }
 
   /**
@@ -543,11 +355,6 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
     checkOpen();
     // flush the last chunk data residing on the currentBuffer
     if (totalDataFlushedLength < writtenDataLength) {
-      refreshCurrentBuffer(bufferPool);
-      Preconditions.checkArgument(currentBuffer.position() > 0);
-      if (currentBuffer.hasRemaining()) {
-        writeChunk(currentBuffer);
-      }
       // This can be a partially filled chunk. Since we are flushing the buffer
       // here, we just limit this buffer to the current position. So that next
       // write will happen in new buffer
@@ -570,8 +377,7 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
 
   @Override
   public void close() throws IOException {
-    if (xceiverClientFactory != null && xceiverClient != null
-        && bufferPool != null && bufferPool.getSize() > 0) {
+    if (xceiverClientFactory != null && xceiverClient != null) {
       try {
         handleFlush(true);
         dataStreamCloseReply.get();
@@ -583,10 +389,6 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
       } finally {
         cleanup(false);
       }
-      // TODO: Turn the below buffer empty check on when Standalone pipeline
-      // is removed in the write path in tests
-      // Preconditions.checkArgument(buffer.position() == 0);
-      // bufferPool.checkBufferPoolEmpty();
 
     }
   }
@@ -638,10 +440,6 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
     xceiverClientFactory = null;
     xceiverClient = null;
     commitWatcher.cleanup();
-    if (bufferList !=  null) {
-      bufferList.clear();
-    }
-    bufferList = null;
     responseExecutor.shutdown();
   }
 
@@ -655,7 +453,6 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
     if (isClosed()) {
       throw new IOException("BlockDataStreamOutput has been closed.");
     } else if (getIoException() != null) {
-      adjustBuffersOnException();
       throw getIoException();
     }
   }
@@ -683,12 +480,11 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
    * @throws OzoneChecksumException if there is an error while computing
    * checksum
    */
-  private void writeChunkToContainer(ChunkBuffer chunk) throws IOException {
-    int effectiveChunkSize = chunk.remaining();
+  private void writeChunkToContainer(ByteBuf buf)
+      throws IOException {
+    ChecksumData checksumData = checksum.computeChecksum(buf.nioBuffer());
+    int effectiveChunkSize = buf.readableBytes();
     final long offset = chunkOffset.getAndAdd(effectiveChunkSize);
-    final ByteString data = chunk.toByteString(
-        bufferPool.byteStringConversion());
-    ChecksumData checksumData = checksum.computeChecksum(chunk);
     ChunkInfo chunkInfo = ChunkInfo.newBuilder()
         .setChunkName(blockID.get().getLocalID() + "_chunk_" + ++chunkIndex)
         .setOffset(offset)
@@ -703,21 +499,22 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
 
     CompletableFuture<DataStreamReply> future =
         (needSync(offset + effectiveChunkSize) ?
-        out.writeAsync(data.asReadOnlyByteBuffer(), StandardWriteOption.SYNC) :
-        out.writeAsync(data.asReadOnlyByteBuffer()))
-        .whenCompleteAsync((r, e) -> {
-          if (e != null || !r.isSuccess()) {
-            if (e == null) {
-              e = new IOException("result is not success");
-            }
-            String msg = "Failed to write chunk " + chunkInfo.getChunkName() +
-                " " + "into block " + blockID;
-            LOG.debug("{}, exception: {}", msg, e.getLocalizedMessage());
-            CompletionException ce = new CompletionException(msg, e);
-            setIoException(ce);
-            throw ce;
-          }
-        }, responseExecutor);
+            out.writeAsync(buf.nioBuffer(), StandardWriteOption.SYNC) :
+            out.writeAsync(buf.nioBuffer()))
+            .whenCompleteAsync((r, e) -> {
+              if (e != null || !r.isSuccess()) {
+                if (e == null) {
+                  e = new IOException("result is not success");
+                }
+                String msg =
+                    "Failed to write chunk " + chunkInfo.getChunkName() +
+                        " " + "into block " + blockID;
+                LOG.debug("{}, exception: {}", msg, e.getLocalizedMessage());
+                CompletionException ce = new CompletionException(msg, e);
+                setIoException(ce);
+                throw ce;
+              }
+            }, responseExecutor);
 
     futures.add(future);
     containerBlockData.addChunks(chunkInfo);
@@ -754,7 +551,6 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
    */
   private void handleExecutionException(Exception ex) throws IOException {
     setIoException(ex);
-    adjustBuffersOnException();
     throw getIoException();
   }
 }
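
After this rewrite, BlockDataStreamOutput behaves like a thin asynchronous pipe: write() checksums and streams each ByteBuf as a single chunk, flush() joins the outstanding futures, and close() ends the Ratis stream. A minimal caller-side sketch (not from the patch), assuming a fully constructed instance:

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import java.io.IOException;
    import org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput;

    final class BlockStreamUsageSketch {
      static void writeAndClose(BlockDataStreamOutput out, byte[] payload)
          throws IOException {
        ByteBuf buf = Unpooled.wrappedBuffer(payload);
        out.write(buf);  // checksummed and streamed as a single chunk
        out.flush();     // joins every pending writeAsync future
        out.close();     // final putBlock via handleFlush(true), then closeAsync
      }
    }
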
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
new file mode 100644
index 0000000000..c187ffe902
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * This class maintains the set of commit indexes to be watched for
+ * successful replication in the datanodes in a given pipeline during an
+ * ozone key write. Unlike {@link CommitWatcher}, it does not manage any
+ * buffers, since the streaming write path avoids copying user data.
+ */
+package org.apache.hadoop.hdds.scm.storage;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.scm.XceiverClientReply;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * This class executes watchForCommit on the Ratis pipeline for the
+ * streaming write path; it tracks only commit indexes, not buffers.
+ */
+public class StreamCommitWatcher {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StreamCommitWatcher.class);
+
+  private Set<Long> commitIndexSet;
+
+  // future Map to hold up all putBlock futures
+  private ConcurrentHashMap<Long,
+      CompletableFuture<ContainerCommandResponseProto>>
+      futureMap;
+
+  private XceiverClientSpi xceiverClient;
+
+  public StreamCommitWatcher(XceiverClientSpi xceiverClient) {
+    this.xceiverClient = xceiverClient;
+    commitIndexSet = new ConcurrentSkipListSet<>();
+    futureMap = new ConcurrentHashMap<>();
+  }
+
+  public void updateCommitInfoSet(long index) {
+    commitIndexSet.add(index);
+  }
+
+  int getCommitInfoSetSize() {
+    return commitIndexSet.size();
+  }
+
+  /**
+   * Calls watchForCommit for the lowest index in the commitIndexSet via
+   * the Ratis client.
+   * @return {@link XceiverClientReply} reply from raft client
+   * @throws IOException in case watchForCommit fails
+   */
+  public XceiverClientReply streamWatchOnFirstIndex() throws IOException {
+    if (!commitIndexSet.isEmpty()) {
+      // wait for the first (lowest) commit index in the commitIndexSet
+      // to get committed to all or a majority of nodes in case a timeout
+      // happens.
+      long index =
+          commitIndexSet.stream().mapToLong(v -> v).min()
+              .getAsLong();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("waiting for first index {} to catch up", index);
+      }
+      return streamWatchForCommit(index);
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Calls watchForCommit for the highest index in the commitIndexSet via
+   * the Ratis client.
+   * @return {@link XceiverClientReply} reply from raft client
+   * @throws IOException in case watchForCommit fails
+   */
+  public XceiverClientReply streamWatchOnLastIndex()
+      throws IOException {
+    if (!commitIndexSet.isEmpty()) {
+      // wait for the last (highest) commit index in the commitIndexSet
+      // to get committed to all or a majority of nodes in case a timeout
+      // happens.
+      long index =
+          commitIndexSet.stream().mapToLong(v -> v).max()
+              .getAsLong();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("waiting for last flush Index {} to catch up", index);
+      }
+      return streamWatchForCommit(index);
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Calls the watchForCommit API of the Ratis client. This streaming
+   * variant no longer needs to release buffers.
+   * @param commitIndex log index to watch for
+   * @return minimum commit index replicated to all nodes
+   * @throws IOException in case the watch times out
+   */
+  public XceiverClientReply streamWatchForCommit(long commitIndex)
+      throws IOException {
+    try {
+      XceiverClientReply reply =
+          xceiverClient.watchForCommit(commitIndex);
+      return reply;
+    } catch (InterruptedException e) {
+      // Re-interrupt the thread while catching InterruptedException
+      Thread.currentThread().interrupt();
+      throw getIOExceptionForWatchForCommit(commitIndex, e);
+    } catch (TimeoutException | ExecutionException e) {
+      throw getIOExceptionForWatchForCommit(commitIndex, e);
+    }
+  }
+
+  private IOException getIOExceptionForWatchForCommit(long commitIndex,
+                                                       Exception e) {
+    LOG.warn("watchForCommit failed for index {}", commitIndex, e);
+    IOException ioException = new IOException(
+        "Unexpected Storage Container Exception: " + e.toString(), e);
+    return ioException;
+  }
+
+  public ConcurrentMap<Long,
+        CompletableFuture<
+            ContainerCommandResponseProto>> getFutureMap() {
+    return futureMap;
+  }
+
+  public void cleanup() {
+    if (commitIndexSet != null) {
+      commitIndexSet.clear();
+    }
+    if (futureMap != null) {
+      futureMap.clear();
+    }
+    commitIndexSet = null;
+  }
+}
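
A hedged sketch of the watcher's life cycle as BlockDataStreamOutput drives it: record the Ratis log index of each acknowledged putBlock, watch for replication when back-pressure or a flush demands it, and clear state on teardown. The client argument is assumed to be an already-acquired XceiverClientSpi:

    import java.io.IOException;
    import org.apache.hadoop.hdds.scm.XceiverClientSpi;
    import org.apache.hadoop.hdds.scm.storage.StreamCommitWatcher;

    final class WatcherLifecycleSketch {
      static void track(XceiverClientSpi client, long putBlockLogIndex)
          throws IOException {
        StreamCommitWatcher watcher = new StreamCommitWatcher(client);
        watcher.updateCommitInfoSet(putBlockLogIndex);  // per acknowledged putBlock
        watcher.streamWatchForCommit(putBlockLogIndex); // block until replicated
        watcher.cleanup();                              // drop indexes and futures
      }
    }
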
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java
index 6954742601..98907bf8af 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput;
-import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.hdds.scm.storage.ByteBufStreamOutput;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
@@ -52,15 +51,12 @@ public final class BlockDataStreamOutputEntry
   private long currentPosition;
   private final Token<OzoneBlockTokenIdentifier> token;
 
-  private BufferPool bufferPool;
-
   @SuppressWarnings({"parameternumber", "squid:S00107"})
   private BlockDataStreamOutputEntry(
       BlockID blockID, String key,
       XceiverClientFactory xceiverClientManager,
       Pipeline pipeline,
       long length,
-      BufferPool bufferPool,
       Token<OzoneBlockTokenIdentifier> token,
       OzoneClientConfig config
   ) {
@@ -73,7 +69,6 @@ public final class BlockDataStreamOutputEntry
     this.token = token;
     this.length = length;
     this.currentPosition = 0;
-    this.bufferPool = bufferPool;
   }
 
   long getLength() {
@@ -98,7 +93,7 @@ public final class BlockDataStreamOutputEntry
     if (this.byteBufStreamOutput == null) {
       this.byteBufStreamOutput =
           new BlockDataStreamOutput(blockID, xceiverClientManager,
-              pipeline, bufferPool, config, token);
+              pipeline, config, token);
     }
   }
 
@@ -135,20 +130,6 @@ public final class BlockDataStreamOutputEntry
     return false;
   }
 
-  long getTotalAckDataLength() {
-    if (byteBufStreamOutput != null) {
-      BlockDataStreamOutput out =
-          (BlockDataStreamOutput) this.byteBufStreamOutput;
-      blockID = out.getBlockID();
-      return out.getTotalAckDataLength();
-    } else {
-      // For a pre allocated block for which no write has been initiated,
-      // the ByteBufStreamOutput will be null here.
-      // In such cases, the default blockCommitSequenceId will be 0
-      return 0;
-    }
-  }
-
   Collection<DatanodeDetails> getFailedServers() {
     if (byteBufStreamOutput != null) {
       BlockDataStreamOutput out =
@@ -198,7 +179,6 @@ public final class BlockDataStreamOutputEntry
     private XceiverClientFactory xceiverClientManager;
     private Pipeline pipeline;
     private long length;
-    private BufferPool bufferPool;
     private Token<OzoneBlockTokenIdentifier> token;
     private OzoneClientConfig config;
 
@@ -230,12 +210,6 @@ public final class BlockDataStreamOutputEntry
       return this;
     }
 
-
-    public Builder setBufferPool(BufferPool pool) {
-      this.bufferPool = pool;
-      return this;
-    }
-
     public Builder setConfig(OzoneClientConfig clientConfig) {
       this.config = clientConfig;
       return this;
@@ -252,7 +226,6 @@ public final class BlockDataStreamOutputEntry
           xceiverClientManager,
           pipeline,
           length,
-          bufferPool,
           token, config);
     }
   }
@@ -282,10 +255,6 @@ public final class BlockDataStreamOutputEntry
     return currentPosition;
   }
 
-  public BufferPool getBufferPool() {
-    return bufferPool;
-  }
-
   public void setCurrentPosition(long curPosition) {
     this.currentPosition = curPosition;
   }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
index 94c505f2af..4bc55de262 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
@@ -22,12 +22,10 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.scm.ByteStringConversion;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
-import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
@@ -58,7 +56,6 @@ public class BlockDataStreamOutputEntryPool {
   private final OmKeyArgs keyArgs;
   private final XceiverClientFactory xceiverClientFactory;
   private final String requestID;
-  private final BufferPool bufferPool;
   private OmMultipartCommitUploadPartInfo commitUploadPartInfo;
   private final long openID;
   private final ExcludeList excludeList;
@@ -86,13 +83,6 @@ public class BlockDataStreamOutputEntryPool {
     this.requestID = requestId;
     this.openID = openID;
     this.excludeList = new ExcludeList();
-
-    this.bufferPool =
-        new BufferPool(config.getStreamBufferSize(),
-            (int) (config.getStreamBufferMaxSize() / config
-                .getStreamBufferSize()),
-            ByteStringConversion
-                .createByteBufferConversion(unsafeByteBufferConversion));
   }
 
   /**
@@ -114,8 +104,6 @@ public class BlockDataStreamOutputEntryPool {
     config.setStreamBufferFlushDelay(false);
     requestID = null;
     int chunkSize = 0;
-    bufferPool = new BufferPool(chunkSize, 1);
-
     currentStreamIndex = 0;
     openID = -1;
     excludeList = new ExcludeList();
@@ -154,7 +142,6 @@ public class BlockDataStreamOutputEntryPool {
             .setPipeline(subKeyInfo.getPipeline())
             .setConfig(config)
             .setLength(subKeyInfo.getLength())
-            .setBufferPool(bufferPool)
             .setToken(subKeyInfo.getToken());
     streamEntries.add(builder.build());
   }
@@ -293,17 +280,10 @@ public class BlockDataStreamOutputEntryPool {
     return streamEntries.get(currentStreamIndex);
   }
 
-  long computeBufferData() {
-    return bufferPool.computeBufferData();
-  }
-
   void cleanup() {
     if (excludeList != null) {
       excludeList.clear();
     }
-    if (bufferPool != null) {
-      bufferPool.clearBufferPool();
-    }
 
     if (streamEntries != null) {
       streamEntries.clear();
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
index a9be11667c..c37f9cd51d 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
@@ -279,27 +279,7 @@ public class KeyDataStreamOutput implements ByteBufStreamOutput {
     }
     Pipeline pipeline = streamEntry.getPipeline();
     PipelineID pipelineId = pipeline.getId();
-    long totalSuccessfulFlushedData = streamEntry.getTotalAckDataLength();
-    //set the correct length for the current stream
-    streamEntry.setCurrentPosition(totalSuccessfulFlushedData);
-    long bufferedDataLen = blockDataStreamOutputEntryPool.computeBufferData();
-    if (containerExclusionException) {
-      LOG.debug(
-          "Encountered exception {}. The last committed block length is {}, "
-              + "uncommitted data length is {} retry count {}", exception,
-          totalSuccessfulFlushedData, bufferedDataLen, retryCount);
-    } else {
-      LOG.warn(
-          "Encountered exception {} on the pipeline {}. "
-              + "The last committed block length is {}, "
-              + "uncommitted data length is {} retry count {}", exception,
-          pipeline, totalSuccessfulFlushedData, bufferedDataLen, retryCount);
-    }
-    Preconditions.checkArgument(
-        bufferedDataLen <= config.getStreamBufferMaxSize());
-    Preconditions.checkArgument(
-        offset - blockDataStreamOutputEntryPool.getKeyLength() ==
-        bufferedDataLen);
+
     long containerId = streamEntry.getBlockID().getContainerID();
     Collection<DatanodeDetails> failedServers = streamEntry.getFailedServers();
     Preconditions.checkNotNull(failedServers);
@@ -337,13 +317,6 @@ public class KeyDataStreamOutput implements ByteBufStreamOutput {
       blockDataStreamOutputEntryPool
           .discardPreallocatedBlocks(-1, pipelineId);
     }
-    if (bufferedDataLen > 0) {
-      // If the data is still cached in the underlying stream, we need to
-      // allocate new block and write this data in the datanode.
-      handleRetry(exception, bufferedDataLen);
-      // reset the retryCount after handling the exception
-      retryCount = 0;
-    }
   }
 
   private void markStreamClosed() {

