You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by si...@apache.org on 2022/11/28 21:13:52 UTC

[ozone] branch HDDS-6517-Snapshot updated: HDDS-7279. Snapshot Create requires Double Buffer Flush thread to split the commit batch. (#3958)

This is an automated email from the ASF dual-hosted git repository.

siyao pushed a commit to branch HDDS-6517-Snapshot
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-6517-Snapshot by this push:
     new 6fe2ab8011 HDDS-7279. Snapshot Create requires Double Buffer Flush thread to split the commit batch. (#3958)
6fe2ab8011 is described below

commit 6fe2ab80115ae4d7c2b705886895575c5722f66d
Author: Hemant Kumar <he...@gmail.com>
AuthorDate: Mon Nov 28 13:13:47 2022 -0800

    HDDS-7279. Snapshot Create requires Double Buffer Flush thread to split the commit batch. (#3958)
---
 .../ozone/om/ratis/OzoneManagerDoubleBuffer.java   | 431 ++++++++++++---------
 .../om/ratis/TestOzoneManagerDoubleBuffer.java     | 216 +++++++++++
 2 files changed, 471 insertions(+), 176 deletions(-)

diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
index 905f0c768d..2fedea5d54 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
@@ -18,11 +18,15 @@
 
 package org.apache.hadoop.ozone.om.ratis;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
@@ -31,38 +35,33 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.stream.Collectors;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.function.SupplierWithIOException;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
-import org.apache.hadoop.hdds.utils.db.DBColumnFamilyDefinition;
-import org.apache.hadoop.ozone.om.codec.OMDBDefinition;
 import org.apache.hadoop.hdds.utils.TransactionInfo;
-import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
-import org.apache.hadoop.util.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.DBColumnFamilyDefinition;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.codec.OMDBDefinition;
 import org.apache.hadoop.ozone.om.ratis.helpers.DoubleBufferEntry;
 import org.apache.hadoop.ozone.om.ratis.metrics.OzoneManagerDoubleBufferMetrics;
+import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 import org.apache.hadoop.util.Daemon;
-import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.util.Time;
 import org.apache.ratis.util.ExitUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
 
 /**
  * This class implements DoubleBuffer implementation of OMClientResponse's. In
  * DoubleBuffer it has 2 buffers one is currentBuffer and other is
- * readyBuffer. The current OM requests will be always added to currentBuffer.
- * Flush thread will be running in background, it check's if currentBuffer has
+ * readyBuffer. The current OM requests will always be added to currentBuffer.
+ * Flush thread will be running in background, it checks if currentBuffer has
  * any entries, it swaps the buffer and creates a batch and commit to DB.
  * Adding OM request to doubleBuffer and swap of buffer are synchronized
  * methods.
@@ -111,8 +110,6 @@ public final class OzoneManagerDoubleBuffer {
    */
   private Function<Long, Long> indexToTerm;
 
-
-
   /**
    *  Builder for creating OzoneManagerDoubleBuffer.
    */
@@ -176,15 +173,11 @@ public final class OzoneManagerDoubleBuffer {
       Function<Long, Long> indexToTerm, int maxUnFlushedTransactions) {
     this.currentBuffer = new ConcurrentLinkedQueue<>();
     this.readyBuffer = new ConcurrentLinkedQueue<>();
-
     this.isRatisEnabled = isRatisEnabled;
     this.isTracingEnabled = isTracingEnabled;
     if (!isRatisEnabled) {
       this.currentFutureQueue = new ConcurrentLinkedQueue<>();
       this.readyFutureQueue = new ConcurrentLinkedQueue<>();
-    } else {
-      this.currentFutureQueue = null;
-      this.readyFutureQueue = null;
     }
     this.unFlushedTransactions = new Semaphore(maxUnFlushedTransactions);
     this.omMetadataManager = omMetadataManager;
@@ -194,11 +187,10 @@ public final class OzoneManagerDoubleBuffer {
     this.indexToTerm = indexToTerm;
 
     isRunning.set(true);
-    // Daemon thread which runs in back ground and flushes transactions to DB.
+    // Daemon thread which runs in background and flushes transactions to DB.
     daemon = new Daemon(this::flushTransactions);
     daemon.setName("OMDoubleBufferFlushThread");
     daemon.start();
-
   }
 
   /**
@@ -210,8 +202,8 @@ public final class OzoneManagerDoubleBuffer {
   }
 
   /**
-   *Releases the given number of permits,
-   *returning them to the unFlushedTransactions.
+   * Releases the given number of permits,
+   * returning them to the unFlushedTransactions.
    */
   public void releaseUnFlushedTransactions(int n) {
     unFlushedTransactions.release(n);
@@ -220,7 +212,7 @@ public final class OzoneManagerDoubleBuffer {
   // TODO: pass the trace id further down and trace all methods of DBStore.
 
   /**
-   * add to write batch with trace span if tracing is enabled.
+   * Add to write batch with trace span if tracing is enabled.
    */
   private Void addToBatchWithTrace(OMResponse omResponse,
       SupplierWithIOException<Void> supplier) throws IOException {
@@ -234,7 +226,7 @@ public final class OzoneManagerDoubleBuffer {
   }
 
   /**
-   * flush write batch with trace span if tracing is enabled.
+   * Flush write batch with trace span if tracing is enabled.
    */
   private Void flushBatchWithTrace(String parentName, int batchSize,
       SupplierWithIOException<Void> supplier) throws IOException {
@@ -263,138 +255,204 @@ public final class OzoneManagerDoubleBuffer {
    * and commit to DB.
    */
   private void flushTransactions() {
-    while (isRunning.get()) {
-      try {
-        if (canFlush()) {
-          Map<String, List<Long>> cleanupEpochs = new HashMap<>();
-
-          setReadyBuffer();
-          List<Long> flushedEpochs = null;
-          try (BatchOperation batchOperation = omMetadataManager.getStore()
-              .initBatchOperation()) {
-
-            AtomicReference<String> lastTraceId = new AtomicReference<>();
-            readyBuffer.iterator().forEachRemaining((entry) -> {
-              try {
-                OMResponse omResponse = entry.getResponse().getOMResponse();
-                lastTraceId.set(omResponse.getTraceID());
-                addToBatchWithTrace(omResponse,
-                    (SupplierWithIOException<Void>) () -> {
-                      entry.getResponse().checkAndUpdateDB(omMetadataManager,
-                          batchOperation);
-                      return null;
-                    });
-
-                addCleanupEntry(entry, cleanupEpochs);
-
-              } catch (IOException ex) {
-                // During Adding to RocksDB batch entry got an exception.
-                // We should terminate the OM.
-                terminate(ex);
-              }
-            });
+    while (isRunning.get() && canFlush()) {
+      flushCurrentBuffer();
+    }
+  }
 
-            // Commit transaction info to DB.
-            flushedEpochs = readyBuffer.stream().map(
-                DoubleBufferEntry::getTrxLogIndex)
-                .sorted().collect(Collectors.toList());
-            long lastRatisTransactionIndex = flushedEpochs.get(
-                flushedEpochs.size() - 1);
-            long term = isRatisEnabled ?
-                indexToTerm.apply(lastRatisTransactionIndex) : -1;
-
-            addToBatchTransactionInfoWithTrace(lastTraceId.get(),
-                lastRatisTransactionIndex,
-                (SupplierWithIOException<Void>) () -> {
-                  omMetadataManager.getTransactionInfoTable().putWithBatch(
-                      batchOperation, TRANSACTION_INFO_KEY,
-                      new TransactionInfo.Builder()
-                          .setTransactionIndex(lastRatisTransactionIndex)
-                          .setCurrentTerm(term).build());
-                  return null;
-                });
-
-            long startTime = Time.monotonicNow();
-            flushBatchWithTrace(lastTraceId.get(), readyBuffer.size(),
-                () -> {
-                  omMetadataManager.getStore().commitBatchOperation(
-                      batchOperation);
-                  return null;
-                });
-            ozoneManagerDoubleBufferMetrics.updateFlushTime(
-                Time.monotonicNow() - startTime);
-          }
-
-          // Complete futures first and then do other things. So, that
-          // handler threads will be released.
-          if (!isRatisEnabled) {
-            // Once all entries are flushed, we can complete their future.
-            readyFutureQueue.iterator().forEachRemaining((entry) -> {
-              entry.complete(null);
-            });
+  /**
+   * Swaps the buffers and flushes the ready buffer to DB, split into
+   * batches. Extracted so tests can call it without racing the daemon.
+   * Failures are passed to terminate() or ExitUtils.terminate().
+   */
+  @VisibleForTesting
+  void flushCurrentBuffer() {
+    try {
+      swapCurrentAndReadyBuffer();
+
+      // For snapshot, we want to include all the keys that were committed
+      // before the snapshot `create` command was executed. To achieve
+      // this, we split the ready buffer at every snapshot create request
+      // and flush the resulting queues as separate batches, in their
+      // original order.
+      // For example, if requestBuffer is [request1, request2,
+      // snapshotRequest1, request3, snapshotRequest2, request4],
+      //
+      // then the split requestBuffer would be
+      // bufferQueues = [[request1, request2], [snapshotRequest1], [request3],
+      //     [snapshotRequest2], [request4]].
+      // And bufferQueues will be flushed in the following order:
+      // Flush #1: [request1, request2]
+      // Flush #2: [snapshotRequest1]
+      // Flush #3: [request3]
+      // Flush #4: [snapshotRequest2]
+      // Flush #5: [request4]
+      List<Queue<DoubleBufferEntry<OMClientResponse>>> bufferQueues =
+          splitReadyBufferAtCreateSnapshot();
+
+      for (Queue<DoubleBufferEntry<OMClientResponse>> buffer : bufferQueues) {
+        flushBatch(buffer);
+      }
+      // Every queue has been flushed, so the ready buffer can be cleared.
+      clearReadyBuffer();
+    } catch (IOException ex) {
+      terminate(ex);
+    } catch (Throwable t) {
+      final String s = "OMDoubleBuffer flush thread " +
+          Thread.currentThread().getName() + " encountered Throwable error";
+      ExitUtils.terminate(2, s, t, LOG);
+    }
+  }
 
-            readyFutureQueue.clear();
-          }
-
-          int flushedTransactionsSize = readyBuffer.size();
-          flushedTransactionCount.addAndGet(flushedTransactionsSize);
-          flushIterations.incrementAndGet();
-
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Sync Iteration {} flushed transactions in this " +
-                    "iteration {}", flushIterations.get(),
-                flushedTransactionsSize);
-          }
-
-          // When non-HA do the sort step here, as the sorted list is not
-          // required for flush to DB. As in non-HA we want to complete
-          // futures as quick as possible after flush to DB, to release rpc
-          // handler threads.
-          if (!isRatisEnabled) {
-            flushedEpochs =
-                readyBuffer.stream().map(DoubleBufferEntry::getTrxLogIndex)
-                    .sorted().collect(Collectors.toList());
-          }
-
-
-          // Clean up committed transactions.
-
-          cleanupCache(cleanupEpochs);
-
-          readyBuffer.clear();
-
-          if (isRatisEnabled) {
-            releaseUnFlushedTransactions(flushedTransactionsSize);
-          }
-
-          // update the last updated index in OzoneManagerStateMachine.
-          ozoneManagerRatisSnapShot.updateLastAppliedIndex(
-              flushedEpochs);
-
-          // set metrics.
-          updateMetrics(flushedTransactionsSize);
-        }
-      } catch (InterruptedException ex) {
-        Thread.currentThread().interrupt();
-        if (isRunning.get()) {
-          final String message = "OMDoubleBuffer flush thread " +
-              Thread.currentThread().getName() + " encountered Interrupted " +
-              "exception while running";
-          ExitUtils.terminate(1, message, ex, LOG);
-        } else {
-          LOG.info("OMDoubleBuffer flush thread {} is interrupted and will "
-              + "exit.", Thread.currentThread().getName());
-        }
+  private void flushBatch(Queue<DoubleBufferEntry<OMClientResponse>> buffer)
+      throws IOException {
+    // Writes all entries of one split queue to DB in a single batch.
+    Map<String, List<Long>> cleanupEpochs = new HashMap<>();
+    List<Long> flushedEpochs;
+
+    try (BatchOperation batchOperation = omMetadataManager.getStore()
+        .initBatchOperation()) {
+      // lastTraceId is the last entry's trace id; used for trace spans.
+      String lastTraceId = addToBatch(buffer, batchOperation);
+
+      buffer.iterator().forEachRemaining(
+          entry -> addCleanupEntry(entry, cleanupEpochs));
+
+      // Sorted log indexes; the highest is stored as the transaction info.
+      flushedEpochs = buffer.stream()
+          .map(DoubleBufferEntry::getTrxLogIndex)
+          .sorted()
+          .collect(Collectors.toList());
+      // Never empty: the split never creates an empty queue.
+      long lastRatisTransactionIndex = flushedEpochs.get(
+          flushedEpochs.size() - 1);
+
+      long term = isRatisEnabled ?
+          indexToTerm.apply(lastRatisTransactionIndex) : -1;
+
+      addToBatchTransactionInfoWithTrace(lastTraceId,
+          lastRatisTransactionIndex,
+          () -> {
+            omMetadataManager.getTransactionInfoTable().putWithBatch(
+                batchOperation, TRANSACTION_INFO_KEY,
+                new TransactionInfo.Builder()
+                    .setTransactionIndex(lastRatisTransactionIndex)
+                    .setCurrentTerm(term)
+                    .build());
+            return null;
+          });
+
+      long startTime = Time.monotonicNow();
+      flushBatchWithTrace(lastTraceId, buffer.size(),
+          () -> {
+            omMetadataManager.getStore()
+                .commitBatchOperation(batchOperation);
+            return null;
+          });
+
+      ozoneManagerDoubleBufferMetrics.updateFlushTime(
+          Time.monotonicNow() - startTime);
+    }
+
+    // Non-Ratis: complete the waiting futures first so that RPC handler
+    // threads are released as soon as possible.
+    if (!isRatisEnabled) {
+      clearReadyFutureQueue(buffer.size());
+    }
+
+    int flushedTransactionsSize = buffer.size();
+    flushedTransactionCount.addAndGet(flushedTransactionsSize);
+    flushIterations.incrementAndGet();
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Sync iteration {} flushed transactions in this iteration {}",
+          flushIterations.get(),
+          flushedTransactionsSize);
+    }
+
+    // Clean up the table cache entries of the committed transactions.
+    cleanupCache(cleanupEpochs);
+    // Ratis only: release the permits held by the flushed transactions.
+    if (isRatisEnabled) {
+      releaseUnFlushedTransactions(flushedTransactionsSize);
+    }
+    // Update the last applied index in OzoneManagerStateMachine.
+    ozoneManagerRatisSnapShot.updateLastAppliedIndex(flushedEpochs);
+
+    // Update the double buffer metrics.
+    updateMetrics(flushedTransactionsSize);
+  }
+
+  private String addToBatch(Queue<DoubleBufferEntry<OMClientResponse>> buffer,
+                            BatchOperation batchOperation) {
+    String lastTraceId = null;
+    for (DoubleBufferEntry<OMClientResponse> entry: buffer) {
+      OMClientResponse response = entry.getResponse();
+      OMResponse omResponse = response.getOMResponse();
+      lastTraceId = omResponse.getTraceID();
+      // The caller uses the last entry's trace id for its trace spans.
+      try {
+        addToBatchWithTrace(omResponse,
+            () -> {
+              response.checkAndUpdateDB(omMetadataManager, batchOperation);
+              return null;
+            });
       } catch (IOException ex) {
+        // Failing to add an entry to the RocksDB batch is fatal:
+        // terminate the OM.
         terminate(ex);
-      } catch (Throwable t) {
-        final String s = "OMDoubleBuffer flush thread " +
-            Thread.currentThread().getName() + " encountered Throwable error";
-        ExitUtils.terminate(2, s, t, LOG);
       }
     }
+
+    return lastTraceId;
   }
 
+  /**
+   * Splits the readyBuffer around create snapshot requests and returns the
+   * resulting queues in their original order; no queue is ever empty.
+   *
+   * CreateSnapshot is used as a barrier because the checkpoint is created
+   * in a RocksDB flush callback. If multiple operations are flushed in one
+   * batch, we cannot tell at the flush of which operation the callback
+   * fires, which exposes the snapshot to a race with RocksDB's behaviour
+   * for that batch.
+   * Hence, each createSnapshot request is flushed as its own batch, and
+   * so is each run of other requests between createSnapshot requests.
+   *
+   * e.g. requestBuffer = [request1, request2, snapshotRequest1,
+   * request3, snapshotRequest2, request4]
+   * response = [[request1, request2], [snapshotRequest1], [request3],
+   * [snapshotRequest2], [request4]]
+   */
+  private List<Queue<DoubleBufferEntry<OMClientResponse>>>
+      splitReadyBufferAtCreateSnapshot() {
+    List<Queue<DoubleBufferEntry<OMClientResponse>>> response =
+        new ArrayList<>();
+
+    Iterator<DoubleBufferEntry<OMClientResponse>> iterator =
+        readyBuffer.iterator();
+
+    OMResponse previousOmResponse = null;
+    while (iterator.hasNext()) {
+      DoubleBufferEntry<OMClientResponse> entry = iterator.next();
+      OMResponse omResponse = entry.getResponse().getOMResponse();
+      // Start a new queue for the first entry and around createSnapshot
+      // responses. NOTE(review): protobuf getters for message fields return
+      // the default instance rather than null, so "!= null" is likely always
+      // true in production; consider hasCreateSnapshotResponse() - confirm.
+      if (response.isEmpty() ||
+          omResponse.getCreateSnapshotResponse() != null ||
+          (previousOmResponse != null &&
+              previousOmResponse.getCreateSnapshotResponse() != null)) {
+        response.add(new LinkedList<>());
+      }
+
+      response.get(response.size() - 1).add(entry);
+      previousOmResponse = omResponse;
+    }
+
+    return response;
+  }
 
   private void addCleanupEntry(DoubleBufferEntry entry, Map<String,
       List<Long>> cleanupEpochs) {
@@ -417,14 +475,23 @@ public final class OzoneManagerDoubleBuffer {
             .add(entry.getTrxLogIndex());
       }
     } else {
-      // This is to catch early errors, when an new response class missed to
+      // This is to catch early errors, when a new response class missed to
       // add CleanupTableInfo annotation.
       throw new RuntimeException("CleanupTableInfo Annotation is missing " +
           "for" + responseClass);
     }
   }
 
-
+  /**
+   * Removes and completes the futures of the first {@code count} entries of
+   * readyFutureQueue (fewer if it runs out), releasing handler threads.
+   */
+  private void clearReadyFutureQueue(int count) {
+    while (!readyFutureQueue.isEmpty() && count > 0) {
+      readyFutureQueue.remove().complete(null);
+      count--;
+    }
+  }
 
   private void cleanupCache(Map<String, List<Long>> cleanupEpochs) {
     cleanupEpochs.forEach((tableName, epochs) -> {
@@ -433,12 +500,14 @@ public final class OzoneManagerDoubleBuffer {
     });
   }
 
+  private synchronized void clearReadyBuffer() {
+    readyBuffer.clear();
+  }
   /**
    * Update OzoneManagerDoubleBuffer metrics values.
    * @param flushedTransactionsSize
    */
-  private void updateMetrics(
-      int flushedTransactionsSize) {
+  private void updateMetrics(int flushedTransactionsSize) {
     ozoneManagerDoubleBufferMetrics.incrTotalNumOfFlushOperations();
     ozoneManagerDoubleBufferMetrics.incrTotalSizeOfFlushedTransactions(
         flushedTransactionsSize);
@@ -462,6 +531,12 @@ public final class OzoneManagerDoubleBuffer {
   // as this a normal flow of a shutdown.
   @SuppressWarnings("squid:S2142")
   public void stop() {
+    stopDaemon();
+    ozoneManagerDoubleBufferMetrics.unRegister();
+  }
+
+  @VisibleForTesting
+  public void stopDaemon() {
     if (isRunning.compareAndSet(true, false)) {
       LOG.info("Stopping OMDoubleBuffer flush thread");
       daemon.interrupt();
@@ -471,15 +546,10 @@ public final class OzoneManagerDoubleBuffer {
       } catch (InterruptedException e) {
         LOG.debug("Interrupted while waiting for daemon to exit.", e);
       }
-
-      // stop metrics.
-      ozoneManagerDoubleBufferMetrics.unRegister();
     } else {
       LOG.info("OMDoubleBuffer flush thread is not running.");
     }
-
   }
-
   private void terminate(IOException ex) {
     String message = "During flush to DB encountered error in " +
         "OMDoubleBuffer flush thread " + Thread.currentThread().getName();
@@ -524,27 +594,37 @@ public final class OzoneManagerDoubleBuffer {
   }
 
   /**
-   * Check can we flush transactions or not. This method wait's until
-   * currentBuffer size is greater than zero, once currentBuffer size is
-   * greater than zero it gets notify signal, and it returns true
-   * indicating that we are ready to flush.
-   *
-   * @return boolean
+   * Waits until currentBuffer is non-empty; the waiting thread is notified
+   * when an item is added to currentBuffer. Returns true once there is
+   * something to flush. If interrupted while the daemon is running, the OM
+   * is terminated; otherwise this logs and returns false so the flush loop
+   * exits.
    */
-  private synchronized boolean canFlush() throws InterruptedException {
-    // When transactions are added to buffer it notifies, then we check if
-    // currentBuffer size once and return from this method.
-    while (currentBuffer.size() == 0) {
-      wait(Long.MAX_VALUE);
+  private synchronized boolean canFlush() {
+    try {
+      while (currentBuffer.size() == 0) {
+        wait(Long.MAX_VALUE);
+      }
+      return true;
+    }  catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+      if (isRunning.get()) {
+        final String message = "OMDoubleBuffer flush thread " +
+            Thread.currentThread().getName() + " encountered Interrupted " +
+            "exception while running";
+        ExitUtils.terminate(1, message, ex, LOG);
+      }
+      LOG.info("OMDoubleBuffer flush thread {} is interrupted and will "
+          + "exit.", Thread.currentThread().getName());
+      return false;
     }
-    return true;
   }
 
   /**
-   * Prepares the readyBuffer which is used by sync thread to flush
-   * transactions to OM DB. This method swaps the currentBuffer and readyBuffer.
+   * Swaps the currentBuffer with readyBuffer so that the readyBuffer can be
+   * used by sync thread to flush transactions to DB.
    */
-  private synchronized void setReadyBuffer() {
+  private synchronized void swapCurrentAndReadyBuffer() {
     Queue<DoubleBufferEntry<OMClientResponse>> temp = currentBuffer;
     currentBuffer = readyBuffer;
     readyBuffer = temp;
@@ -561,5 +641,4 @@ public final class OzoneManagerDoubleBuffer {
   public OzoneManagerDoubleBufferMetrics getOzoneManagerDoubleBufferMetrics() {
     return ozoneManagerDoubleBufferMetrics;
   }
-
 }
\ No newline at end of file
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBuffer.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBuffer.java
new file mode 100644
index 0000000000..cb977f46db
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBuffer.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.ozone.om.ratis;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Stream;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.audit.AuditLogger;
+import org.apache.hadoop.ozone.audit.AuditMessage;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMetrics;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.ratis.metrics.OzoneManagerDoubleBufferMetrics;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.response.bucket.OMBucketCreateResponse;
+import org.apache.hadoop.ozone.om.response.key.OMKeyCreateResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateSnapshotResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.Mockito;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests snapshot-aware splitting and flushing in OzoneManagerDoubleBuffer.
+ */
+class TestOzoneManagerDoubleBuffer {
+
+  private OzoneManagerDoubleBuffer doubleBuffer;
+  private CreateSnapshotResponse snapshotResponse1 =
+      mock(CreateSnapshotResponse.class);
+  private CreateSnapshotResponse snapshotResponse2 =
+      mock(CreateSnapshotResponse.class);
+  private OMResponse omKeyResponse = mock(OMResponse.class);
+  private OMResponse omBucketResponse = mock(OMResponse.class);
+  private OMResponse omSnapshotResponse1 = mock(OMResponse.class);
+  private OMResponse omSnapshotResponse2 = mock(OMResponse.class);
+  private static OMClientResponse omKeyCreateResponse =
+      mock(OMKeyCreateResponse.class);
+  private static OMClientResponse omBucketCreateResponse =
+      mock(OMBucketCreateResponse.class);
+  private static OMClientResponse omSnapshotCreateResponse1 =
+      mock(OMBucketCreateResponse.class);
+  private static OMClientResponse omSnapshotCreateResponse2 =
+      mock(OMBucketCreateResponse.class);
+  @TempDir
+  private File tempDir;
+
+  @BeforeEach
+  public void setup() throws IOException {
+    OMMetrics omMetrics = OMMetrics.create();
+    OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
+    ozoneConfiguration.set(OMConfigKeys.OZONE_OM_DB_DIRS,
+        tempDir.getAbsolutePath());
+    OMMetadataManager omMetadataManager =
+        new OmMetadataManagerImpl(ozoneConfiguration);
+    OzoneManager ozoneManager = mock(OzoneManager.class);
+    when(ozoneManager.getMetrics()).thenReturn(omMetrics);
+    when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager);
+    when(ozoneManager.getMaxUserVolumeCount()).thenReturn(10L);
+    AuditLogger auditLogger = mock(AuditLogger.class);
+    when(ozoneManager.getAuditLogger()).thenReturn(auditLogger);
+    Mockito.doNothing().when(auditLogger).logWrite(any(AuditMessage.class));
+    OzoneManagerRatisSnapshot ozoneManagerRatisSnapshot = index -> {
+    };
+
+    doubleBuffer = new OzoneManagerDoubleBuffer.Builder()
+        .setOmMetadataManager(omMetadataManager)
+        .setOzoneManagerRatisSnapShot(ozoneManagerRatisSnapshot)
+        .setmaxUnFlushedTransactionCount(1000)
+        .enableRatis(true)
+        .setIndexToTerm((i) -> 1L)
+        .build();
+
+    doNothing().when(omKeyCreateResponse).checkAndUpdateDB(any(), any());
+    doNothing().when(omBucketCreateResponse).checkAndUpdateDB(any(), any());
+    doNothing().when(omSnapshotCreateResponse1).checkAndUpdateDB(any(), any());
+    doNothing().when(omSnapshotCreateResponse2).checkAndUpdateDB(any(), any());
+
+    when(omKeyResponse.getTraceID()).thenReturn("keyTraceId");
+    when(omBucketResponse.getTraceID()).thenReturn("bucketTraceId");
+    when(omSnapshotResponse1.getTraceID()).thenReturn("snapshotTraceId-1");
+    when(omSnapshotResponse2.getTraceID()).thenReturn("snapshotTraceId-2");
+    // NOTE(review): real protos never return null from this getter.
+    when(omKeyResponse.getCreateSnapshotResponse()).thenReturn(null);
+    when(omBucketResponse.getCreateSnapshotResponse()).thenReturn(null);
+    when(omSnapshotResponse1.getCreateSnapshotResponse())
+        .thenReturn(snapshotResponse1);
+    when(omSnapshotResponse2.getCreateSnapshotResponse())
+        .thenReturn(snapshotResponse2);
+
+    when(omKeyCreateResponse.getOMResponse()).thenReturn(omKeyResponse);
+    when(omBucketCreateResponse.getOMResponse()).thenReturn(omBucketResponse);
+    when(omSnapshotCreateResponse1.getOMResponse())
+        .thenReturn(omSnapshotResponse1);
+    when(omSnapshotCreateResponse2.getOMResponse())
+        .thenReturn(omSnapshotResponse2);
+  }
+
+  @AfterEach
+  public void stop() {
+    if (doubleBuffer != null) {
+      doubleBuffer.stop();
+    }
+  }
+
+  private static Stream<Arguments> doubleBufferFlushCases() {
+    return Stream.of(
+        Arguments.of(Arrays.asList(omKeyCreateResponse,
+                omBucketCreateResponse),
+            1L, 2L, 1L, 2L, 2L, 2.0F),
+        Arguments.of(Arrays.asList(omSnapshotCreateResponse1,
+                omSnapshotCreateResponse2),
+            2L, 2L, 3L, 4L, 1L, 1.333F),
+        Arguments.of(Arrays.asList(omKeyCreateResponse,
+                omBucketCreateResponse,
+                omSnapshotCreateResponse1,
+                omSnapshotCreateResponse2),
+            3L, 4L, 6L, 8L, 2L, 1.333F),
+        Arguments.of(Arrays.asList(omKeyCreateResponse,
+                omSnapshotCreateResponse1,
+                omBucketCreateResponse,
+                omSnapshotCreateResponse2),
+            4L, 4L, 10L, 12L, 1L, 1.200F),
+        Arguments.of(Arrays.asList(omKeyCreateResponse,
+                omSnapshotCreateResponse1,
+                omSnapshotCreateResponse2,
+                omBucketCreateResponse),
+            4L, 4L, 14L, 16L, 1L, 1.142F)
+    );
+  }
+
+  /**
+   * Tests snapshot-aware splitting and flushing. Cases share the static
+   * metrics object, so *InMetric values are cumulative and order-dependent.
+   *
+   * @param omClientResponses responses added to the buffer, in order.
+   * @param expectedFlushCounts flush iterations of this double buffer.
+   * @param expectedFlushedTransactionCount transactions flushed by this
+   *     double buffer.
+   * @param expectedFlushCountsInMetric cumulative flush count in the shared
+   *     metrics (differs from expectedFlushCounts because it is static).
+   * @param expectedFlushedTransactionCountInMetric cumulative transaction
+   *     count in the shared metrics.
+   * @param expectedMaxNumberOfTransactionsFlushedInMetric running maximum of
+   *     transactions flushed in one iteration.
+   * @param expectedAvgFlushTransactionsInMetric cumulative average
+   *     transactions per flush.
+   */
+  @ParameterizedTest
+  @MethodSource("doubleBufferFlushCases")
+  public void testOzoneManagerDoubleBuffer(
+      List<OMClientResponse> omClientResponses,
+      long expectedFlushCounts,
+      long expectedFlushedTransactionCount,
+      long expectedFlushCountsInMetric,
+      long expectedFlushedTransactionCountInMetric,
+      long expectedMaxNumberOfTransactionsFlushedInMetric,
+      float expectedAvgFlushTransactionsInMetric
+  ) {
+
+    // Stop the daemon so it cannot race with the explicit flush below.
+    doubleBuffer.stopDaemon();
+
+    for (int i = 0; i < omClientResponses.size(); i++) {
+      doubleBuffer.add(omClientResponses.get(i), i);
+    }
+
+    // Flush the current buffer synchronously on this thread.
+    doubleBuffer.flushCurrentBuffer();
+
+    assertEquals(expectedFlushCounts, doubleBuffer.getFlushIterations());
+    assertEquals(expectedFlushedTransactionCount,
+        doubleBuffer.getFlushedTransactionCount());
+
+    OzoneManagerDoubleBufferMetrics bufferMetrics =
+        doubleBuffer.getOzoneManagerDoubleBufferMetrics();
+
+    assertEquals(expectedFlushCountsInMetric,
+        bufferMetrics.getTotalNumOfFlushOperations());
+    assertEquals(expectedFlushedTransactionCountInMetric,
+        bufferMetrics.getTotalNumOfFlushedTransactions());
+    assertEquals(expectedMaxNumberOfTransactionsFlushedInMetric,
+        bufferMetrics.getMaxNumberOfTransactionsFlushedInOneIteration());
+    assertEquals(expectedAvgFlushTransactionsInMetric,
+        bufferMetrics.getAvgFlushTransactionsInOneIteration(), 0.001);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org