You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ke...@apache.org on 2021/11/20 07:54:38 UTC

[skywalking] 01/01: Fix the so11y latency of persistence execution latency not correct in ElasticSearch storage

This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch bugfix/so11y
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit bbd9b57891817d5954031abcf4cb897c6f828fc1
Author: kezhenxu94 <ke...@apache.org>
AuthorDate: Sat Nov 20 15:51:56 2021 +0800

    Fix the so11y latency of persistence execution latency not correct in ElasticSearch storage
    
    The ES persistence execution is now asynchronous and the execution
    latency only counts the time to insert the requests into the bulk
    processor, instead of the time after the requests are flushed into the
    storage, this patch fixes that issue.
    
    There is the same issue in prepare latency but that needs more changes
    so I'll leave it to another pull request.
---
 CHANGES.md                                         |   1 +
 .../analysis/worker/MetricsPersistentWorker.java   |   4 +-
 .../oap/server/core/storage/IBatchDAO.java         |   3 +-
 .../oap/server/core/storage/PersistenceTimer.java  | 107 +++++++++++----------
 .../server/core/storage/PersistenceTimerTest.java  |   4 +-
 .../library/elasticsearch/bulk/BulkProcessor.java  |  36 ++++---
 .../elasticsearch/base/BatchProcessEsDAO.java      |  12 ++-
 .../storage/plugin/influxdb/base/BatchDAO.java     |  10 +-
 .../storage/plugin/jdbc/h2/dao/H2BatchDAO.java     |   6 +-
 9 files changed, 103 insertions(+), 80 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 86f2242..25f1ba1 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -54,6 +54,7 @@ Release Notes.
 * Add filter mechanism in MAL core to filter metrics.
 * Fix concurrency bug in MAL `increase`-related calculation.
 * Fix a null pointer bug when building `SampleFamily`.
