You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by si...@apache.org on 2022/11/28 21:13:52 UTC
[ozone] branch HDDS-6517-Snapshot updated: HDDS-7279. Snapshot Create requires Double Buffer Flush thread to split the commit batch. (#3958)
This is an automated email from the ASF dual-hosted git repository.
siyao pushed a commit to branch HDDS-6517-Snapshot
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-6517-Snapshot by this push:
new 6fe2ab8011 HDDS-7279. Snapshot Create requires Double Buffer Flush thread to split the commit batch. (#3958)
6fe2ab8011 is described below
commit 6fe2ab80115ae4d7c2b705886895575c5722f66d
Author: Hemant Kumar <he...@gmail.com>
AuthorDate: Mon Nov 28 13:13:47 2022 -0800
HDDS-7279. Snapshot Create requires Double Buffer Flush thread to split the commit batch. (#3958)
---
.../ozone/om/ratis/OzoneManagerDoubleBuffer.java | 431 ++++++++++++---------
.../om/ratis/TestOzoneManagerDoubleBuffer.java | 216 +++++++++++
2 files changed, 471 insertions(+), 176 deletions(-)
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
index 905f0c768d..2fedea5d54 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
@@ -18,11 +18,15 @@
package org.apache.hadoop.ozone.om.ratis;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
@@ -31,38 +35,33 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.function.SupplierWithIOException;
import org.apache.hadoop.hdds.tracing.TracingUtil;
-import org.apache.hadoop.hdds.utils.db.DBColumnFamilyDefinition;
-import org.apache.hadoop.ozone.om.codec.OMDBDefinition;
import org.apache.hadoop.hdds.utils.TransactionInfo;
-import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
-import org.apache.hadoop.util.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.DBColumnFamilyDefinition;
import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.codec.OMDBDefinition;
import org.apache.hadoop.ozone.om.ratis.helpers.DoubleBufferEntry;
import org.apache.hadoop.ozone.om.ratis.metrics.OzoneManagerDoubleBufferMetrics;
+import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
import org.apache.hadoop.util.Daemon;
-import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.util.Time;
import org.apache.ratis.util.ExitUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
/**
* This class implements DoubleBuffer implementation of OMClientResponse's. In
* DoubleBuffer it has 2 buffers one is currentBuffer and other is
- * readyBuffer. The current OM requests will be always added to currentBuffer.
- * Flush thread will be running in background, it check's if currentBuffer has
+ * readyBuffer. The current OM requests will always be added to currentBuffer.
+ * Flush thread will be running in background, it checks if currentBuffer has
* any entries, it swaps the buffer and creates a batch and commit to DB.
* Adding OM request to doubleBuffer and swap of buffer are synchronized
* methods.
@@ -111,8 +110,6 @@ public final class OzoneManagerDoubleBuffer {
*/
private Function<Long, Long> indexToTerm;
-
-
/**
* Builder for creating OzoneManagerDoubleBuffer.
*/
@@ -176,15 +173,11 @@ public final class OzoneManagerDoubleBuffer {
Function<Long, Long> indexToTerm, int maxUnFlushedTransactions) {
this.currentBuffer = new ConcurrentLinkedQueue<>();
this.readyBuffer = new ConcurrentLinkedQueue<>();
-
this.isRatisEnabled = isRatisEnabled;
this.isTracingEnabled = isTracingEnabled;
if (!isRatisEnabled) {
this.currentFutureQueue = new ConcurrentLinkedQueue<>();
this.readyFutureQueue = new ConcurrentLinkedQueue<>();
- } else {
- this.currentFutureQueue = null;
- this.readyFutureQueue = null;
}
this.unFlushedTransactions = new Semaphore(maxUnFlushedTransactions);
this.omMetadataManager = omMetadataManager;
@@ -194,11 +187,10 @@ public final class OzoneManagerDoubleBuffer {
this.indexToTerm = indexToTerm;
isRunning.set(true);
- // Daemon thread which runs in back ground and flushes transactions to DB.
+ // Daemon thread which runs in background and flushes transactions to DB.
daemon = new Daemon(this::flushTransactions);
daemon.setName("OMDoubleBufferFlushThread");
daemon.start();
-
}
/**
@@ -210,8 +202,8 @@ public final class OzoneManagerDoubleBuffer {
}
/**
- *Releases the given number of permits,
- *returning them to the unFlushedTransactions.
+ * Releases the given number of permits,
+ * returning them to the unFlushedTransactions.
*/
public void releaseUnFlushedTransactions(int n) {
unFlushedTransactions.release(n);
@@ -220,7 +212,7 @@ public final class OzoneManagerDoubleBuffer {
// TODO: pass the trace id further down and trace all methods of DBStore.
/**
- * add to write batch with trace span if tracing is enabled.
+ * Add to write batch with trace span if tracing is enabled.
*/
private Void addToBatchWithTrace(OMResponse omResponse,
SupplierWithIOException<Void> supplier) throws IOException {
@@ -234,7 +226,7 @@ public final class OzoneManagerDoubleBuffer {
}
/**
- * flush write batch with trace span if tracing is enabled.
+ * Flush write batch with trace span if tracing is enabled.
*/
private Void flushBatchWithTrace(String parentName, int batchSize,
SupplierWithIOException<Void> supplier) throws IOException {
@@ -263,138 +255,204 @@ public final class OzoneManagerDoubleBuffer {
* and commit to DB.
*/
private void flushTransactions() {
- while (isRunning.get()) {
- try {
- if (canFlush()) {
- Map<String, List<Long>> cleanupEpochs = new HashMap<>();
-
- setReadyBuffer();
- List<Long> flushedEpochs = null;
- try (BatchOperation batchOperation = omMetadataManager.getStore()
- .initBatchOperation()) {
-
- AtomicReference<String> lastTraceId = new AtomicReference<>();
- readyBuffer.iterator().forEachRemaining((entry) -> {
- try {
- OMResponse omResponse = entry.getResponse().getOMResponse();
- lastTraceId.set(omResponse.getTraceID());
- addToBatchWithTrace(omResponse,
- (SupplierWithIOException<Void>) () -> {
- entry.getResponse().checkAndUpdateDB(omMetadataManager,
- batchOperation);
- return null;
- });
-
- addCleanupEntry(entry, cleanupEpochs);
-
- } catch (IOException ex) {
- // During Adding to RocksDB batch entry got an exception.
- // We should terminate the OM.
- terminate(ex);
- }
- });
+ while (isRunning.get() && canFlush()) {
+ flushCurrentBuffer();
+ }
+ }
- // Commit transaction info to DB.
- flushedEpochs = readyBuffer.stream().map(
- DoubleBufferEntry::getTrxLogIndex)
- .sorted().collect(Collectors.toList());
- long lastRatisTransactionIndex = flushedEpochs.get(
- flushedEpochs.size() - 1);
- long term = isRatisEnabled ?
- indexToTerm.apply(lastRatisTransactionIndex) : -1;
-
- addToBatchTransactionInfoWithTrace(lastTraceId.get(),
- lastRatisTransactionIndex,
- (SupplierWithIOException<Void>) () -> {
- omMetadataManager.getTransactionInfoTable().putWithBatch(
- batchOperation, TRANSACTION_INFO_KEY,
- new TransactionInfo.Builder()
- .setTransactionIndex(lastRatisTransactionIndex)
- .setCurrentTerm(term).build());
- return null;
- });
-
- long startTime = Time.monotonicNow();
- flushBatchWithTrace(lastTraceId.get(), readyBuffer.size(),
- () -> {
- omMetadataManager.getStore().commitBatchOperation(
- batchOperation);
- return null;
- });
- ozoneManagerDoubleBufferMetrics.updateFlushTime(
- Time.monotonicNow() - startTime);
- }
-
- // Complete futures first and then do other things. So, that
- // handler threads will be released.
- if (!isRatisEnabled) {
- // Once all entries are flushed, we can complete their future.
- readyFutureQueue.iterator().forEachRemaining((entry) -> {
- entry.complete(null);
- });
+ /**
+ * Swaps the buffers and flushes the ready buffer to DB in batches split at
+ * create-snapshot requests; errors are escalated via terminate/ExitUtils.
+ * Extracted from the flush loop so tests can call it without the daemon.
+ */
+ @VisibleForTesting
+ void flushCurrentBuffer() {
+ try {
+ swapCurrentAndReadyBuffer();
+
+ // For a snapshot, we want to include all the keys that were committed
+ // before the snapshot `create` command was executed. To achieve
+ // this, we split the request buffer around every snapshot create
+ // request and flush each resulting sub-buffer as a separate batch,
+ // in order.
+ // For example, if requestBuffer is [request1, request2,
+ // snapshotRequest1, request3, snapshotRequest2, request4],
+ //
+ // the split buffer would be:
+ // bufferQueues = [[request1, request2], [snapshotRequest1], [request3],
+ // [snapshotRequest2], [request4]].
+ // and bufferQueues are flushed in the following order:
+ // Flush #1: [request1, request2]
+ // Flush #2: [snapshotRequest1]
+ // Flush #3: [request3]
+ // Flush #4: [snapshotRequest2]
+ // Flush #5: [request4]
+ List<Queue<DoubleBufferEntry<OMClientResponse>>> bufferQueues =
+ splitReadyBufferAtCreateSnapshot();
+
+ for (Queue<DoubleBufferEntry<OMClientResponse>> buffer : bufferQueues) {
+ flushBatch(buffer);
+ }
+
+ clearReadyBuffer();
+ } catch (IOException ex) {
+ terminate(ex);
+ } catch (Throwable t) {
+ final String s = "OMDoubleBuffer flush thread " +
+ Thread.currentThread().getName() + " encountered Throwable error";
+ ExitUtils.terminate(2, s, t, LOG);
+ }
+ }
- readyFutureQueue.clear();
- }
-
- int flushedTransactionsSize = readyBuffer.size();
- flushedTransactionCount.addAndGet(flushedTransactionsSize);
- flushIterations.incrementAndGet();
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Sync Iteration {} flushed transactions in this " +
- "iteration {}", flushIterations.get(),
- flushedTransactionsSize);
- }
-
- // When non-HA do the sort step here, as the sorted list is not
- // required for flush to DB. As in non-HA we want to complete
- // futures as quick as possible after flush to DB, to release rpc
- // handler threads.
- if (!isRatisEnabled) {
- flushedEpochs =
- readyBuffer.stream().map(DoubleBufferEntry::getTrxLogIndex)
- .sorted().collect(Collectors.toList());
- }
-
-
- // Clean up committed transactions.
-
- cleanupCache(cleanupEpochs);
-
- readyBuffer.clear();
-
- if (isRatisEnabled) {
- releaseUnFlushedTransactions(flushedTransactionsSize);
- }
-
- // update the last updated index in OzoneManagerStateMachine.
- ozoneManagerRatisSnapShot.updateLastAppliedIndex(
- flushedEpochs);
-
- // set metrics.
- updateMetrics(flushedTransactionsSize);
- }
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- if (isRunning.get()) {
- final String message = "OMDoubleBuffer flush thread " +
- Thread.currentThread().getName() + " encountered Interrupted " +
- "exception while running";
- ExitUtils.terminate(1, message, ex, LOG);
- } else {
- LOG.info("OMDoubleBuffer flush thread {} is interrupted and will "
- + "exit.", Thread.currentThread().getName());
- }
+ private void flushBatch(Queue<DoubleBufferEntry<OMClientResponse>> buffer)
+ throws IOException {
+ // Commits one split sub-buffer as a single DB batch, then does bookkeeping.
+ Map<String, List<Long>> cleanupEpochs = new HashMap<>();
+ List<Long> flushedEpochs;
+
+ try (BatchOperation batchOperation = omMetadataManager.getStore()
+ .initBatchOperation()) {
+
+ String lastTraceId = addToBatch(buffer, batchOperation);
+
+ buffer.iterator().forEachRemaining(
+ entry -> addCleanupEntry(entry, cleanupEpochs));
+
+ // Add the last transaction index (and term) to this same batch.
+ flushedEpochs = buffer.stream()
+ .map(DoubleBufferEntry::getTrxLogIndex)
+ .sorted()
+ .collect(Collectors.toList());
+
+ long lastRatisTransactionIndex = flushedEpochs.get(
+ flushedEpochs.size() - 1);
+
+ long term = isRatisEnabled ?
+ indexToTerm.apply(lastRatisTransactionIndex) : -1;
+
+ addToBatchTransactionInfoWithTrace(lastTraceId,
+ lastRatisTransactionIndex,
+ () -> {
+ omMetadataManager.getTransactionInfoTable().putWithBatch(
+ batchOperation, TRANSACTION_INFO_KEY,
+ new TransactionInfo.Builder()
+ .setTransactionIndex(lastRatisTransactionIndex)
+ .setCurrentTerm(term)
+ .build());
+ return null;
+ });
+
+ long startTime = Time.monotonicNow();
+ flushBatchWithTrace(lastTraceId, buffer.size(),
+ () -> {
+ omMetadataManager.getStore()
+ .commitBatchOperation(batchOperation);
+ return null;
+ });
+
+ ozoneManagerDoubleBufferMetrics.updateFlushTime(
+ Time.monotonicNow() - startTime);
+ }
+
+ // Complete futures before the remaining bookkeeping so that waiting
+ // handler threads are released as early as possible (non-Ratis only).
+ if (!isRatisEnabled) {
+ clearReadyFutureQueue(buffer.size());
+ }
+
+ int flushedTransactionsSize = buffer.size();
+ flushedTransactionCount.addAndGet(flushedTransactionsSize);
+ flushIterations.incrementAndGet();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sync iteration {} flushed transactions in this iteration {}",
+ flushIterations.get(),
+ flushedTransactionsSize);
+ }
+
+ // Clean up cache entries of the transactions just committed.
+ cleanupCache(cleanupEpochs);
+
+ if (isRatisEnabled) {
+ releaseUnFlushedTransactions(flushedTransactionsSize);
+ }
+ // Report the flushed indexes so the last applied index gets updated.
+ ozoneManagerRatisSnapShot.updateLastAppliedIndex(flushedEpochs);
+
+ // Update the double buffer metrics.
+ updateMetrics(flushedTransactionsSize);
+ }
+
+ private String addToBatch(Queue<DoubleBufferEntry<OMClientResponse>> buffer,
+ BatchOperation batchOperation) {
+ String lastTraceId = null;
+ for (DoubleBufferEntry<OMClientResponse> entry: buffer) {
+ OMClientResponse response = entry.getResponse();
+ OMResponse omResponse = response.getOMResponse();
+ lastTraceId = omResponse.getTraceID();
+
+ try {
+ addToBatchWithTrace(omResponse,
+ () -> {
+ response.checkAndUpdateDB(omMetadataManager, batchOperation);
+ return null;
+ });
} catch (IOException ex) {
+ // Failed to add the entry to the RocksDB batch; this is treated as
+ // fatal, so terminate the OM.
terminate(ex);
- } catch (Throwable t) {
- final String s = "OMDoubleBuffer flush thread " +
- Thread.currentThread().getName() + " encountered Throwable error";
- ExitUtils.terminate(2, s, t, LOG);
- }
}
+
+ return lastTraceId;
}
+ /**
+ * Splits the readyBuffer around create snapshot requests and returns the
+ * resulting queues in buffer order.
+ *
+ * CreateSnapshot is used as a barrier because the checkpoint is created in
+ * a RocksDB flush callback. If multiple operations are flushed in one batch,
+ * we cannot tell which operation's flush triggered the callback, which
+ * exposes the snapshot to a race with RocksDB's batch behaviour.
+ * Hence, every createSnapshot request is flushed as a batch of its own.
+ * Requests are detected via hasCreateSnapshotResponse(), because protobuf
+ * getters return a default instance (never null) for unset fields.
+ *
+ * e.g. requestBuffer = [request1, request2, snapshotRequest1,
+ * request3, snapshotRequest2, request4]
+ * response = [[request1, request2], [snapshotRequest1], [request3],
+ * [snapshotRequest2], [request4]]
+ */
+ private List<Queue<DoubleBufferEntry<OMClientResponse>>>
+ splitReadyBufferAtCreateSnapshot() {
+ List<Queue<DoubleBufferEntry<OMClientResponse>>> response =
+ new ArrayList<>();
+
+ Iterator<DoubleBufferEntry<OMClientResponse>> iterator =
+ readyBuffer.iterator();
+
+ OMResponse previousOmResponse = null;
+ while (iterator.hasNext()) {
+ DoubleBufferEntry<OMClientResponse> entry = iterator.next();
+ OMResponse omResponse = entry.getResponse().getOMResponse();
+ // A new queue is started in three cases:
+ // 1. It is the first element of the response.
+ // 2. The current request is a createSnapshot request.
+ // 3. The previous request was a createSnapshot request.
+ if (response.isEmpty() ||
+ omResponse.hasCreateSnapshotResponse() ||
+ (previousOmResponse != null &&
+ previousOmResponse.hasCreateSnapshotResponse())) {
+ response.add(new LinkedList<>());
+ }
+
+ response.get(response.size() - 1).add(entry);
+ previousOmResponse = omResponse;
+ }
+
+ return response;
+ }
private void addCleanupEntry(DoubleBufferEntry entry, Map<String,
List<Long>> cleanupEpochs) {
@@ -417,14 +475,23 @@ public final class OzoneManagerDoubleBuffer {
.add(entry.getTrxLogIndex());
}
} else {
- // This is to catch early errors, when an new response class missed to
+ // This is to catch early errors, when a new response class missed to
// add CleanupTableInfo annotation.
throw new RuntimeException("CleanupTableInfo Annotation is missing " +
"for" + responseClass);
}
}
-
+ /**
+ * Completes and removes the futures of up to {@code count} entries at the
+ * head of readyFutureQueue so that handler threads are released asap.
+ */
+ private void clearReadyFutureQueue(int count) {
+ while (!readyFutureQueue.isEmpty() && count > 0) {
+ readyFutureQueue.remove().complete(null);
+ count--;
+ }
+ }
private void cleanupCache(Map<String, List<Long>> cleanupEpochs) {
cleanupEpochs.forEach((tableName, epochs) -> {
@@ -433,12 +500,14 @@ public final class OzoneManagerDoubleBuffer {
});
}
+ private synchronized void clearReadyBuffer() {
+ readyBuffer.clear(); // same monitor as swapCurrentAndReadyBuffer()
+ }
/**
* Update OzoneManagerDoubleBuffer metrics values.
* @param flushedTransactionsSize
*/
- private void updateMetrics(
- int flushedTransactionsSize) {
+ private void updateMetrics(int flushedTransactionsSize) {
ozoneManagerDoubleBufferMetrics.incrTotalNumOfFlushOperations();
ozoneManagerDoubleBufferMetrics.incrTotalSizeOfFlushedTransactions(
flushedTransactionsSize);
@@ -462,6 +531,12 @@ public final class OzoneManagerDoubleBuffer {
// as this a normal flow of a shutdown.
@SuppressWarnings("squid:S2142")
public void stop() {
+ stopDaemon();
+ ozoneManagerDoubleBufferMetrics.unRegister();
+ }
+
+ @VisibleForTesting
+ public void stopDaemon() {
if (isRunning.compareAndSet(true, false)) {
LOG.info("Stopping OMDoubleBuffer flush thread");
daemon.interrupt();
@@ -471,15 +546,10 @@ public final class OzoneManagerDoubleBuffer {
} catch (InterruptedException e) {
LOG.debug("Interrupted while waiting for daemon to exit.", e);
}
-
- // stop metrics.
- ozoneManagerDoubleBufferMetrics.unRegister();
} else {
LOG.info("OMDoubleBuffer flush thread is not running.");
}
-
}
-
private void terminate(IOException ex) {
String message = "During flush to DB encountered error in " +
"OMDoubleBuffer flush thread " + Thread.currentThread().getName();
@@ -524,27 +594,37 @@ public final class OzoneManagerDoubleBuffer {
}
/**
- * Check can we flush transactions or not. This method wait's until
- * currentBuffer size is greater than zero, once currentBuffer size is
- * greater than zero it gets notify signal, and it returns true
- * indicating that we are ready to flush.
- *
- * @return boolean
+ * Checks whether transactions can be flushed. Waits until currentBuffer is
+ * non-empty; the monitor is notified when entries are added to it.
+ * Returns true once currentBuffer has entries. If interrupted, terminates
+ * the OM while the daemon is running, otherwise returns false. isEmpty() is
+ * used because ConcurrentLinkedQueue.size() is not a constant-time call.
*/
- private synchronized boolean canFlush() throws InterruptedException {
- // When transactions are added to buffer it notifies, then we check if
- // currentBuffer size once and return from this method.
- while (currentBuffer.size() == 0) {
- wait(Long.MAX_VALUE);
+ private synchronized boolean canFlush() {
+ try {
+ while (currentBuffer.isEmpty()) {
+ wait(Long.MAX_VALUE);
+ }
+ return true;
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ if (isRunning.get()) {
+ final String message = "OMDoubleBuffer flush thread " +
+ Thread.currentThread().getName() + " encountered Interrupted " +
+ "exception while running";
+ ExitUtils.terminate(1, message, ex, LOG);
+ }
+ LOG.info("OMDoubleBuffer flush thread {} is interrupted and will "
+ + "exit.", Thread.currentThread().getName());
+ return false;
}
- return true;
}
/**
- * Prepares the readyBuffer which is used by sync thread to flush
- * transactions to OM DB. This method swaps the currentBuffer and readyBuffer.
+ * Swaps the currentBuffer with readyBuffer so that the readyBuffer can be
+ * used by sync thread to flush transactions to DB.
*/
- private synchronized void setReadyBuffer() {
+ private synchronized void swapCurrentAndReadyBuffer() {
Queue<DoubleBufferEntry<OMClientResponse>> temp = currentBuffer;
currentBuffer = readyBuffer;
readyBuffer = temp;
@@ -561,5 +641,4 @@ public final class OzoneManagerDoubleBuffer {
public OzoneManagerDoubleBufferMetrics getOzoneManagerDoubleBufferMetrics() {
return ozoneManagerDoubleBufferMetrics;
}
-
}
\ No newline at end of file
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBuffer.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBuffer.java
new file mode 100644
index 0000000000..cb977f46db
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBuffer.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.ozone.om.ratis;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Stream;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.audit.AuditLogger;
+import org.apache.hadoop.ozone.audit.AuditMessage;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMetrics;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.ratis.metrics.OzoneManagerDoubleBufferMetrics;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.response.bucket.OMBucketCreateResponse;
+import org.apache.hadoop.ozone.om.response.key.OMKeyCreateResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateSnapshotResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.Mockito;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * This class tests snapshot aware OzoneManagerDoubleBuffer flushing logic.
+ */
+class TestOzoneManagerDoubleBuffer {
+
+ private OzoneManagerDoubleBuffer doubleBuffer;
+ private OMMetadataManager omMetadataManager;
+ private CreateSnapshotResponse snapshotResponse1 =
+ mock(CreateSnapshotResponse.class);
+ private CreateSnapshotResponse snapshotResponse2 =
+ mock(CreateSnapshotResponse.class);
+ private OMResponse omKeyResponse = mock(OMResponse.class);
+ private OMResponse omBucketResponse = mock(OMResponse.class);
+ private OMResponse omSnapshotResponse1 = mock(OMResponse.class);
+ private OMResponse omSnapshotResponse2 = mock(OMResponse.class);
+ private static OMClientResponse omKeyCreateResponse =
+ mock(OMKeyCreateResponse.class);
+ private static OMClientResponse omBucketCreateResponse =
+ mock(OMBucketCreateResponse.class);
+ private static OMClientResponse omSnapshotCreateResponse1 =
+ mock(OMBucketCreateResponse.class);
+ private static OMClientResponse omSnapshotCreateResponse2 =
+ mock(OMBucketCreateResponse.class);
+ @TempDir
+ private File tempDir;
+
+ @BeforeEach
+ public void setup() throws IOException {
+ OMMetrics omMetrics = OMMetrics.create();
+ OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
+ ozoneConfiguration.set(OMConfigKeys.OZONE_OM_DB_DIRS,
+ tempDir.getAbsolutePath());
+ omMetadataManager =
+ new OmMetadataManagerImpl(ozoneConfiguration);
+ OzoneManager ozoneManager = mock(OzoneManager.class);
+ when(ozoneManager.getMetrics()).thenReturn(omMetrics);
+ when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager);
+ when(ozoneManager.getMaxUserVolumeCount()).thenReturn(10L);
+ AuditLogger auditLogger = mock(AuditLogger.class);
+ when(ozoneManager.getAuditLogger()).thenReturn(auditLogger);
+ Mockito.doNothing().when(auditLogger).logWrite(any(AuditMessage.class));
+ OzoneManagerRatisSnapshot ozoneManagerRatisSnapshot = index -> { };
+
+ doubleBuffer = new OzoneManagerDoubleBuffer.Builder()
+ .setOmMetadataManager(omMetadataManager)
+ .setOzoneManagerRatisSnapShot(ozoneManagerRatisSnapshot)
+ .setmaxUnFlushedTransactionCount(1000)
+ .enableRatis(true)
+ .setIndexToTerm((i) -> 1L)
+ .build();
+
+ doNothing().when(omKeyCreateResponse).checkAndUpdateDB(any(), any());
+ doNothing().when(omBucketCreateResponse).checkAndUpdateDB(any(), any());
+ doNothing().when(omSnapshotCreateResponse1).checkAndUpdateDB(any(), any());
+ doNothing().when(omSnapshotCreateResponse2).checkAndUpdateDB(any(), any());
+ when(omKeyResponse.getTraceID()).thenReturn("keyTraceId");
+ when(omBucketResponse.getTraceID()).thenReturn("bucketTraceId");
+ when(omSnapshotResponse1.getTraceID()).thenReturn("snapshotTraceId-1");
+ when(omSnapshotResponse2.getTraceID()).thenReturn("snapshotTraceId-2");
+ // Protobuf getters never return null; stub has*() for snapshot requests.
+ when(omSnapshotResponse1.getCreateSnapshotResponse())
+ .thenReturn(snapshotResponse1);
+ when(omSnapshotResponse2.getCreateSnapshotResponse())
+ .thenReturn(snapshotResponse2);
+ when(omSnapshotResponse1.hasCreateSnapshotResponse()).thenReturn(true);
+ when(omSnapshotResponse2.hasCreateSnapshotResponse()).thenReturn(true);
+
+ when(omKeyCreateResponse.getOMResponse()).thenReturn(omKeyResponse);
+ when(omBucketCreateResponse.getOMResponse()).thenReturn(omBucketResponse);
+ when(omSnapshotCreateResponse1.getOMResponse())
+ .thenReturn(omSnapshotResponse1);
+ when(omSnapshotCreateResponse2.getOMResponse())
+ .thenReturn(omSnapshotResponse2);
+ }
+
+ @AfterEach
+ public void stop() throws Exception {
+ if (doubleBuffer != null) {
+ doubleBuffer.stop();
+ }
+ if (omMetadataManager != null) {
+ omMetadataManager.stop();
+ }
+ }
+
+ private static Stream<Arguments> doubleBufferFlushCases() {
+ return Stream.of(
+ Arguments.of(Arrays.asList(omKeyCreateResponse,
+ omBucketCreateResponse),
+ 1L, 2L, 1L, 2L, 2L, 2.0F),
+ Arguments.of(Arrays.asList(omSnapshotCreateResponse1,
+ omSnapshotCreateResponse2),
+ 2L, 2L, 3L, 4L, 1L, 1.333F),
+ Arguments.of(Arrays.asList(omKeyCreateResponse,
+ omBucketCreateResponse,
+ omSnapshotCreateResponse1,
+ omSnapshotCreateResponse2),
+ 3L, 4L, 6L, 8L, 2L, 1.333F),
+ Arguments.of(Arrays.asList(omKeyCreateResponse,
+ omSnapshotCreateResponse1,
+ omBucketCreateResponse,
+ omSnapshotCreateResponse2),
+ 4L, 4L, 10L, 12L, 1L, 1.200F),
+ Arguments.of(Arrays.asList(omKeyCreateResponse,
+ omSnapshotCreateResponse1,
+ omSnapshotCreateResponse2,
+ omBucketCreateResponse),
+ 4L, 4L, 14L, 16L, 1L, 1.142F)
+ );
+ }
+
+ /**
+ * Tests snapshot aware splitting and flushing of OzoneManagerDoubleBuffer.
+ *
+ * @param omClientResponses Responses added to the buffer, in order.
+ * @param expectedFlushCounts Total flush count per OzoneManagerDoubleBuffer.
+ * @param expectedFlushedTransactionCount Total transaction count per
+ * OzoneManagerDoubleBuffer.
+ * @param expectedFlushCountsInMetric Overall flush count; it differs from
+ * expectedFlushCounts because the metrics object is static and shared.
+ * @param expectedFlushedTransactionCountInMetric Overall transaction count.
+ * @param expectedMaxNumberOfTransactionsFlushedInMetric Overall max
+ * transaction count per flush.
+ * @param expectedAvgFlushTransactionsInMetric Overall avg transaction count
+ * per flush.
+ */
+ @ParameterizedTest
+ @MethodSource("doubleBufferFlushCases")
+ public void testOzoneManagerDoubleBuffer(
+ List<OMClientResponse> omClientResponses,
+ long expectedFlushCounts,
+ long expectedFlushedTransactionCount,
+ long expectedFlushCountsInMetric,
+ long expectedFlushedTransactionCountInMetric,
+ long expectedMaxNumberOfTransactionsFlushedInMetric,
+ float expectedAvgFlushTransactionsInMetric
+ ) {
+
+ // Stop the daemon so that the flush below does not race with it.
+ doubleBuffer.stopDaemon();
+
+ for (int i = 0; i < omClientResponses.size(); i++) {
+ doubleBuffer.add(omClientResponses.get(i), i);
+ }
+
+ // Flush the current buffer.
+ doubleBuffer.flushCurrentBuffer();
+
+ assertEquals(expectedFlushCounts, doubleBuffer.getFlushIterations());
+ assertEquals(expectedFlushedTransactionCount,
+ doubleBuffer.getFlushedTransactionCount());
+
+ OzoneManagerDoubleBufferMetrics bufferMetrics =
+ doubleBuffer.getOzoneManagerDoubleBufferMetrics();
+
+ assertEquals(expectedFlushCountsInMetric,
+ bufferMetrics.getTotalNumOfFlushOperations());
+ assertEquals(expectedFlushedTransactionCountInMetric,
+ bufferMetrics.getTotalNumOfFlushedTransactions());
+ assertEquals(expectedMaxNumberOfTransactionsFlushedInMetric,
+ bufferMetrics.getMaxNumberOfTransactionsFlushedInOneIteration());
+ assertEquals(expectedAvgFlushTransactionsInMetric,
+ bufferMetrics.getAvgFlushTransactionsInOneIteration(), 0.001);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org