Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2019/04/11 14:02:00 UTC

[hadoop] branch trunk updated: HDDS-1348. Refactor BlockOutpuStream Class. Contributed by Shashikant Banerjee.

This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a0468c5  HDDS-1348. Refactor BlockOutpuStream Class. Contributed by Shashikant Banerjee.
a0468c5 is described below

commit a0468c5756054a0ebe83d256efaea20e3003cd92
Author: Shashikant Banerjee <sh...@apache.org>
AuthorDate: Thu Apr 11 19:31:26 2019 +0530

    HDDS-1348. Refactor BlockOutpuStream Class. Contributed by Shashikant Banerjee.
---
 .../hadoop/hdds/scm/storage/BlockOutputStream.java | 135 ++--------
 .../hadoop/hdds/scm/storage/CommitWatcher.java     | 237 +++++++++++++++++
 .../hadoop/ozone/client/rpc/TestCommitWatcher.java | 296 +++++++++++++++++++++
 3 files changed, 558 insertions(+), 110 deletions(-)
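
At a high level, the patch moves the commit bookkeeping (commitIndex2flushedDataMap, futureMap, totalAckDataLength) out of BlockOutputStream into the new CommitWatcher class, which BlockOutputStream now constructs and delegates to. Below is a condensed sketch of the resulting wiring, pieced together from the hunks that follow; only the identifiers that appear in the diff are real, and the surrounding structure is abridged rather than the full implementation:

    // BlockOutputStream after the refactor (abridged sketch)
    private final CommitWatcher commitWatcher;

    // constructor: the watch timeout is now owned by the watcher
    commitWatcher = new CommitWatcher(bufferPool, xceiverClient, watchTimeout);

    // acked-length and commit-map queries are delegated
    public long getTotalAckDataLength() {
      return commitWatcher.getTotalAckDataLength();
    }

    // the old watchForCommit(long)/adjustBuffers(long) pair becomes a
    // boolean-driven dispatch: the buffer-full path waits on the oldest
    // pending log index, the flush/close path waits on the newest one
    private void watchForCommit(boolean bufferFull) throws IOException {
      XceiverClientReply reply = bufferFull
          ? commitWatcher.watchOnFirstIndex()
          : commitWatcher.watchOnLastIndex();
      // datanodes reported by the reply are recorded in failedServers
    }
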

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index 13c4a0c..139f494 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -48,16 +48,12 @@ import java.util.UUID;
 import java.util.List;
 import java.util.ArrayList;
 import java.util.Map;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls
     .putBlockAsync;
@@ -97,7 +93,6 @@ public class BlockOutputStream extends OutputStream {
   private int chunkSize;
   private final long streamBufferFlushSize;
   private final long streamBufferMaxSize;
-  private final long watchTimeout;
   private BufferPool bufferPool;
   // The IOException will be set by response handling thread in case there is an
   // exception received in the response. If the exception is set, the next
@@ -111,10 +106,6 @@ public class BlockOutputStream extends OutputStream {
   // effective data write attempted so far for the block
   private long writtenDataLength;
 
-  // total data which has been successfully flushed and acknowledged
-  // by all servers
-  private long totalAckDataLength;
-
   // List containing buffers for which the putBlock call will
   // update the length in the datanodes. This list will just maintain
   // references to the buffers in the BufferPool which will be cleared
@@ -123,17 +114,10 @@ public class BlockOutputStream extends OutputStream {
   // which got written between successive putBlock calls.
   private List<ByteBuffer> bufferList;
 
-  // future Map to hold up all putBlock futures
-  private ConcurrentHashMap<Long,
-      CompletableFuture<ContainerProtos.ContainerCommandResponseProto>>
-      futureMap;
-
-  // The map should maintain the keys (logIndexes) in order so that while
-  // removing we always end up updating incremented data flushed length.
+  // This object will maintain the commitIndexes and byteBufferList in order.
   // Also, corresponding to the logIndex, the corresponding list of buffers will
   // be released from the buffer pool.
-  private ConcurrentSkipListMap<Long, List<ByteBuffer>>
-      commitIndex2flushedDataMap;
+  private final CommitWatcher commitWatcher;
 
   private List<DatanodeDetails> failedServers;
 
@@ -175,20 +159,17 @@ public class BlockOutputStream extends OutputStream {
     this.chunkIndex = 0;
     this.streamBufferFlushSize = streamBufferFlushSize;
     this.streamBufferMaxSize = streamBufferMaxSize;
-    this.watchTimeout = watchTimeout;
     this.bufferPool = bufferPool;
     this.checksumType = checksumType;
     this.bytesPerChecksum = bytesPerChecksum;
 
     // A single thread executor handle the responses of async requests
     responseExecutor = Executors.newSingleThreadExecutor();
-    commitIndex2flushedDataMap = new ConcurrentSkipListMap<>();
-    totalAckDataLength = 0;
-    futureMap = new ConcurrentHashMap<>();
+    commitWatcher = new CommitWatcher(bufferPool, xceiverClient, watchTimeout);
+    bufferList = null;
     totalDataFlushedLength = 0;
     writtenDataLength = 0;
     failedServers = Collections.emptyList();
-    bufferList = null;
     ioException = new AtomicReference<>(null);
   }
 
@@ -198,7 +179,7 @@ public class BlockOutputStream extends OutputStream {
   }
 
   public long getTotalAckDataLength() {
-    return totalAckDataLength;
+    return commitWatcher.getTotalAckDataLength();
   }
 
   public long getWrittenDataLength() {
@@ -230,7 +211,7 @@ public class BlockOutputStream extends OutputStream {
 
   @VisibleForTesting
   public Map<Long, List<ByteBuffer>> getCommitIndex2flushedDataMap() {
-    return commitIndex2flushedDataMap;
+    return commitWatcher.getCommitIndex2flushedDataMap();
   }
 
   @Override
@@ -334,34 +315,6 @@ public class BlockOutputStream extends OutputStream {
   }
 
   /**
-   * just update the totalAckDataLength. In case of failure,
-   * we will read the data starting from totalAckDataLength.
-   */
-  private void updateFlushIndex(List<Long> indexes) {
-    Preconditions.checkArgument(!commitIndex2flushedDataMap.isEmpty());
-    for (long index : indexes) {
-      Preconditions.checkState(commitIndex2flushedDataMap.containsKey(index));
-      List<ByteBuffer> buffers = commitIndex2flushedDataMap.remove(index);
-      long length = buffers.stream().mapToLong(value -> {
-        int pos = value.position();
-        Preconditions.checkArgument(pos <= chunkSize);
-        return pos;
-      }).sum();
-      // totalAckDataLength replicated yet should always be incremented
-      // with the current length being returned from commitIndex2flushedDataMap.
-      totalAckDataLength += length;
-      LOG.debug("Total data successfully replicated: " + totalAckDataLength);
-      futureMap.remove(totalAckDataLength);
-      // Flush has been committed to required servers successful.
-      // just release the current buffer from the buffer pool corresponding
-      // to the buffers that have been committed with the putBlock call.
-      for (ByteBuffer byteBuffer : buffers) {
-        bufferPool.releaseBuffer(byteBuffer);
-      }
-    }
-  }
-
-  /**
    * This is a blocking call. It will wait for the flush till the commit index
    * at the head of the commitIndex2flushedDataMap gets replicated to all or
    * majority.
@@ -370,7 +323,7 @@ public class BlockOutputStream extends OutputStream {
   private void handleFullBuffer() throws IOException {
     try {
       checkOpen();
-      if (!futureMap.isEmpty()) {
+      if (!commitWatcher.getFutureMap().isEmpty()) {
         waitOnFlushFutures();
       }
     } catch (InterruptedException | ExecutionException e) {
@@ -378,47 +331,31 @@ public class BlockOutputStream extends OutputStream {
       adjustBuffersOnException();
       throw getIoException();
     }
-    if (!commitIndex2flushedDataMap.isEmpty()) {
-      watchForCommit(
-          commitIndex2flushedDataMap.keySet().stream().mapToLong(v -> v)
-              .min().getAsLong());
-    }
+    watchForCommit(true);
   }
 
-  private void adjustBuffers(long commitIndex) {
-    List<Long> keyList = commitIndex2flushedDataMap.keySet().stream()
-        .filter(p -> p <= commitIndex).collect(Collectors.toList());
-    if (keyList.isEmpty()) {
-      return;
-    } else {
-      updateFlushIndex(keyList);
-    }
-  }
 
   // It may happen that once the exception is encountered , we still might
   // have successfully flushed up to a certain index. Make sure the buffers
   // only contain data which have not been sufficiently replicated
   private void adjustBuffersOnException() {
-    adjustBuffers(xceiverClient.getReplicatedMinCommitIndex());
+    commitWatcher.releaseBuffersOnException();
   }
 
   /**
    * calls watchForCommit API of the Ratis Client. For Standalone client,
    * it is a no op.
-   * @param commitIndex log index to watch for
+   * @param bufferFull flag indicating whether the bufferFull condition is hit
+   *              or whether this is called as part of a flush/close
    * @return minimum commit index replicated to all nodes
    * @throws IOException IOException in case watch gets timed out
    */
-  private void watchForCommit(long commitIndex) throws IOException {
+  private void watchForCommit(boolean bufferFull) throws IOException {
     checkOpen();
-    Preconditions.checkState(!commitIndex2flushedDataMap.isEmpty());
-    long index;
     try {
-      XceiverClientReply reply =
-          xceiverClient.watchForCommit(commitIndex, watchTimeout);
-      if (reply == null) {
-        index = 0;
-      } else {
+      XceiverClientReply reply = bufferFull ?
+          commitWatcher.watchOnFirstIndex() : commitWatcher.watchOnLastIndex();
+      if (reply != null) {
         List<DatanodeDetails> dnList = reply.getDatanodes();
         if (!dnList.isEmpty()) {
           if (failedServers.isEmpty()) {
@@ -426,13 +363,9 @@ public class BlockOutputStream extends OutputStream {
           }
           failedServers.addAll(dnList);
         }
-        index = reply.getLogIndex();
       }
-      adjustBuffers(index);
-    } catch (TimeoutException | InterruptedException | ExecutionException e) {
-      LOG.warn("watchForCommit failed for index " + commitIndex, e);
-      setIoException(e);
-      adjustBuffersOnException();
+    } catch (IOException ioe) {
+      setIoException(ioe);
       throw getIoException();
     }
   }
@@ -471,14 +404,14 @@ public class BlockOutputStream extends OutputStream {
           blockID = responseBlockID;
           LOG.debug(
               "Adding index " + asyncReply.getLogIndex() + " commitMap size "
-                  + commitIndex2flushedDataMap.size() + " flushLength "
+                  + commitWatcher.getCommitInfoMapSize() + " flushLength "
                   + flushPos + " numBuffers " + byteBufferList.size()
                   + " blockID " + blockID + " bufferPool size" + bufferPool
                   .getSize() + " currentBufferIndex " + bufferPool
                   .getCurrentBufferIndex());
           // for standalone protocol, logIndex will always be 0.
-          commitIndex2flushedDataMap
-              .put(asyncReply.getLogIndex(), byteBufferList);
+          commitWatcher
+              .updateCommitInfoMap(asyncReply.getLogIndex(), byteBufferList);
         }
         return e;
       }, responseExecutor).exceptionally(e -> {
@@ -493,7 +426,7 @@ public class BlockOutputStream extends OutputStream {
       throw new IOException(
           "Unexpected Storage Container Exception: " + e.toString(), e);
     }
-    futureMap.put(flushPos, flushFuture);
+    commitWatcher.getFutureMap().put(flushPos, flushFuture);
     return flushFuture;
   }
 
@@ -553,18 +486,7 @@ public class BlockOutputStream extends OutputStream {
       executePutBlock();
     }
     waitOnFlushFutures();
-    if (!commitIndex2flushedDataMap.isEmpty()) {
-      // wait for the last commit index in the commitIndex2flushedDataMap
-      // to get committed to all or majority of nodes in case timeout
-      // happens.
-      long lastIndex =
-          commitIndex2flushedDataMap.keySet().stream().mapToLong(v -> v)
-              .max().getAsLong();
-      LOG.debug(
-          "waiting for last flush Index " + lastIndex + " to catch up");
-      watchForCommit(lastIndex);
-    }
-
+    watchForCommit(false);
     // just check again if the exception is hit while waiting for the
     // futures to ensure flush has indeed succeeded
 
@@ -594,11 +516,11 @@ public class BlockOutputStream extends OutputStream {
     }
   }
 
-
   private void waitOnFlushFutures()
       throws InterruptedException, ExecutionException {
     CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
-        futureMap.values().toArray(new CompletableFuture[futureMap.size()]));
+        commitWatcher.getFutureMap().values().toArray(
+            new CompletableFuture[commitWatcher.getFutureMap().size()]));
     // wait for all the transactions to complete
     combinedFuture.get();
   }
@@ -637,18 +559,11 @@ public class BlockOutputStream extends OutputStream {
     }
     xceiverClientManager = null;
     xceiverClient = null;
-    if (futureMap != null) {
-      futureMap.clear();
-    }
-    futureMap = null;
+    commitWatcher.cleanup();
     if (bufferList !=  null) {
       bufferList.clear();
     }
     bufferList = null;
-    if (commitIndex2flushedDataMap != null) {
-      commitIndex2flushedDataMap.clear();
-    }
-    commitIndex2flushedDataMap = null;
     responseExecutor.shutdown();
   }
 
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
new file mode 100644
index 0000000..aeac941
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * This class maintains the map of the commitIndexes to be watched for
+ * successful replication in the datanodes in a given pipeline. It also releases
+ * the buffers associated with the user data back to {@link BufferPool} once
+ * minimum replication criteria is achieved during an ozone key write.
+ */
+package org.apache.hadoop.hdds.scm.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.XceiverClientReply;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.ExecutionException;
+
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.stream.Collectors;
+
+/**
+ * This class executes watchForCommit on ratis pipeline and releases
+ * buffers once data successfully gets replicated.
+ */
+public class CommitWatcher {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CommitWatcher.class);
+
+  // A reference to the pool of buffers holding the data
+  private BufferPool bufferPool;
+
+  // The map should maintain the keys (logIndexes) in order so that while
+  // removing we always end up updating incremented data flushed length.
+  // Also, corresponding to the logIndex, the corresponding list of buffers will
+  // be released from the buffer pool.
+  private ConcurrentSkipListMap<Long, List<ByteBuffer>>
+      commitIndex2flushedDataMap;
+
+  // future Map to hold up all putBlock futures
+  private ConcurrentHashMap<Long,
+      CompletableFuture<ContainerProtos.ContainerCommandResponseProto>>
+      futureMap;
+
+  private XceiverClientSpi xceiverClient;
+
+  private final long watchTimeout;
+
+  // total data which has been successfully flushed and acknowledged
+  // by all servers
+  private long totalAckDataLength;
+
+  public CommitWatcher(BufferPool bufferPool, XceiverClientSpi xceiverClient,
+      long watchTimeout) {
+    this.bufferPool = bufferPool;
+    this.xceiverClient = xceiverClient;
+    this.watchTimeout = watchTimeout;
+    commitIndex2flushedDataMap = new ConcurrentSkipListMap<>();
+    totalAckDataLength = 0;
+    futureMap = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Releases the given buffers back to the pool and updates totalAckDataLength.
+   * In case of failure, data is re-read starting from totalAckDataLength.
+   */
+  private long releaseBuffers(List<Long> indexes) {
+    Preconditions.checkArgument(!commitIndex2flushedDataMap.isEmpty());
+    for (long index : indexes) {
+      Preconditions.checkState(commitIndex2flushedDataMap.containsKey(index));
+      List<ByteBuffer> buffers = commitIndex2flushedDataMap.remove(index);
+      long length = buffers.stream().mapToLong(value -> {
+        int pos = value.position();
+        return pos;
+      }).sum();
+      totalAckDataLength += length;
+      // clear the future object from the future Map
+      Preconditions.checkNotNull(futureMap.remove(totalAckDataLength));
+      for (ByteBuffer byteBuffer : buffers) {
+        bufferPool.releaseBuffer(byteBuffer);
+      }
+    }
+    return totalAckDataLength;
+  }
+
+  public void updateCommitInfoMap(long index, List<ByteBuffer> byteBufferList) {
+    commitIndex2flushedDataMap
+        .put(index, byteBufferList);
+  }
+
+  int getCommitInfoMapSize() {
+    return commitIndex2flushedDataMap.size();
+  }
+
+  /**
+   * Calls watchForCommit for the first (lowest) index in
+   * commitIndex2flushedDataMap on the Ratis client.
+   * @return reply from the raft client
+   * @throws IOException in case watchForCommit fails
+   */
+  public XceiverClientReply watchOnFirstIndex() throws IOException {
+    if (!commitIndex2flushedDataMap.isEmpty()) {
+      // wait for the first commit index in the commitIndex2flushedDataMap
+      // to get committed to all or majority of nodes in case timeout
+      // happens.
+      long index =
+          commitIndex2flushedDataMap.keySet().stream().mapToLong(v -> v).min()
+              .getAsLong();
+      LOG.debug("waiting for first index " + index + " to catch up");
+      return watchForCommit(index);
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Calls watchForCommit for the last (highest) index in
+   * commitIndex2flushedDataMap on the Ratis client.
+   * @return reply from the raft client
+   * @throws IOException in case watchForCommit fails
+   */
+  public XceiverClientReply watchOnLastIndex()
+      throws IOException {
+    if (!commitIndex2flushedDataMap.isEmpty()) {
+      // wait for the last commit index in the commitIndex2flushedDataMap
+      // to get committed to all or majority of nodes in case timeout
+      // happens.
+      long index =
+          commitIndex2flushedDataMap.keySet().stream().mapToLong(v -> v).max()
+              .getAsLong();
+      LOG.debug("waiting for last flush Index " + index + " to catch up");
+      return watchForCommit(index);
+    } else {
+      return null;
+    }
+  }
+
+
+  private void adjustBuffers(long commitIndex) {
+    List<Long> keyList = commitIndex2flushedDataMap.keySet().stream()
+        .filter(p -> p <= commitIndex).collect(Collectors.toList());
+    if (keyList.isEmpty()) {
+      return;
+    } else {
+      releaseBuffers(keyList);
+    }
+  }
+
+  // It may happen that once the exception is encountered, we still might
+  // have successfully flushed up to a certain index. Make sure the buffers
+  // only contain data that has not been sufficiently replicated.
+  void releaseBuffersOnException() {
+    adjustBuffers(xceiverClient.getReplicatedMinCommitIndex());
+  }
+
+
+  /**
+   * Calls the watchForCommit API of the Ratis client. For a standalone client,
+   * it is a no-op.
+   * @param commitIndex log index to watch for
+   * @return minimum commit index replicated to all nodes
+   * @throws IOException IOException in case watch gets timed out
+   */
+  public XceiverClientReply watchForCommit(long commitIndex)
+      throws IOException {
+    Preconditions.checkState(!commitIndex2flushedDataMap.isEmpty());
+    long index;
+    try {
+      XceiverClientReply reply =
+          xceiverClient.watchForCommit(commitIndex, watchTimeout);
+      if (reply == null) {
+        index = 0;
+      } else {
+        index = reply.getLogIndex();
+      }
+      adjustBuffers(index);
+      return reply;
+    } catch (TimeoutException | InterruptedException | ExecutionException e) {
+      LOG.warn("watchForCommit failed for index " + commitIndex, e);
+      IOException ioException = new IOException(
+          "Unexpected Storage Container Exception: " + e.toString(), e);
+      releaseBuffersOnException();
+      throw ioException;
+    }
+  }
+
+  @VisibleForTesting
+  public ConcurrentSkipListMap<Long,
+      List<ByteBuffer>> getCommitIndex2flushedDataMap() {
+    return commitIndex2flushedDataMap;
+  }
+
+  public ConcurrentHashMap<Long,
+      CompletableFuture<ContainerProtos.
+          ContainerCommandResponseProto>> getFutureMap() {
+    return futureMap;
+  }
+
+  public long getTotalAckDataLength() {
+    return totalAckDataLength;
+  }
+
+  public void cleanup() {
+    if (commitIndex2flushedDataMap != null) {
+      commitIndex2flushedDataMap.clear();
+    }
+    if (futureMap != null) {
+      futureMap.clear();
+    }
+    commitIndex2flushedDataMap = null;
+  }
+}
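
For readers who want to see the release bookkeeping in isolation, below is a small standalone illustration (plain Java, not Ozone code; apart from commitIndex2flushedDataMap and totalAckDataLength, all names are invented for the example). It mimics what releaseBuffers()/adjustBuffers() above do: entries are keyed by the Ratis log index of their putBlock call, and everything at or below the replicated index is removed and counted as acknowledged.

    import java.nio.ByteBuffer;
    import java.util.List;
    import java.util.concurrent.ConcurrentSkipListMap;

    public class CommitWatcherSketch {
      private final ConcurrentSkipListMap<Long, List<ByteBuffer>>
          commitIndex2flushedDataMap = new ConcurrentSkipListMap<>();
      private long totalAckDataLength;

      void track(long logIndex, List<ByteBuffer> buffers) {
        commitIndex2flushedDataMap.put(logIndex, buffers);
      }

      // release every entry whose log index is <= the replicated commit index
      long adjustBuffers(long replicatedIndex) {
        while (!commitIndex2flushedDataMap.isEmpty()
            && commitIndex2flushedDataMap.firstKey() <= replicatedIndex) {
          for (ByteBuffer b : commitIndex2flushedDataMap.pollFirstEntry().getValue()) {
            totalAckDataLength += b.position(); // bytes written into the buffer
            // a real implementation would also return b to its BufferPool here
          }
        }
        return totalAckDataLength;
      }

      public static void main(String[] args) {
        CommitWatcherSketch w = new CommitWatcherSketch();
        ByteBuffer b1 = ByteBuffer.allocate(16); b1.put(new byte[10]);
        ByteBuffer b2 = ByteBuffer.allocate(16); b2.put(new byte[16]);
        w.track(5L, List.of(b1));
        w.track(9L, List.of(b2));
        System.out.println(w.adjustBuffers(5L));  // 10 -> only index 5 is acked
        System.out.println(w.adjustBuffers(12L)); // 26 -> index 9 acked as well
      }
    }
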
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
new file mode 100644
index 0000000..ea51900
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.client.rpc;
+
+import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientRatis;
+import org.apache.hadoop.hdds.scm.XceiverClientReply;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdds.scm.storage.BufferPool;
+import org.apache.hadoop.hdds.scm.storage.CommitWatcher;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+
+/**
+ * Class to test CommitWatcher functionality.
+ */
+public class TestCommitWatcher {
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf = new OzoneConfiguration();
+  private static OzoneClient client;
+  private static ObjectStore objectStore;
+  private static int chunkSize;
+  private static long flushSize;
+  private static long maxFlushSize;
+  private static long blockSize;
+  private static String volumeName;
+  private static String bucketName;
+  private static String keyString;
+  private static StorageContainerLocationProtocolClientSideTranslatorPB
+      storageContainerLocationClient;
+  private static String containerOwner = "OZONE";
+
+  /**
+   * Create a MiniOzoneCluster for testing.
+   * <p>
+   * Ozone is made active by setting OZONE_ENABLED = true
+   *
+   * @throws IOException
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    chunkSize = (int)(1 * OzoneConsts.MB);
+    flushSize = 2 * chunkSize;
+    maxFlushSize = 2 * flushSize;
+    blockSize = 2 * maxFlushSize;
+    conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
+    conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+    conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
+    conf.setQuietMode(false);
+    conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
+        StorageUnit.MB);
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(7)
+        .setBlockSize(blockSize)
+        .setChunkSize(chunkSize)
+        .setStreamBufferFlushSize(flushSize)
+        .setStreamBufferMaxSize(maxFlushSize)
+        .setStreamBufferSizeUnit(StorageUnit.BYTES)
+        .build();
+    cluster.waitForClusterToBeReady();
+    //the easiest way to create an open container is creating a key
+    client = OzoneClientFactory.getClient(conf);
+    objectStore = client.getObjectStore();
+    keyString = UUID.randomUUID().toString();
+    volumeName = "testblockoutputstream";
+    bucketName = volumeName;
+    objectStore.createVolume(volumeName);
+    objectStore.getVolume(volumeName).createBucket(bucketName);
+    storageContainerLocationClient = cluster
+        .getStorageContainerLocationClient();
+  }
+
+  /**
+   * Shutdown MiniOzoneCluster.
+   */
+  @AfterClass
+  public static void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testReleaseBuffers() throws Exception {
+    int capacity = 2;
+    BufferPool bufferPool = new BufferPool(chunkSize, capacity);
+    XceiverClientManager clientManager = new XceiverClientManager(conf);
+    ContainerWithPipeline container = storageContainerLocationClient
+        .allocateContainer(HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.THREE, containerOwner);
+    Pipeline pipeline = container.getPipeline();
+    long containerId = container.getContainerInfo().getContainerID();
+    XceiverClientSpi xceiverClient = clientManager.acquireClient(pipeline);
+    Assert.assertEquals(1, xceiverClient.getRefcount());
+    Assert.assertTrue(xceiverClient instanceof XceiverClientRatis);
+    XceiverClientRatis ratisClient = (XceiverClientRatis) xceiverClient;
+    CommitWatcher watcher = new CommitWatcher(bufferPool, ratisClient, 10000);
+    BlockID blockID = ContainerTestHelper.getTestBlockID(containerId);
+    List<ByteBuffer> bufferList = new ArrayList<>();
+    List<XceiverClientReply> replies = new ArrayList<>();
+    long length = 0;
+    List<CompletableFuture<ContainerProtos.ContainerCommandResponseProto>>
+        futures = new ArrayList<>();
+    for (int i = 0; i < capacity; i++) {
+      bufferList.clear();
+      ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
+          ContainerTestHelper
+              .getWriteChunkRequest(pipeline, blockID, chunkSize);
+      // add the data to the buffer pool
+      ByteBuffer byteBuffer = bufferPool.allocateBufferIfNeeded().put(
+          writeChunkRequest.getWriteChunk().getData().asReadOnlyByteBuffer());
+      ratisClient.sendCommandAsync(writeChunkRequest);
+      ContainerProtos.ContainerCommandRequestProto putBlockRequest =
+          ContainerTestHelper
+              .getPutBlockRequest(pipeline, writeChunkRequest.getWriteChunk());
+      XceiverClientReply reply = ratisClient.sendCommandAsync(putBlockRequest);
+      bufferList.add(byteBuffer);
+      length += byteBuffer.position();
+      CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
+          reply.getResponse().thenApply(v -> {
+            watcher.updateCommitInfoMap(reply.getLogIndex(), bufferList);
+            return v;
+          });
+      futures.add(future);
+      watcher.getFutureMap().put(length, future);
+      replies.add(reply);
+    }
+
+    Assert.assertTrue(replies.size() == 2);
+    // wait on the 1st putBlock to complete
+    CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future1 =
+        futures.get(0);
+    CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future2 =
+        futures.get(1);
+    future1.get();
+    Assert.assertNotNull(watcher.getFutureMap().get(new Long(chunkSize)));
+    Assert.assertTrue(
+        watcher.getFutureMap().get(new Long(chunkSize)).equals(future1));
+    // wait on 2nd putBlock to complete
+    future2.get();
+    Assert.assertNotNull(watcher.getFutureMap().get(new Long(2 * chunkSize)));
+    Assert.assertTrue(
+        watcher.getFutureMap().get(new Long(2 * chunkSize)).equals(future2));
+    Assert.assertTrue(watcher.getCommitIndex2flushedDataMap().size() == 2);
+    watcher.watchOnFirstIndex();
+    Assert.assertFalse(watcher.getCommitIndex2flushedDataMap()
+        .containsKey(replies.get(0).getLogIndex()));
+    Assert.assertFalse(watcher.getFutureMap().containsKey(chunkSize));
+    Assert.assertTrue(watcher.getTotalAckDataLength() >= chunkSize);
+    watcher.watchOnLastIndex();
+    Assert.assertFalse(watcher.getCommitIndex2flushedDataMap()
+        .containsKey(replies.get(1).getLogIndex()));
+    Assert.assertFalse(watcher.getFutureMap().containsKey(2 * chunkSize));
+    Assert.assertTrue(watcher.getTotalAckDataLength() == 2 * chunkSize);
+    Assert.assertTrue(watcher.getFutureMap().isEmpty());
+    Assert.assertTrue(watcher.getCommitIndex2flushedDataMap().isEmpty());
+  }
+
+  @Test
+  public void testReleaseBuffersOnException() throws Exception {
+    int capacity = 2;
+    BufferPool bufferPool = new BufferPool(chunkSize, capacity);
+    XceiverClientManager clientManager = new XceiverClientManager(conf);
+    ContainerWithPipeline container = storageContainerLocationClient
+        .allocateContainer(HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.THREE, containerOwner);
+    Pipeline pipeline = container.getPipeline();
+    long containerId = container.getContainerInfo().getContainerID();
+    XceiverClientSpi xceiverClient = clientManager.acquireClient(pipeline);
+    Assert.assertEquals(1, xceiverClient.getRefcount());
+    Assert.assertTrue(xceiverClient instanceof XceiverClientRatis);
+    XceiverClientRatis ratisClient = (XceiverClientRatis) xceiverClient;
+    CommitWatcher watcher = new CommitWatcher(bufferPool, ratisClient, 10000);
+    BlockID blockID = ContainerTestHelper.getTestBlockID(containerId);
+    List<ByteBuffer> bufferList = new ArrayList<>();
+    List<XceiverClientReply> replies = new ArrayList<>();
+    long length = 0;
+    List<CompletableFuture<ContainerProtos.ContainerCommandResponseProto>>
+        futures = new ArrayList<>();
+    for (int i = 0; i < capacity; i++) {
+      bufferList.clear();
+      ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
+          ContainerTestHelper
+              .getWriteChunkRequest(pipeline, blockID, chunkSize);
+      // add the data to the buffer pool
+      ByteBuffer byteBuffer = bufferPool.allocateBufferIfNeeded().put(
+          writeChunkRequest.getWriteChunk().getData().asReadOnlyByteBuffer());
+      ratisClient.sendCommandAsync(writeChunkRequest);
+      ContainerProtos.ContainerCommandRequestProto putBlockRequest =
+          ContainerTestHelper
+              .getPutBlockRequest(pipeline, writeChunkRequest.getWriteChunk());
+      XceiverClientReply reply = ratisClient.sendCommandAsync(putBlockRequest);
+      bufferList.add(byteBuffer);
+      length += byteBuffer.position();
+      CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
+          reply.getResponse().thenApply(v -> {
+            watcher.updateCommitInfoMap(reply.getLogIndex(), bufferList);
+            return v;
+          });
+      futures.add(future);
+      watcher.getFutureMap().put(length, future);
+      replies.add(reply);
+    }
+
+    Assert.assertTrue(replies.size() == 2);
+    // wait on the 1st putBlock to complete
+    CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future1 =
+        futures.get(0);
+    CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future2 =
+        futures.get(1);
+    future1.get();
+    Assert.assertNotNull(watcher.getFutureMap().get(new Long(chunkSize)));
+    Assert.assertTrue(
+        watcher.getFutureMap().get(new Long(chunkSize)).equals(future1));
+    // wait on 2nd putBlock to complete
+    future2.get();
+    Assert.assertNotNull(watcher.getFutureMap().get(new Long(2 * chunkSize)));
+    Assert.assertTrue(
+        watcher.getFutureMap().get(new Long(2 * chunkSize)).equals(future2));
+    Assert.assertTrue(watcher.getCommitIndex2flushedDataMap().size() == 2);
+    watcher.watchOnFirstIndex();
+    Assert.assertFalse(watcher.getCommitIndex2flushedDataMap()
+        .containsKey(replies.get(0).getLogIndex()));
+    Assert.assertFalse(watcher.getFutureMap().containsKey(chunkSize));
+    Assert.assertTrue(watcher.getTotalAckDataLength() >= chunkSize);
+    cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
+    cluster.shutdownHddsDatanode(pipeline.getNodes().get(1));
+    try {
+      // just watch for a higher index so as to ensure, it does an actual
+      // call to Ratis. Otherwise, it may just return in case the commitInfoMap
+      // is updated to the latest index in putBlock response.
+      watcher.watchForCommit(replies.get(1).getLogIndex() + 1);
+    } catch(IOException ioe) {
+      Assert.assertTrue(ioe.getCause() instanceof TimeoutException);
+    }
+    long lastIndex = replies.get(1).getLogIndex();
+    // Depending on the last successfully replicated commitIndex, either we
+    // discard only 1st buffer or both buffers
+    Assert.assertTrue(ratisClient.getReplicatedMinCommitIndex() <= lastIndex);
+    if (ratisClient.getReplicatedMinCommitIndex() < replies.get(1)
+        .getLogIndex()) {
+      Assert.assertTrue(watcher.getTotalAckDataLength() == chunkSize);
+      Assert.assertTrue(watcher.getCommitIndex2flushedDataMap().size() == 1);
+      Assert.assertTrue(watcher.getFutureMap().size() == 1);
+    } else {
+      Assert.assertTrue(watcher.getTotalAckDataLength() == 2 * chunkSize);
+      Assert.assertTrue(watcher.getFutureMap().isEmpty());
+      Assert.assertTrue(watcher.getCommitIndex2flushedDataMap().isEmpty());
+    }
+  }
+}
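
Assuming a standard Hadoop/Ozone source checkout from around this commit, the new integration test can be run on its own with something along these lines (module path taken from the diff header; exact Maven profiles and options may differ in your environment):

    mvn -pl hadoop-ozone/integration-test -am -Dtest=TestCommitWatcher test
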