+* Fix the so11y latency of persistence execution latency not correct in ElasticSearch storage.
 
 #### UI
 
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
index 07d14ef..9758cb3 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
@@ -179,14 +179,14 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
     @Override
     public List<PrepareRequest> buildBatchRequests() {
         if (persistentCounter++ % persistentMod != 0) {
-            return Collections.EMPTY_LIST;
+            return Collections.emptyList();
         }
 
         final List<Metrics> lastCollection = getCache().read();
 
         long start = System.currentTimeMillis();
         if (lastCollection.size() == 0) {
-            return Collections.EMPTY_LIST;
+            return Collections.emptyList();
         }
 
         /*
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java
index ed3172f..f7279f27 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.oap.server.core.storage;
 
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
 
@@ -43,5 +44,5 @@ public interface IBatchDAO extends DAO {
      *
      * @param prepareRequests data to insert or update. No delete happens in streaming mode.
      */
-    void flush(List<PrepareRequest> prepareRequests);
+    CompletableFuture<Void> flush(List<PrepareRequest> prepareRequests);
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
index 594cac1..c9d06d1 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
@@ -21,12 +21,11 @@ package org.apache.skywalking.oap.server.core.storage;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.oap.server.library.util.RunnableWithExceptionProtection;
 import org.apache.skywalking.oap.server.core.CoreModuleConfig;
 import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
 import org.apache.skywalking.oap.server.core.analysis.worker.PersistenceWorker;
@@ -34,6 +33,7 @@ import org.apache.skywalking.oap.server.core.analysis.worker.TopNStreamProcessor
 import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.library.util.RunnableWithExceptionProtection;
 import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
 import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
 import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
@@ -56,21 +56,25 @@ public enum PersistenceTimer {
 
     public void start(ModuleManager moduleManager, CoreModuleConfig moduleConfig) {
         log.info("persistence timer start");
-        IBatchDAO batchDAO = moduleManager.find(StorageModule.NAME).provider().getService(IBatchDAO.class);
+        IBatchDAO batchDAO =
+            moduleManager.find(StorageModule.NAME).provider().getService(IBatchDAO.class);
 
         MetricsCreator metricsCreator = moduleManager.find(TelemetryModule.NAME)
                                                      .provider()
                                                      .getService(MetricsCreator.class);
         errorCounter = metricsCreator.createCounter(
-            "persistence_timer_bulk_error_count", "Error execution of the prepare stage in persistence timer",
+            "persistence_timer_bulk_error_count",
+            "Error execution of the prepare stage in persistence timer",
             MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE
         );
         prepareLatency = metricsCreator.createHistogramMetric(
-            "persistence_timer_bulk_prepare_latency", "Latency of the prepare stage in persistence timer",
+            "persistence_timer_bulk_prepare_latency",
+            "Latency of the prepare stage in persistence timer",
             MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE
         );
         executeLatency = metricsCreator.createHistogramMetric(
-            "persistence_timer_bulk_execute_latency", "Latency of the execute stage in persistence timer",
+            "persistence_timer_bulk_execute_latency",
+            "Latency of the execute stage in persistence timer",
             MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE
         );
         allLatency = metricsCreator.createHistogramMetric(
@@ -82,8 +86,11 @@ public enum PersistenceTimer {
         if (!isStarted) {
             Executors.newSingleThreadScheduledExecutor()
                      .scheduleWithFixedDelay(
-                         new RunnableWithExceptionProtection(() -> extractDataAndSave(batchDAO), t -> log
-                             .error("Extract data and save failure.", t)), 5, moduleConfig.getPersistentPeriod(),
+                         new RunnableWithExceptionProtection(
+                             () -> extractDataAndSave(batchDAO), t -> log
+                             .error(
+                                 "Extract data and save failure.", t)), 5,
+                         moduleConfig.getPersistentPeriod(),
                          TimeUnit.SECONDS
                      );
 
@@ -98,53 +105,51 @@ public enum PersistenceTimer {
 
         long startTime = System.currentTimeMillis();
 
-        try (HistogramMetrics.Timer allTimer = allLatency.createTimer()) {
-            List<PersistenceWorker<? extends StorageData>> persistenceWorkers = new ArrayList<>();
-            persistenceWorkers.addAll(TopNStreamProcessor.getInstance().getPersistentWorkers());
-            persistenceWorkers.addAll(MetricsStreamProcessor.getInstance().getPersistentWorkers());
-
-            CountDownLatch countDownLatch = new CountDownLatch(persistenceWorkers.size());
-            persistenceWorkers.forEach(worker -> {
-                prepareExecutorService.submit(() -> {
-                    List<PrepareRequest> innerPrepareRequests = null;
-                    try {
-                        // Prepare stage
-                        try (HistogramMetrics.Timer timer = prepareLatency.createTimer()) {
-                            if (log.isDebugEnabled()) {
-                                log.debug("extract {} worker data and save", worker.getClass().getName());
-                            }
-
-                            innerPrepareRequests = worker.buildBatchRequests();
-
-                            worker.endOfRound();
-                        } catch (Throwable e) {
-                            log.error(e.getMessage(), e);
+        HistogramMetrics.Timer allTimer = allLatency.createTimer();
+        List<PersistenceWorker<? extends StorageData>> workers = new ArrayList<>();
+        workers.addAll(TopNStreamProcessor.getInstance().getPersistentWorkers());
+        workers.addAll(MetricsStreamProcessor.getInstance().getPersistentWorkers());
+
+        final CompletableFuture<Void> future =
+            CompletableFuture.allOf(workers.stream().map(worker -> {
+                return CompletableFuture.runAsync(() -> {
+                    List<PrepareRequest> innerPrepareRequests;
+                    // Prepare stage
+                    try (HistogramMetrics.Timer ignored = prepareLatency.createTimer()) {
+                        if (log.isDebugEnabled()) {
+                            log.debug(
+                                "extract {} worker data and save",
+                                worker.getClass().getName()
+                            );
                         }
 
-                        // Execution stage
-                        try (HistogramMetrics.Timer executeLatencyTimer = executeLatency.createTimer()) {
-                            if (CollectionUtils.isNotEmpty(innerPrepareRequests)) {
-                                batchDAO.flush(innerPrepareRequests);
-                            }
-                        } catch (Throwable e) {
-                            log.error(e.getMessage(), e);
-                        }
-                    } finally {
-                        countDownLatch.countDown();
+                        innerPrepareRequests = worker.buildBatchRequests();
+
+                        worker.endOfRound();
+                    }
+
+                    if (CollectionUtils.isEmpty(innerPrepareRequests)) {
+                        return;
                     }
-                });
-            });
-
-            countDownLatch.await();
-        } catch (Throwable e) {
-            errorCounter.inc();
-            log.error(e.getMessage(), e);
-        } finally {
+
+                    // Execution stage
+                    HistogramMetrics.Timer executeLatencyTimer = executeLatency.createTimer();
+                    batchDAO.flush(innerPrepareRequests)
+                            .whenComplete(($1, $2) -> executeLatencyTimer.close());
+                }, prepareExecutorService);
+            }).toArray(CompletableFuture[]::new));
+        future.whenComplete((unused, throwable) -> {
+            allTimer.close();
             if (log.isDebugEnabled()) {
-                log.debug("Persistence data save finish");
+                log.debug(
+                    "Batch persistence duration: {} ms",
+                    System.currentTimeMillis() - startTime
+                );
             }
-        }
-
-        log.debug("Batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
+            if (throwable != null) {
+                errorCounter.inc();
+                log.error(throwable.getMessage(), throwable);
+            }
+        });
     }
 }
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimerTest.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimerTest.java
index bbe9ca6..680084b 100644
--- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimerTest.java
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimerTest.java
@@ -23,6 +23,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import lombok.Data;
 import org.apache.skywalking.oap.server.core.CoreModuleConfig;
 import org.apache.skywalking.oap.server.core.analysis.worker.MetricsPersistentWorker;
@@ -61,10 +62,11 @@ public class PersistenceTimerTest {
             }
 
             @Override
-            public void flush(final List<PrepareRequest> prepareRequests) {
+            public CompletableFuture<Void> flush(final List<PrepareRequest> prepareRequests) {
                 synchronized (result) {
                     result.addAll(prepareRequests);
                 }
+                return CompletableFuture.completedFuture(null);
             }
         };
         for (int i = 0; i < workCount; i++) {
diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java
index 93c49ae..2733b13 100644
--- a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.library.elasticsearch.ElasticSearch;
@@ -41,7 +42,7 @@ import static java.util.Objects.requireNonNull;
 
 @Slf4j
 public final class BulkProcessor {
-    private final ArrayBlockingQueue<Object> requests;
+    private final ArrayBlockingQueue<Holder> requests;
 
     private final AtomicReference<ElasticSearch> es;
     private final int bulkActions;
@@ -74,21 +75,21 @@ public final class BulkProcessor {
             this::flush, 0, flushInterval.getSeconds(), TimeUnit.SECONDS);
     }
 
-    public BulkProcessor add(IndexRequest request) {
-        internalAdd(request);
-        return this;
+    public CompletableFuture<Void> add(IndexRequest request) {
+        return internalAdd(request);
     }
 
-    public BulkProcessor add(UpdateRequest request) {
-        internalAdd(request);
-        return this;
+    public CompletableFuture<Void> add(UpdateRequest request) {
+        return internalAdd(request);
     }
 
     @SneakyThrows
-    private void internalAdd(Object request) {
+    private CompletableFuture<Void> internalAdd(Object request) {
         requireNonNull(request, "request");
-        requests.put(request);
+        final CompletableFuture<Void> f = new CompletableFuture<>();
+        requests.put(new Holder(f, request));
         flushIfNeeded();
+        return f;
     }
 
     @SneakyThrows
@@ -110,7 +111,7 @@ public final class BulkProcessor {
             return;
         }
 
-        final List<Object> batch = new ArrayList<>(requests.size());
+        final List<Holder> batch = new ArrayList<>(requests.size());
         requests.drainTo(batch);
 
         final CompletableFuture<Void> flush = doFlush(batch);
@@ -118,7 +119,7 @@ public final class BulkProcessor {
         flush.join();
     }
 
-    private CompletableFuture<Void> doFlush(final List<Object> batch) {
+    private CompletableFuture<Void> doFlush(final List<Holder> batch) {
         log.debug("Executing bulk with {} requests", batch.size());
 
         if (batch.isEmpty()) {
@@ -129,8 +130,8 @@ public final class BulkProcessor {
             try {
                 final RequestFactory rf = v.requestFactory();
                 final List<byte[]> bs = new ArrayList<>();
-                for (final Object request : batch) {
-                    bs.add(v.codec().encode(request));
+                for (final Holder holder : batch) {
+                    bs.add(v.codec().encode(holder.request));
                     bs.add("\n".getBytes());
                 }
                 final ByteBuf content = Unpooled.wrappedBuffer(bs.toArray(new byte[0][]));
@@ -147,11 +148,20 @@ public final class BulkProcessor {
         });
         future.whenComplete((ignored, exception) -> {
             if (exception != null) {
+                batch.stream().map(it -> it.future)
+                     .forEach(it -> it.completeExceptionally(exception));
                 log.error("Failed to execute requests in bulk", exception);
             } else {
                 log.debug("Succeeded to execute {} requests in bulk", batch.size());
+                batch.stream().map(it -> it.future).forEach(it -> it.complete(null));
             }
         });
         return future;
     }
+
+    @RequiredArgsConstructor
+    static class Holder {
+        private final CompletableFuture<Void> future;
+        private final Object request;
+    }
 }
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
index 0cc371d..ef8ec00 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
 
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.library.elasticsearch.bulk.BulkProcessor;
 import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
@@ -56,19 +57,20 @@ public class BatchProcessEsDAO extends EsDAO implements IBatchDAO {
     }
 
     @Override
-    public void flush(List<PrepareRequest> prepareRequests) {
+    public CompletableFuture<Void> flush(List<PrepareRequest> prepareRequests) {
         if (bulkProcessor == null) {
             this.bulkProcessor = getClient().createBulkProcessor(bulkActions, flushInterval, concurrentRequests);
         }
 
         if (CollectionUtils.isNotEmpty(prepareRequests)) {
-            for (PrepareRequest prepareRequest : prepareRequests) {
+            return CompletableFuture.allOf(prepareRequests.stream().map(prepareRequest -> {
                 if (prepareRequest instanceof InsertRequest) {
-                    this.bulkProcessor.add(((IndexRequestWrapper) prepareRequest).getRequest());
+                    return bulkProcessor.add(((IndexRequestWrapper) prepareRequest).getRequest());
                 } else {
-                    this.bulkProcessor.add(((UpdateRequestWrapper) prepareRequest).getRequest());
+                    return bulkProcessor.add(((UpdateRequestWrapper) prepareRequest).getRequest());
                 }
-            }
+            }).toArray(CompletableFuture[]::new));
         }
+        return CompletableFuture.completedFuture(null);
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/BatchDAO.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/BatchDAO.java
index 105383d..00fc791 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/BatchDAO.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/BatchDAO.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.oap.server.storage.plugin.influxdb.base;
 
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
@@ -41,9 +42,9 @@ public class BatchDAO implements IBatchDAO {
     }
 
     @Override
-    public void flush(List<PrepareRequest> prepareRequests) {
+    public CompletableFuture<Void> flush(List<PrepareRequest> prepareRequests) {
         if (CollectionUtils.isEmpty(prepareRequests)) {
-            return;
+            return CompletableFuture.completedFuture(null);
         }
 
         if (log.isDebugEnabled()) {
@@ -51,10 +52,9 @@ public class BatchDAO implements IBatchDAO {
         }
 
         final BatchPoints.Builder builder = BatchPoints.builder();
-        prepareRequests.forEach(e -> {
-            builder.point(((InfluxInsertRequest) e).getPoint());
-        });
+        prepareRequests.forEach(e -> builder.point(((InfluxInsertRequest) e).getPoint()));
 
         client.write(builder.build());
+        return CompletableFuture.completedFuture(null);
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
index 6188cf7..3dcd046 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
@@ -23,6 +23,7 @@ import java.sql.SQLException;
 import java.util.List;
 import java.util.Properties;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
@@ -54,9 +55,9 @@ public class H2BatchDAO implements IBatchDAO {
     }
 
     @Override
-    public void flush(List<PrepareRequest> prepareRequests) {
+    public CompletableFuture<Void> flush(List<PrepareRequest> prepareRequests) {
         if (CollectionUtils.isEmpty(prepareRequests)) {
-            return;
+            return CompletableFuture.completedFuture(null);
         }
         if (log.isDebugEnabled()) {
             log.debug("to execute sql statements execute, data size: {}, maxBatchSqlSize: {}", prepareRequests.size(), maxBatchSqlSize);
@@ -80,6 +81,7 @@ public class H2BatchDAO implements IBatchDAO {
         if (log.isDebugEnabled()) {
             log.debug("execute sql statements done, data size: {}, maxBatchSqlSize: {}", prepareRequests.size(), maxBatchSqlSize);
         }
+        return CompletableFuture.completedFuture(null);
     }
 
     @Override