Posted to notifications@skywalking.apache.org by ke...@apache.org on 2021/11/20 08:52:08 UTC

[skywalking] branch master updated: Fix the so11y latency of persistence execution latency not correct in ElasticSearch storage (#8161)

This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new a2897db  Fix the so11y latency of persistence execution latency not correct in ElasticSearch storage (#8161)
a2897db is described below

commit a2897db4715092b8e4079fd57b462606deee2fdf
Author: kezhenxu94 <ke...@apache.org>
AuthorDate: Sat Nov 20 16:51:54 2021 +0800

    Fix the so11y latency of persistence execution latency not correct in ElasticSearch storage (#8161)
    
    The ES persistence execution is now asynchronous, so the execution
    latency only counted the time needed to insert the requests into the
    bulk processor, not the time until the requests were actually flushed
    into the storage. This patch fixes that issue.
    
    The prepare latency has the same issue, but fixing it requires more
    changes, so it is left to another pull request.
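    
    As a minimal, hedged sketch of the idea (AsyncFlushLatencyDemo, its
    flush() stub, and the 200 ms delay below are illustrative stand-ins,
    not the real IBatchDAO or HistogramMetrics.Timer code shown in the
    diff): once flush() returns a CompletableFuture, closing the latency
    timer in whenComplete() records the time until the bulk flush really
    finishes, instead of only the time spent enqueueing the requests.
    
        import java.util.concurrent.CompletableFuture;
        import java.util.concurrent.Executors;
        import java.util.concurrent.ScheduledExecutorService;
        import java.util.concurrent.TimeUnit;
    
        // Hypothetical stand-in class; only the shape of the fix is shown.
        public class AsyncFlushLatencyDemo {
            private static final ScheduledExecutorService SCHEDULER =
                Executors.newSingleThreadScheduledExecutor();
    
            // Stand-in for IBatchDAO#flush after this patch: the returned future
            // completes only when the (simulated) bulk flush has finished.
            static CompletableFuture<Void> flush() {
                CompletableFuture<Void> f = new CompletableFuture<>();
                // Simulate the bulk processor flushing roughly 200 ms later.
                SCHEDULER.schedule(() -> {
                    f.complete(null);
                }, 200, TimeUnit.MILLISECONDS);
                return f;
            }
    
            public static void main(String[] args) {
                long start = System.nanoTime();
                CompletableFuture<Void> future = flush();
    
                // Before the patch the timer was closed here, so it measured only
                // how long enqueueing the requests took (close to 0 ms).
                System.out.printf("enqueue-only latency: %d ms%n",
                                  TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
    
                // After the patch the timer is closed in whenComplete, so it measures
                // the time until the requests are really flushed (about 200 ms here).
                future.whenComplete((ok, error) -> System.out.printf(
                    "flush-complete latency: %d ms%n",
                    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start))).join();
    
                SCHEDULER.shutdown();
            }
        }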
---
 CHANGES.md                                         |   1 +
 .../analysis/worker/MetricsPersistentWorker.java   |   4 +-
 .../oap/server/core/storage/IBatchDAO.java         |   3 +-
 .../oap/server/core/storage/PersistenceTimer.java  | 110 +++++++++++----------
 .../server/core/storage/PersistenceTimerTest.java  |   7 +-
 .../library/elasticsearch/bulk/BulkProcessor.java  |  36 ++++---
 .../elasticsearch/base/BatchProcessEsDAO.java      |  12 ++-
 .../storage/plugin/influxdb/base/BatchDAO.java     |  10 +-
 .../storage/plugin/jdbc/h2/dao/H2BatchDAO.java     |   6 +-
 9 files changed, 106 insertions(+), 83 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 86f2242..25f1ba1 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -54,6 +54,7 @@ Release Notes.
 * Add filter mechanism in MAL core to filter metrics.
 * Fix concurrency bug in MAL `increase`-related calculation.
 * Fix a null pointer bug when building `SampleFamily`.
+* Fix the so11y latency of persistence execution latency not correct in ElasticSearch storage.
 
 #### UI
 
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
index 07d14ef..9758cb3 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
@@ -179,14 +179,14 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
     @Override
     public List<PrepareRequest> buildBatchRequests() {
         if (persistentCounter++ % persistentMod != 0) {
-            return Collections.EMPTY_LIST;
+            return Collections.emptyList();
         }
 
         final List<Metrics> lastCollection = getCache().read();
 
         long start = System.currentTimeMillis();
         if (lastCollection.size() == 0) {
-            return Collections.EMPTY_LIST;
+            return Collections.emptyList();
         }
 
         /*
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java
index ed3172f..f7279f27 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.oap.server.core.storage;
 
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
 
@@ -43,5 +44,5 @@ public interface IBatchDAO extends DAO {
      *
      * @param prepareRequests data to insert or update. No delete happens in streaming mode.
      */
-    void flush(List<PrepareRequest> prepareRequests);
+    CompletableFuture<Void> flush(List<PrepareRequest> prepareRequests);
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
index 594cac1..3af6a78 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
@@ -21,12 +21,11 @@ package org.apache.skywalking.oap.server.core.storage;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.oap.server.library.util.RunnableWithExceptionProtection;
 import org.apache.skywalking.oap.server.core.CoreModuleConfig;
 import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
 import org.apache.skywalking.oap.server.core.analysis.worker.PersistenceWorker;
@@ -34,6 +33,7 @@ import org.apache.skywalking.oap.server.core.analysis.worker.TopNStreamProcessor
 import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.library.util.RunnableWithExceptionProtection;
 import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
 import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
 import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
@@ -56,21 +56,25 @@ public enum PersistenceTimer {
 
     public void start(ModuleManager moduleManager, CoreModuleConfig moduleConfig) {
         log.info("persistence timer start");
-        IBatchDAO batchDAO = moduleManager.find(StorageModule.NAME).provider().getService(IBatchDAO.class);
+        IBatchDAO batchDAO =
+            moduleManager.find(StorageModule.NAME).provider().getService(IBatchDAO.class);
 
         MetricsCreator metricsCreator = moduleManager.find(TelemetryModule.NAME)
                                                      .provider()
                                                      .getService(MetricsCreator.class);
         errorCounter = metricsCreator.createCounter(
-            "persistence_timer_bulk_error_count", "Error execution of the prepare stage in persistence timer",
+            "persistence_timer_bulk_error_count",
+            "Error execution of the prepare stage in persistence timer",
             MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE
         );
         prepareLatency = metricsCreator.createHistogramMetric(
-            "persistence_timer_bulk_prepare_latency", "Latency of the prepare stage in persistence timer",
+            "persistence_timer_bulk_prepare_latency",
+            "Latency of the prepare stage in persistence timer",
             MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE
         );
         executeLatency = metricsCreator.createHistogramMetric(
-            "persistence_timer_bulk_execute_latency", "Latency of the execute stage in persistence timer",
+            "persistence_timer_bulk_execute_latency",
+            "Latency of the execute stage in persistence timer",
             MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE
         );
         allLatency = metricsCreator.createHistogramMetric(
@@ -82,69 +86,69 @@ public enum PersistenceTimer {
         if (!isStarted) {
             Executors.newSingleThreadScheduledExecutor()
                      .scheduleWithFixedDelay(
-                         new RunnableWithExceptionProtection(() -> extractDataAndSave(batchDAO), t -> log
-                             .error("Extract data and save failure.", t)), 5, moduleConfig.getPersistentPeriod(),
-                         TimeUnit.SECONDS
+                         new RunnableWithExceptionProtection(
+                             () -> extractDataAndSave(batchDAO).join(),
+                             t -> log.error("Extract data and save failure.", t)
+                         ), 5, moduleConfig.getPersistentPeriod(), TimeUnit.SECONDS
                      );
 
             this.isStarted = true;
         }
     }
 
-    private void extractDataAndSave(IBatchDAO batchDAO) {
+    private CompletableFuture<Void> extractDataAndSave(IBatchDAO batchDAO) {
         if (log.isDebugEnabled()) {
             log.debug("Extract data and save");
         }
 
         long startTime = System.currentTimeMillis();
 
-        try (HistogramMetrics.Timer allTimer = allLatency.createTimer()) {
-            List<PersistenceWorker<? extends StorageData>> persistenceWorkers = new ArrayList<>();
-            persistenceWorkers.addAll(TopNStreamProcessor.getInstance().getPersistentWorkers());
-            persistenceWorkers.addAll(MetricsStreamProcessor.getInstance().getPersistentWorkers());
-
-            CountDownLatch countDownLatch = new CountDownLatch(persistenceWorkers.size());
-            persistenceWorkers.forEach(worker -> {
-                prepareExecutorService.submit(() -> {
-                    List<PrepareRequest> innerPrepareRequests = null;
-                    try {
-                        // Prepare stage
-                        try (HistogramMetrics.Timer timer = prepareLatency.createTimer()) {
-                            if (log.isDebugEnabled()) {
-                                log.debug("extract {} worker data and save", worker.getClass().getName());
-                            }
-
-                            innerPrepareRequests = worker.buildBatchRequests();
-
-                            worker.endOfRound();
-                        } catch (Throwable e) {
-                            log.error(e.getMessage(), e);
+        HistogramMetrics.Timer allTimer = allLatency.createTimer();
+        List<PersistenceWorker<? extends StorageData>> workers = new ArrayList<>();
+        workers.addAll(TopNStreamProcessor.getInstance().getPersistentWorkers());
+        workers.addAll(MetricsStreamProcessor.getInstance().getPersistentWorkers());
+
+        final CompletableFuture<Void> future =
+            CompletableFuture.allOf(workers.stream().map(worker -> {
+                return CompletableFuture.runAsync(() -> {
+                    List<PrepareRequest> innerPrepareRequests;
+                    // Prepare stage
+                    try (HistogramMetrics.Timer ignored = prepareLatency.createTimer()) {
+                        if (log.isDebugEnabled()) {
+                            log.debug(
+                                "extract {} worker data and save",
+                                worker.getClass().getName()
+                            );
                         }
 
-                        // Execution stage
-                        try (HistogramMetrics.Timer executeLatencyTimer = executeLatency.createTimer()) {
-                            if (CollectionUtils.isNotEmpty(innerPrepareRequests)) {
-                                batchDAO.flush(innerPrepareRequests);
-                            }
-                        } catch (Throwable e) {
-                            log.error(e.getMessage(), e);
-                        }
-                    } finally {
-                        countDownLatch.countDown();
+                        innerPrepareRequests = worker.buildBatchRequests();
+
+                        worker.endOfRound();
+                    }
+
+                    if (CollectionUtils.isEmpty(innerPrepareRequests)) {
+                        return;
                     }
-                });
-            });
-
-            countDownLatch.await();
-        } catch (Throwable e) {
-            errorCounter.inc();
-            log.error(e.getMessage(), e);
-        } finally {
+
+                    // Execution stage
+                    HistogramMetrics.Timer executeLatencyTimer = executeLatency.createTimer();
+                    batchDAO.flush(innerPrepareRequests)
+                            .whenComplete(($1, $2) -> executeLatencyTimer.close());
+                }, prepareExecutorService);
+            }).toArray(CompletableFuture[]::new));
+        future.whenComplete((unused, throwable) -> {
+            allTimer.close();
             if (log.isDebugEnabled()) {
-                log.debug("Persistence data save finish");
+                log.debug(
+                    "Batch persistence duration: {} ms",
+                    System.currentTimeMillis() - startTime
+                );
             }
-        }
-
-        log.debug("Batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
+            if (throwable != null) {
+                errorCounter.inc();
+                log.error(throwable.getMessage(), throwable);
+            }
+        });
+        return future;
     }
 }
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimerTest.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimerTest.java
index bbe9ca6..f10f78c 100644
--- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimerTest.java
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimerTest.java
@@ -23,6 +23,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import lombok.Data;
 import org.apache.skywalking.oap.server.core.CoreModuleConfig;
 import org.apache.skywalking.oap.server.core.analysis.worker.MetricsPersistentWorker;
@@ -61,10 +62,11 @@ public class PersistenceTimerTest {
             }
 
             @Override
-            public void flush(final List<PrepareRequest> prepareRequests) {
+            public CompletableFuture<Void> flush(final List<PrepareRequest> prepareRequests) {
                 synchronized (result) {
                     result.addAll(prepareRequests);
                 }
+                return CompletableFuture.completedFuture(null);
             }
         };
         for (int i = 0; i < workCount; i++) {
@@ -79,7 +81,8 @@ public class PersistenceTimerTest {
         PersistenceTimer.INSTANCE.isStarted = true;
 
         PersistenceTimer.INSTANCE.start(moduleManager, moduleConfig);
-        Whitebox.invokeMethod(PersistenceTimer.INSTANCE, "extractDataAndSave", iBatchDAO);
+        CompletableFuture<Void> f = Whitebox.invokeMethod(PersistenceTimer.INSTANCE, "extractDataAndSave", iBatchDAO);
+        f.join();
 
         Assert.assertEquals(count * workCount * 2, result.size());
     }
diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java
index 93c49ae..2733b13 100644
--- a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.library.elasticsearch.ElasticSearch;
@@ -41,7 +42,7 @@ import static java.util.Objects.requireNonNull;
 
 @Slf4j
 public final class BulkProcessor {
-    private final ArrayBlockingQueue<Object> requests;
+    private final ArrayBlockingQueue<Holder> requests;
 
     private final AtomicReference<ElasticSearch> es;
     private final int bulkActions;
@@ -74,21 +75,21 @@ public final class BulkProcessor {
             this::flush, 0, flushInterval.getSeconds(), TimeUnit.SECONDS);
     }
 
-    public BulkProcessor add(IndexRequest request) {
-        internalAdd(request);
-        return this;
+    public CompletableFuture<Void> add(IndexRequest request) {
+        return internalAdd(request);
     }
 
-    public BulkProcessor add(UpdateRequest request) {
-        internalAdd(request);
-        return this;
+    public CompletableFuture<Void> add(UpdateRequest request) {
+        return internalAdd(request);
     }
 
     @SneakyThrows
-    private void internalAdd(Object request) {
+    private CompletableFuture<Void> internalAdd(Object request) {
         requireNonNull(request, "request");
-        requests.put(request);
+        final CompletableFuture<Void> f = new CompletableFuture<>();
+        requests.put(new Holder(f, request));
         flushIfNeeded();
+        return f;
     }
 
     @SneakyThrows
@@ -110,7 +111,7 @@ public final class BulkProcessor {
             return;
         }
 
-        final List<Object> batch = new ArrayList<>(requests.size());
+        final List<Holder> batch = new ArrayList<>(requests.size());
         requests.drainTo(batch);
 
         final CompletableFuture<Void> flush = doFlush(batch);
@@ -118,7 +119,7 @@ public final class BulkProcessor {
         flush.join();
     }
 
-    private CompletableFuture<Void> doFlush(final List<Object> batch) {
+    private CompletableFuture<Void> doFlush(final List<Holder> batch) {
         log.debug("Executing bulk with {} requests", batch.size());
 
         if (batch.isEmpty()) {
@@ -129,8 +130,8 @@ public final class BulkProcessor {
             try {
                 final RequestFactory rf = v.requestFactory();
                 final List<byte[]> bs = new ArrayList<>();
-                for (final Object request : batch) {
-                    bs.add(v.codec().encode(request));
+                for (final Holder holder : batch) {
+                    bs.add(v.codec().encode(holder.request));
                     bs.add("\n".getBytes());
                 }
                 final ByteBuf content = Unpooled.wrappedBuffer(bs.toArray(new byte[0][]));
@@ -147,11 +148,20 @@ public final class BulkProcessor {
         });
         future.whenComplete((ignored, exception) -> {
             if (exception != null) {
+                batch.stream().map(it -> it.future)
+                     .forEach(it -> it.completeExceptionally(exception));
                 log.error("Failed to execute requests in bulk", exception);
             } else {
                 log.debug("Succeeded to execute {} requests in bulk", batch.size());
+                batch.stream().map(it -> it.future).forEach(it -> it.complete(null));
             }
         });
         return future;
     }
+
+    @RequiredArgsConstructor
+    static class Holder {
+        private final CompletableFuture<Void> future;
+        private final Object request;
+    }
 }
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
index 0cc371d..ef8ec00 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
 
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.library.elasticsearch.bulk.BulkProcessor;
 import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
@@ -56,19 +57,20 @@ public class BatchProcessEsDAO extends EsDAO implements IBatchDAO {
     }
 
     @Override
-    public void flush(List<PrepareRequest> prepareRequests) {
+    public CompletableFuture<Void> flush(List<PrepareRequest> prepareRequests) {
         if (bulkProcessor == null) {
             this.bulkProcessor = getClient().createBulkProcessor(bulkActions, flushInterval, concurrentRequests);
         }
 
         if (CollectionUtils.isNotEmpty(prepareRequests)) {
-            for (PrepareRequest prepareRequest : prepareRequests) {
+            return CompletableFuture.allOf(prepareRequests.stream().map(prepareRequest -> {
                 if (prepareRequest instanceof InsertRequest) {
-                    this.bulkProcessor.add(((IndexRequestWrapper) prepareRequest).getRequest());
+                    return bulkProcessor.add(((IndexRequestWrapper) prepareRequest).getRequest());
                 } else {
-                    this.bulkProcessor.add(((UpdateRequestWrapper) prepareRequest).getRequest());
+                    return bulkProcessor.add(((UpdateRequestWrapper) prepareRequest).getRequest());
                 }
-            }
+            }).toArray(CompletableFuture[]::new));
         }
+        return CompletableFuture.completedFuture(null);
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/BatchDAO.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/BatchDAO.java
index 105383d..00fc791 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/BatchDAO.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/BatchDAO.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.oap.server.storage.plugin.influxdb.base;
 
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
@@ -41,9 +42,9 @@ public class BatchDAO implements IBatchDAO {
     }
 
     @Override
-    public void flush(List<PrepareRequest> prepareRequests) {
+    public CompletableFuture<Void> flush(List<PrepareRequest> prepareRequests) {
         if (CollectionUtils.isEmpty(prepareRequests)) {
-            return;
+            return CompletableFuture.completedFuture(null);
         }
 
         if (log.isDebugEnabled()) {
@@ -51,10 +52,9 @@ public class BatchDAO implements IBatchDAO {
         }
 
         final BatchPoints.Builder builder = BatchPoints.builder();
-        prepareRequests.forEach(e -> {
-            builder.point(((InfluxInsertRequest) e).getPoint());
-        });
+        prepareRequests.forEach(e -> builder.point(((InfluxInsertRequest) e).getPoint()));
 
         client.write(builder.build());
+        return CompletableFuture.completedFuture(null);
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
index 6188cf7..3dcd046 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
@@ -23,6 +23,7 @@ import java.sql.SQLException;
 import java.util.List;
 import java.util.Properties;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
@@ -54,9 +55,9 @@ public class H2BatchDAO implements IBatchDAO {
     }
 
     @Override
-    public void flush(List<PrepareRequest> prepareRequests) {
+    public CompletableFuture<Void> flush(List<PrepareRequest> prepareRequests) {
         if (CollectionUtils.isEmpty(prepareRequests)) {
-            return;
+            return CompletableFuture.completedFuture(null);
         }
         if (log.isDebugEnabled()) {
             log.debug("to execute sql statements execute, data size: {}, maxBatchSqlSize: {}", prepareRequests.size(), maxBatchSqlSize);
@@ -80,6 +81,7 @@ public class H2BatchDAO implements IBatchDAO {
         if (log.isDebugEnabled()) {
             log.debug("execute sql statements done, data size: {}, maxBatchSqlSize: {}", prepareRequests.size(), maxBatchSqlSize);
         }
+        return CompletableFuture.completedFuture(null);
     }
 
     @Override