You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2022/11/24 07:38:11 UTC

[skywalking] 01/01: Enhance cache mechanism in the metric persistent process

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch metric-cache
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit dcd30ae61745299a89480a1a1edadd5efba3506f
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Thu Nov 24 15:37:59 2022 +0800

    Enhance cache mechanism in the metric persistent process
---
 docs/en/changes/changes.md                         |  6 ++
 .../analysis/worker/MetricsPersistentWorker.java   | 66 +++++++++++++++-------
 .../oap/server/core/storage/IMetricsDAO.java       |  5 +-
 .../server/core/storage/SessionCacheCallback.java  | 50 ++++++++++++++++
 .../client/elasticsearch/IndexRequestWrapper.java  | 13 ++++-
 .../client/elasticsearch/UpdateRequestWrapper.java | 14 ++++-
 .../library/client/request/InsertRequest.java      |  1 +
 .../library/client/request/UpdateRequest.java      |  1 +
 .../storage/plugin/banyandb/BanyanDBBatchDAO.java  | 23 ++++++--
 .../measure/BanyanDBMeasureInsertRequest.java      |  7 +++
 .../measure/BanyanDBMeasureUpdateRequest.java      |  7 +++
 .../banyandb/measure/BanyanDBMetricsDAO.java       |  9 +--
 .../stream/BanyanDBStreamInsertRequest.java        |  5 ++
 .../elasticsearch/base/BatchProcessEsDAO.java      | 16 +++++-
 .../base/MetricIndexRequestWrapper.java}           | 35 ++++++------
 .../base/MetricIndexUpdateWrapper.java}            | 35 ++++++------
 .../plugin/elasticsearch/base/MetricsEsDAO.java    |  9 +--
 .../storage/plugin/jdbc/BatchSQLExecutor.java      | 45 ++++++++++++---
 .../server/storage/plugin/jdbc/SQLExecutor.java    | 36 +++++++-----
 .../plugin/jdbc/common/dao/JDBCManagementDAO.java  |  2 +-
 .../plugin/jdbc/common/dao/JDBCMetricsDAO.java     |  9 +--
 .../plugin/jdbc/common/dao/JDBCNoneStreamDAO.java  |  2 +-
 .../plugin/jdbc/common/dao/JDBCRecordDAO.java      |  2 +-
 .../plugin/jdbc/common/dao/JDBCSQLExecutor.java    | 28 +++++----
 .../common/dao/JDBCUITemplateManagementDAO.java    |  4 +-
 .../shardingsphere/ShardingIntegrationTest.java    |  2 +-
 26 files changed, 318 insertions(+), 114 deletions(-)

diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index e865f06135..e4f4e855c9 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -119,6 +119,12 @@
 * Support dynamic config the sampling strategy in network profiling.
 * Zipkin module support BanyanDB storage.
 * Zipkin traces query API, sort the result set by start time by default.
+* Enhance cache mechanism in the metric persistent process.
+  * This cache only worked when the metric was accessible (readable) from the database. Once the insert execution was
+    delayed due to the scale, the cache would lose efficacy. It only worked for the last update per minute,
+    considering our 25s period.
+  * Fix ID conflicts for all JDBC storage implementations. Due to the insert delay, the JDBC storage implementation
+    would still generate another new insert statement.
 
 #### UI
 
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
index 4725f749e2..f4b755e8ed 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
@@ -20,27 +20,29 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier;
-import org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool;
-import org.apache.skywalking.oap.server.library.datacarrier.consumer.ConsumerPoolFactory;
-import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer;
 import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.analysis.data.MergableBufferedData;
 import org.apache.skywalking.oap.server.core.analysis.data.ReadWriteSafeCache;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
 import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
+import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
+import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier;
+import org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool;
+import org.apache.skywalking.oap.server.library.datacarrier.consumer.ConsumerPoolFactory;
+import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
 import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
 import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
@@ -58,7 +60,13 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
     private static long SESSION_TIMEOUT_OFFSITE_COUNTER = 0;
 
     private final Model model;
-    private final Map<Metrics, Metrics> context;
+    /**
+     * The session cache holds the latest metrics in-memory.
+     * There are two ways for a metric to get into the cache:
+     * 1. The metric is read from the database through {@link #loadFromStorage(List)}
+     * 2. The built {@link InsertRequest} is executed successfully.
+     */
+    private final Map<Metrics, Metrics> sessionCache;
     private final IMetricsDAO metricsDAO;
     private final Optional<AbstractWorker<Metrics>> nextAlarmWorker;
     private final Optional<AbstractWorker<ExportEvent>> nextExportWorker;
@@ -89,7 +97,11 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
                             long storageSessionTimeout, int metricsDataTTL, MetricStreamKind kind) {
         super(moduleDefineHolder, new ReadWriteSafeCache<>(new MergableBufferedData(), new MergableBufferedData()));
         this.model = model;
-        this.context = new HashMap<>(100);
+        // Because the cache is updated through callbacks triggered by the final storage implementation,
+        // the map/cache could be updated concurrently.
+        // Use ConcurrentHashMap in order to avoid HashMap deadlock.
+        // Since 9.4.0
+        this.sessionCache = new ConcurrentHashMap<>(100);
         this.enableDatabaseSession = enableDatabaseSession;
         this.metricsDAO = metricsDAO;
         this.nextAlarmWorker = Optional.ofNullable(nextAlarmWorker);
@@ -192,12 +204,12 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
             metricsList.add(data);
 
             if (metricsList.size() == batchSize) {
-                flushDataToStorage(metricsList, prepareRequests);
+                prepareFlushDataToStorage(metricsList, prepareRequests);
             }
         }
 
         if (metricsList.size() > 0) {
-            flushDataToStorage(metricsList, prepareRequests);
+            prepareFlushDataToStorage(metricsList, prepareRequests);
         }
 
         if (prepareRequests.size() > 0) {
@@ -209,14 +221,20 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
         return prepareRequests;
     }
 
-    private void flushDataToStorage(List<Metrics> metricsList,
-                                    List<PrepareRequest> prepareRequests) {
+    /**
+     * Build the prepare requests for the given metrics, in preparation for the database flush.
+     *
+     * @param metricsList     the metrics from the latest read of the in-memory aggregated cache.
+     * @param prepareRequests the list collecting the built requests for final execution.
+     */
+    private void prepareFlushDataToStorage(List<Metrics> metricsList,
+                                           List<PrepareRequest> prepareRequests) {
         try {
             loadFromStorage(metricsList);
 
             long timestamp = System.currentTimeMillis();
             for (Metrics metrics : metricsList) {
-                Metrics cachedMetrics = context.get(metrics);
+                Metrics cachedMetrics = sessionCache.get(metrics);
                 if (cachedMetrics != null) {
                     /*
                      * If the metrics is not supportUpdate, defined through MetricsExtension#supportUpdate,
@@ -233,12 +251,22 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
                         continue;
                     }
                     cachedMetrics.calculate();
-                    prepareRequests.add(metricsDAO.prepareBatchUpdate(model, cachedMetrics));
+                    prepareRequests.add(
+                        metricsDAO.prepareBatchUpdate(
+                            model,
+                            cachedMetrics,
+                            new SessionCacheCallback(sessionCache, cachedMetrics)
+                        ));
                     nextWorker(cachedMetrics);
                     cachedMetrics.setLastUpdateTimestamp(timestamp);
                 } else {
                     metrics.calculate();
-                    prepareRequests.add(metricsDAO.prepareBatchInsert(model, metrics));
+                    prepareRequests.add(
+                        metricsDAO.prepareBatchInsert(
+                            model,
+                            metrics,
+                            new SessionCacheCallback(sessionCache, metrics)
+                        ));
                     nextWorker(metrics);
                     metrics.setLastUpdateTimestamp(timestamp);
                 }
@@ -271,7 +299,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
             List<Metrics> notInCacheMetrics =
                 metrics.stream()
                        .filter(m -> {
-                           final Metrics cachedValue = context.get(m);
+                           final Metrics cachedValue = sessionCache.get(m);
                            // Not cached or session disabled, the metric could be tagged `not in cache`.
                            if (cachedValue == null || !enableDatabaseSession) {
                                return true;
@@ -286,7 +314,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
 
                                if (metricsDAO.isExpiredCache(model, cachedValue, currentTimeMillis, metricsDataTTL)) {
                                    // The expired metrics should be removed from the context and tagged `not in cache` directly.
-                                   context.remove(m);
+                                   sessionCache.remove(m);
                                    return true;
                                }
                            }
@@ -301,9 +329,9 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
             final List<Metrics> dbMetrics = metricsDAO.multiGet(model, notInCacheMetrics);
             if (!enableDatabaseSession) {
                 // Clear the cache only after results from DB are returned successfully.
-                context.clear();
+                sessionCache.clear();
             }
-            dbMetrics.forEach(m -> context.put(m, m));
+            dbMetrics.forEach(m -> sessionCache.put(m, m));
         } catch (final Exception e) {
             log.error("Failed to load metrics for merging", e);
         }
@@ -312,7 +340,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
     @Override
     public void endOfRound() {
         if (enableDatabaseSession) {
-            Iterator<Metrics> iterator = context.values().iterator();
+            Iterator<Metrics> iterator = sessionCache.values().iterator();
             long timestamp = System.currentTimeMillis();
             while (iterator.hasNext()) {
                 Metrics metrics = iterator.next();
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java
index 4320a83cd9..1baee65e4c 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java
@@ -47,7 +47,7 @@ public interface IMetricsDAO extends DAO {
      * @return InsertRequest should follow the database client driver datatype, in order to make sure it could be
      * executed ASAP.
      */
-    InsertRequest prepareBatchInsert(Model model, Metrics metrics) throws IOException;
+    InsertRequest prepareBatchInsert(Model model, Metrics metrics, SessionCacheCallback callback) throws IOException;
 
     /**
      * Transfer the given metrics to an executable update statement.
@@ -55,7 +55,7 @@ public interface IMetricsDAO extends DAO {
      * @return UpdateRequest should follow the database client driver datatype, in order to make sure it could be
      * executed ASAP.
      */
-    UpdateRequest prepareBatchUpdate(Model model, Metrics metrics) throws IOException;
+    UpdateRequest prepareBatchUpdate(Model model, Metrics metrics, SessionCacheCallback callback) throws IOException;
 
     /**
      * Calculate the expired status of the metric by given current timestamp, metric and TTL.
@@ -72,4 +72,5 @@ public interface IMetricsDAO extends DAO {
         // If the cached metric is older than the TTL indicated.
         return currentTimeMillis - metricTimestamp > TimeUnit.DAYS.toMillis(ttl);
     }
+
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/SessionCacheCallback.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/SessionCacheCallback.java
new file mode 100644
index 0000000000..923da32f72
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/SessionCacheCallback.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.storage;
+
+import java.util.Map;
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+
+/**
+ * SessionCacheCallback provides a bridge for storage implementations to update the session cache on execution results.
+ */
+@RequiredArgsConstructor
+public class SessionCacheCallback {
+    private final Map<Metrics, Metrics> sessionCache;
+    private final Metrics metrics;
+    /**
+     * In some cases, this callback could be shared by multiple executions, such as SQLExecutor#additionalSQLs.
+     * This flag makes sure that, once one of the generated executions fails, the whole metric is removed
+     * from the cache and is not added back, as those are executed in batch mode and the sequence is uncertain.
+     */
+    private volatile boolean isFailed = false;
+
+    public void onInsertCompleted() {
+        if (isFailed) {
+            return;
+        }
+        sessionCache.put(metrics, metrics);
+    }
+
+    public void onUpdateFailure() {
+        sessionCache.remove(metrics);
+        isFailed = true;
+    }
+}
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java
index 3e2cbf7ad0..e6bd63e28f 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java
@@ -24,7 +24,7 @@ import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 
 @Getter
 public class IndexRequestWrapper implements InsertRequest {
-    private final IndexRequest request;
+    protected IndexRequest request;
 
     public IndexRequestWrapper(String index, String type, String id,
                                Map<String, ?> source) {
@@ -35,4 +35,15 @@ public class IndexRequestWrapper implements InsertRequest {
                               .doc(source)
                               .build();
     }
+
+    /**
+     * Expose an empty constructor for lazy initialization.
+     */
+    protected IndexRequestWrapper() {
+
+    }
+
+    @Override
+    public void onInsertCompleted() {
+    }
 }
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/UpdateRequestWrapper.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/UpdateRequestWrapper.java
index 3241aa84e4..dbd9c8c1d2 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/UpdateRequestWrapper.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/UpdateRequestWrapper.java
@@ -23,7 +23,7 @@ import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
 
 @Getter
 public class UpdateRequestWrapper implements UpdateRequest {
-    private final org.apache.skywalking.library.elasticsearch.requests.UpdateRequest request;
+    protected org.apache.skywalking.library.elasticsearch.requests.UpdateRequest request;
 
     public UpdateRequestWrapper(String index, String type, String id,
                                 Map<String, Object> source) {
@@ -34,4 +34,16 @@ public class UpdateRequestWrapper implements UpdateRequest {
                                                                                     .doc(source)
                                                                                     .build();
     }
+
+    /**
+     * Expose an empty constructor for lazy initialization.
+     */
+    protected UpdateRequestWrapper() {
+
+    }
+
+    @Override
+    public void onUpdateFailure() {
+
+    }
 }
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/request/InsertRequest.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/request/InsertRequest.java
index a9dbdab1af..91ded6a487 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/request/InsertRequest.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/request/InsertRequest.java
@@ -18,4 +18,5 @@
 package org.apache.skywalking.oap.server.library.client.request;
 
 public interface InsertRequest extends PrepareRequest {
+    void onInsertCompleted();
 }
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/request/UpdateRequest.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/request/UpdateRequest.java
index ea914913c9..2895216971 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/request/UpdateRequest.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/request/UpdateRequest.java
@@ -18,4 +18,5 @@
 package org.apache.skywalking.oap.server.library.client.request;
 
 public interface UpdateRequest extends PrepareRequest {
+    void onUpdateFailure();
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBBatchDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBBatchDAO.java
index 9564de27da..b3cd0461e0 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBBatchDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBBatchDAO.java
@@ -18,6 +18,9 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.banyandb;
 
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
 import org.apache.skywalking.banyandb.v1.client.MeasureBulkWriteProcessor;
 import org.apache.skywalking.banyandb.v1.client.StreamBulkWriteProcessor;
 import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
@@ -29,10 +32,6 @@ import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDB
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBMeasureUpdateRequest;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBStreamInsertRequest;
 
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
-
 public class BanyanDBBatchDAO extends AbstractDAO<BanyanDBStorageClient> implements IBatchDAO {
     private static final Object STREAM_SYNCHRONIZER = new Object();
     private static final Object MEASURE_SYNCHRONIZER = new Object();
@@ -69,9 +68,21 @@ public class BanyanDBBatchDAO extends AbstractDAO<BanyanDBStorageClient> impleme
                 if (r instanceof BanyanDBStreamInsertRequest) {
                     return getStreamBulkWriteProcessor().add(((BanyanDBStreamInsertRequest) r).getStreamWrite());
                 } else if (r instanceof BanyanDBMeasureInsertRequest) {
-                    return getMeasureBulkWriteProcessor().add(((BanyanDBMeasureInsertRequest) r).getMeasureWrite());
+                    return getMeasureBulkWriteProcessor().add(((BanyanDBMeasureInsertRequest) r).getMeasureWrite())
+                                                         .whenComplete((v, throwable) -> {
+                                                             if (throwable == null) {
+                                                                 // Insert completed
+                                                                 ((BanyanDBMeasureInsertRequest) r).onInsertCompleted();
+                                                             }
+                                                         });
                 } else if (r instanceof BanyanDBMeasureUpdateRequest) {
-                    return getMeasureBulkWriteProcessor().add(((BanyanDBMeasureUpdateRequest) r).getMeasureWrite());
+                    return getMeasureBulkWriteProcessor().add(((BanyanDBMeasureUpdateRequest) r).getMeasureWrite())
+                                                         .whenComplete((v, throwable) -> {
+                                                             if (throwable != null) {
+                                                                 // Update failure
+                                                                 ((BanyanDBMeasureUpdateRequest) r).onUpdateFailure();
+                                                             }
+                                                         });
                 }
                 return CompletableFuture.completedFuture(null);
             }).toArray(CompletableFuture[]::new));
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureInsertRequest.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureInsertRequest.java
index 3f965bff6f..bbfe883372 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureInsertRequest.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureInsertRequest.java
@@ -21,10 +21,17 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import org.apache.skywalking.banyandb.v1.client.MeasureWrite;
+import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 
 @RequiredArgsConstructor
 @Getter
 public class BanyanDBMeasureInsertRequest implements InsertRequest {
     private final MeasureWrite measureWrite;
+    private final SessionCacheCallback callback;
+
+    @Override
+    public void onInsertCompleted() {
+        callback.onInsertCompleted();
+    }
 }
\ No newline at end of file
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureUpdateRequest.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureUpdateRequest.java
index 2ff1fae015..b39190aada 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureUpdateRequest.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureUpdateRequest.java
@@ -21,10 +21,17 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import org.apache.skywalking.banyandb.v1.client.MeasureWrite;
+import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
 import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
 
 @RequiredArgsConstructor
 @Getter
 public class BanyanDBMeasureUpdateRequest implements UpdateRequest {
     private final MeasureWrite measureWrite;
+    private final SessionCacheCallback callback;
+
+    @Override
+    public void onUpdateFailure() {
+        callback.onUpdateFailure();
+    }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java
index 0b24eb0e29..0ca07e8cfe 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java
@@ -26,6 +26,7 @@ import org.apache.skywalking.banyandb.v1.client.MeasureWrite;
 import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
+import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
@@ -77,7 +78,7 @@ public class BanyanDBMetricsDAO extends AbstractBanyanDBDAO implements IMetricsD
     }
 
     @Override
-    public InsertRequest prepareBatchInsert(Model model, Metrics metrics) throws IOException {
+    public InsertRequest prepareBatchInsert(Model model, Metrics metrics, SessionCacheCallback callback) throws IOException {
         log.info("prepare to insert {}", model.getName());
         MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(model);
         if (schema == null) {
@@ -89,11 +90,11 @@ public class BanyanDBMetricsDAO extends AbstractBanyanDBDAO implements IMetricsD
         final BanyanDBConverter.MeasureToStorage toStorage = new BanyanDBConverter.MeasureToStorage(schema, measureWrite);
         storageBuilder.entity2Storage(metrics, toStorage);
         toStorage.acceptID(metrics.id());
-        return new BanyanDBMeasureInsertRequest(toStorage.obtain());
+        return new BanyanDBMeasureInsertRequest(toStorage.obtain(), callback);
     }
 
     @Override
-    public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics) throws IOException {
+    public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics, SessionCacheCallback callback) throws IOException {
         log.info("prepare to update {}", model.getName());
         MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(model);
         if (schema == null) {
@@ -105,6 +106,6 @@ public class BanyanDBMetricsDAO extends AbstractBanyanDBDAO implements IMetricsD
         final BanyanDBConverter.MeasureToStorage toStorage = new BanyanDBConverter.MeasureToStorage(schema, measureWrite);
         storageBuilder.entity2Storage(metrics, toStorage);
         toStorage.acceptID(metrics.id());
-        return new BanyanDBMeasureUpdateRequest(toStorage.obtain());
+        return new BanyanDBMeasureUpdateRequest(toStorage.obtain(), callback);
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java
index b04e4422d5..15dbc52c53 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java
@@ -27,4 +27,9 @@ import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 @Getter
 public class BanyanDBStreamInsertRequest implements InsertRequest {
     private final StreamWrite streamWrite;
+
+    @Override
+    public void onInsertCompleted() {
+
+    }
 }
\ No newline at end of file
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
index dc6c47bb19..76e44d67fd 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
@@ -75,9 +75,21 @@ public class BatchProcessEsDAO extends EsDAO implements IBatchDAO {
         if (CollectionUtils.isNotEmpty(prepareRequests)) {
             return CompletableFuture.allOf(prepareRequests.stream().map(prepareRequest -> {
                 if (prepareRequest instanceof InsertRequest) {
-                    return bulkProcessor.add(((IndexRequestWrapper) prepareRequest).getRequest());
+                    return bulkProcessor.add(((IndexRequestWrapper) prepareRequest).getRequest())
+                        .whenComplete((v, throwable) -> {
+                            if (throwable == null) {
+                                // Insert completed
+                                ((IndexRequestWrapper) prepareRequest).onInsertCompleted();
+                            }
+                        });
                 } else {
-                    return bulkProcessor.add(((UpdateRequestWrapper) prepareRequest).getRequest());
+                    return bulkProcessor.add(((UpdateRequestWrapper) prepareRequest).getRequest())
+                        .whenComplete((v, throwable) -> {
+                            if (throwable != null) {
+                                // Update failure
+                                ((UpdateRequestWrapper) prepareRequest).onUpdateFailure();
+                            }
+                        });
                 }
             }).toArray(CompletableFuture[]::new));
         }
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricIndexRequestWrapper.java
similarity index 50%
copy from oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java
copy to oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricIndexRequestWrapper.java
index 3e2cbf7ad0..ce5cc6ec91 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricIndexRequestWrapper.java
@@ -13,26 +13,29 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
+ *
  */
 
-package org.apache.skywalking.oap.server.library.client.elasticsearch;
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
+
+import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
+import org.apache.skywalking.oap.server.library.client.elasticsearch.IndexRequestWrapper;
 
-import java.util.Map;
-import lombok.Getter;
-import org.apache.skywalking.library.elasticsearch.requests.IndexRequest;
-import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+/**
+ * Wraps a built {@link IndexRequestWrapper} and notifies the callback when the insert completes.
+ */
+public class MetricIndexRequestWrapper extends IndexRequestWrapper {
+    private final SessionCacheCallback callback;
 
-@Getter
-public class IndexRequestWrapper implements InsertRequest {
-    private final IndexRequest request;
+    public MetricIndexRequestWrapper(IndexRequestWrapper requestWrapper, SessionCacheCallback callback) {
+        this.request = requestWrapper.getRequest();
+        this.callback = callback;
+    }
 
-    public IndexRequestWrapper(String index, String type, String id,
-                               Map<String, ?> source) {
-        request = IndexRequest.builder()
-                              .index(index)
-                              .type(type)
-                              .id(id)
-                              .doc(source)
-                              .build();
+    @Override
+    public void onInsertCompleted() {
+        if (callback != null) {
+            callback.onInsertCompleted();
+        }
     }
 }
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricIndexUpdateWrapper.java
similarity index 50%
copy from oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java
copy to oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricIndexUpdateWrapper.java
index 3e2cbf7ad0..34216534b9 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricIndexUpdateWrapper.java
@@ -13,26 +13,29 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
+ *
  */
 
-package org.apache.skywalking.oap.server.library.client.elasticsearch;
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
+
+import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
+import org.apache.skywalking.oap.server.library.client.elasticsearch.UpdateRequestWrapper;
 
-import java.util.Map;
-import lombok.Getter;
-import org.apache.skywalking.library.elasticsearch.requests.IndexRequest;
-import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+/**
+ * Wraps a built {@link UpdateRequestWrapper} and notifies the callback when the update fails.
+ */
+public class MetricIndexUpdateWrapper extends UpdateRequestWrapper {
+    private final SessionCacheCallback callback;
 
-@Getter
-public class IndexRequestWrapper implements InsertRequest {
-    private final IndexRequest request;
+    public MetricIndexUpdateWrapper(UpdateRequestWrapper requestWrapper, SessionCacheCallback callback) {
+        this.request = requestWrapper.getRequest();
+        this.callback = callback;
+    }
 
-    public IndexRequestWrapper(String index, String type, String id,
-                               Map<String, ?> source) {
-        request = IndexRequest.builder()
-                              .index(index)
-                              .type(type)
-                              .id(id)
-                              .doc(source)
-                              .build();
+    @Override
+    public void onUpdateFailure() {
+        if (callback != null) {
+            callback.onUpdateFailure();
+        }
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
index 0363669475..12319dae1a 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
@@ -32,6 +32,7 @@ import org.apache.skywalking.oap.server.core.analysis.DownSampling;
 import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
+import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
 import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
@@ -97,24 +98,24 @@ public class MetricsEsDAO extends EsDAO implements IMetricsDAO {
     }
 
     @Override
-    public InsertRequest prepareBatchInsert(Model model, Metrics metrics) {
+    public InsertRequest prepareBatchInsert(Model model, Metrics metrics, SessionCacheCallback callback) {
         final ElasticSearchConverter.ToStorage toStorage = new ElasticSearchConverter.ToStorage(model.getName());
         storageBuilder.entity2Storage(metrics, toStorage);
         Map<String, Object> builder = IndexController.INSTANCE.appendTableColumn(model, toStorage.obtain());
         String modelName = TimeSeriesUtils.writeIndexName(model, metrics.getTimeBucket());
         String id = IndexController.INSTANCE.generateDocId(model, metrics.id());
-        return getClient().prepareInsert(modelName, id, builder);
+        return new MetricIndexRequestWrapper(getClient().prepareInsert(modelName, id, builder), callback);
     }
 
     @Override
-    public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics) {
+    public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics, SessionCacheCallback callback) {
         final ElasticSearchConverter.ToStorage toStorage = new ElasticSearchConverter.ToStorage(model.getName());
         storageBuilder.entity2Storage(metrics, toStorage);
         Map<String, Object> builder =
             IndexController.INSTANCE.appendTableColumn(model, toStorage.obtain());
         String modelName = TimeSeriesUtils.writeIndexName(model, metrics.getTimeBucket());
         String id = IndexController.INSTANCE.generateDocId(model, metrics.id());
-        return getClient().prepareUpdate(modelName, id, builder);
+        return new MetricIndexUpdateWrapper(getClient().prepareUpdate(modelName, id, builder), callback);
     }
 
     @Override
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/BatchSQLExecutor.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/BatchSQLExecutor.java
index a3d4848463..4061ae7405 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/BatchSQLExecutor.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/BatchSQLExecutor.java
@@ -18,15 +18,17 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.jdbc;
 
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
 import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.List;
 
 /**
  * A Batch SQL executor.
@@ -45,32 +47,59 @@ public class BatchSQLExecutor implements InsertRequest, UpdateRequest {
             return;
         }
         String sql = prepareRequests.get(0).toString();
+        List<PrepareRequest> bulkRequest = new ArrayList<>(maxBatchSqlSize);
         try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
             int pendingCount = 0;
             for (int k = 0; k < prepareRequests.size(); k++) {
                 SQLExecutor sqlExecutor = (SQLExecutor) prepareRequests.get(k);
                 sqlExecutor.setParameters(preparedStatement);
                 preparedStatement.addBatch();
+                bulkRequest.add(sqlExecutor);
                 if (k > 0 && k % maxBatchSqlSize == 0) {
-                    executeBatch(preparedStatement, maxBatchSqlSize, sql);
+                    executeBatch(preparedStatement, maxBatchSqlSize, sql, bulkRequest);
+                    bulkRequest.clear();
                     pendingCount = 0;
                 } else {
                     pendingCount++;
                 }
             }
             if (pendingCount > 0) {
-                executeBatch(preparedStatement, pendingCount, sql);
+                executeBatch(preparedStatement, pendingCount, sql, bulkRequest);
+                bulkRequest.clear();
             }
         }
     }
 
-    private void executeBatch(PreparedStatement preparedStatement, int pendingCount, String sql) throws SQLException {
+    private void executeBatch(PreparedStatement preparedStatement,
+                              int pendingCount,
+                              String sql,
+                              List<PrepareRequest> bulkRequest) throws SQLException {
         long start = System.currentTimeMillis();
-        preparedStatement.executeBatch();
+        final int[] executeBatchResults = preparedStatement.executeBatch();
+        // SQLExecutor implements both request interfaces, so instanceof cannot tell insert from update.
+        boolean isInsert = sql.regionMatches(true, 0, "INSERT", 0, 6);
+        for (int i = 0; i < executeBatchResults.length; i++) {
+            if (executeBatchResults[i] == 1 && isInsert) {
+                ((InsertRequest) bulkRequest.get(i)).onInsertCompleted();
+            } else if (executeBatchResults[i] == 0 && !isInsert) {
+                // No row was updated: report the update failure.
+                ((UpdateRequest) bulkRequest.get(i)).onUpdateFailure();
+            }
+        }
         if (log.isDebugEnabled()) {
             long end = System.currentTimeMillis();
             long cost = end - start;
             log.debug("execute batch sql, batch size: {}, cost:{}ms, sql: {}", pendingCount, cost, sql);
         }
     }
+
+    @Override
+    public void onInsertCompleted() {
+        throw new UnexpectedException("BatchSQLExecutor.onInsertCompleted should not be called");
+    }
+
+    @Override
+    public void onUpdateFailure() {
+        throw new UnexpectedException("BatchSQLExecutor.onUpdateFailure should not be called");
+    }
 }
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/SQLExecutor.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/SQLExecutor.java
index 26c19ee276..266350eb45 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/SQLExecutor.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/SQLExecutor.java
@@ -25,34 +25,30 @@ import java.util.ArrayList;
 import java.util.List;
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * A SQL executor.
  */
 @EqualsAndHashCode(of = "sql")
+@RequiredArgsConstructor
+@Slf4j
 public class SQLExecutor implements InsertRequest, UpdateRequest {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(SQLExecutor.class);
-
-    private String sql;
-    private List<Object> param;
+    private final String sql;
+    private final List<Object> param;
+    private final SessionCacheCallback callback;
     @Getter
     private List<SQLExecutor> additionalSQLs;
 
-    public SQLExecutor(String sql, List<Object> param) {
-        this.sql = sql;
-        this.param = param;
-    }
-
     public void invoke(Connection connection) throws SQLException {
         PreparedStatement preparedStatement = connection.prepareStatement(sql);
         setParameters(preparedStatement);
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("execute sql in batch: {}, parameters: {}", sql, param);
+        if (log.isDebugEnabled()) {
+            log.debug("execute sql in batch: {}, parameters: {}", sql, param);
         }
         preparedStatement.execute();
         if (additionalSQLs != null) {
@@ -79,4 +75,16 @@ public class SQLExecutor implements InsertRequest, UpdateRequest {
         }
         additionalSQLs.addAll(sqlExecutors);
     }
+
+    @Override
+    public void onInsertCompleted() {
+        if (callback != null)
+            callback.onInsertCompleted();
+    }
+
+    @Override
+    public void onUpdateFailure() {
+        if (callback != null)
+            callback.onUpdateFailure();
+    }
 }
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCManagementDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCManagementDAO.java
index 50becff534..2aea8f6ef3 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCManagementDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCManagementDAO.java
@@ -48,7 +48,7 @@ public class JDBCManagementDAO extends JDBCSQLExecutor implements IManagementDAO
             }
 
             SQLExecutor insertExecutor = getInsertExecutor(model.getName(), storageData, storageBuilder,
-                                                           new HashMapConverter.ToStorage());
+                                                           new HashMapConverter.ToStorage(), null);
             insertExecutor.invoke(connection);
         } catch (IOException | SQLException e) {
             throw new IOException(e.getMessage(), e);
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCMetricsDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCMetricsDAO.java
index 3294d80720..5170362df7 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCMetricsDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCMetricsDAO.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
+import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
 import org.apache.skywalking.oap.server.core.storage.StorageData;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
@@ -49,12 +50,12 @@ public class JDBCMetricsDAO extends JDBCSQLExecutor implements IMetricsDAO {
     }
 
     @Override
-    public SQLExecutor prepareBatchInsert(Model model, Metrics metrics) throws IOException {
-        return getInsertExecutor(model.getName(), metrics, storageBuilder, new HashMapConverter.ToStorage());
+    public SQLExecutor prepareBatchInsert(Model model, Metrics metrics, SessionCacheCallback callback) throws IOException {
+        return getInsertExecutor(model.getName(), metrics, storageBuilder, new HashMapConverter.ToStorage(), callback);
     }
 
     @Override
-    public SQLExecutor prepareBatchUpdate(Model model, Metrics metrics) throws IOException {
-        return getUpdateExecutor(model.getName(), metrics, storageBuilder);
+    public SQLExecutor prepareBatchUpdate(Model model, Metrics metrics, SessionCacheCallback callback) throws IOException {
+        return getUpdateExecutor(model.getName(), metrics, storageBuilder, callback);
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCNoneStreamDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCNoneStreamDAO.java
index aaf903b76b..7a87059e08 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCNoneStreamDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCNoneStreamDAO.java
@@ -41,7 +41,7 @@ public class JDBCNoneStreamDAO extends JDBCSQLExecutor implements INoneStreamDAO
     @Override
     public void insert(Model model, NoneStream noneStream) throws IOException {
         try (Connection connection = jdbcClient.getConnection()) {
-            SQLExecutor insertExecutor = getInsertExecutor(model.getName(), noneStream, storageBuilder, new HashMapConverter.ToStorage());
+            SQLExecutor insertExecutor = getInsertExecutor(model.getName(), noneStream, storageBuilder, new HashMapConverter.ToStorage(), null);
             insertExecutor.invoke(connection);
         } catch (IOException | SQLException e) {
             throw new IOException(e.getMessage(), e);
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCRecordDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCRecordDAO.java
index 1ca36a69fe..f382f746e1 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCRecordDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCRecordDAO.java
@@ -35,6 +35,6 @@ public class JDBCRecordDAO extends JDBCSQLExecutor implements IRecordDAO {
 
     @Override
     public InsertRequest prepareBatchInsert(Model model, Record record) throws IOException {
-        return getInsertExecutor(model.getName(), record, storageBuilder, new HashMapConverter.ToStorage());
+        return getInsertExecutor(model.getName(), record, storageBuilder, new HashMapConverter.ToStorage(), null);
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCSQLExecutor.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCSQLExecutor.java
index 12321e3629..c15f4768ae 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCSQLExecutor.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCSQLExecutor.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
 import org.apache.skywalking.oap.server.core.storage.StorageData;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
@@ -116,7 +117,8 @@ public class JDBCSQLExecutor {
 
     protected <T extends StorageData> SQLExecutor getInsertExecutor(String modelName, T metrics,
                                                                     StorageBuilder<T> storageBuilder,
-                                                                    Convert2Storage<Map<String, Object>> converter) throws IOException {
+                                                                    Convert2Storage<Map<String, Object>> converter,
+                                                                    SessionCacheCallback callback) throws IOException {
         Model model = TableMetaInfo.get(modelName);
         storageBuilder.entity2Storage(metrics, converter);
         Map<String, Object> objectMap = converter.obtain();
@@ -126,7 +128,7 @@ public class JDBCSQLExecutor {
             mainEntity.put(column.getColumnName().getName(), objectMap.get(column.getColumnName().getName()));
         });
         SQLExecutor sqlExecutor = buildInsertExecutor(
-            modelName, model.getColumns(), metrics, mainEntity);
+            modelName, model.getColumns(), metrics, mainEntity, callback);
         //build additional table sql
         for (SQLDatabaseModelExtension.AdditionalTable additionalTable : model.getSqlDBModelExtension()
                                                                               .getAdditionalTables()
@@ -137,7 +139,7 @@ public class JDBCSQLExecutor {
             });
 
             List<SQLExecutor> additionalSQLExecutors = buildAdditionalInsertExecutor(
-                additionalTable.getName(), additionalTable.getColumns(), metrics, additionalEntity
+                additionalTable.getName(), additionalTable.getColumns(), metrics, additionalEntity, callback
             );
             sqlExecutor.appendAdditionalSQLs(additionalSQLExecutors);
         }
@@ -147,7 +149,8 @@ public class JDBCSQLExecutor {
     private <T extends StorageData> SQLExecutor buildInsertExecutor(String tableName,
                                                                     List<ModelColumn> columns,
                                                                     T metrics,
-                                                                    Map<String, Object> objectMap) throws IOException {
+                                                                    Map<String, Object> objectMap,
+                                                                    SessionCacheCallback onCompleteCallback) throws IOException {
         SQLBuilder sqlBuilder = new SQLBuilder("INSERT INTO " + tableName + " VALUES");
         List<Object> param = new ArrayList<>();
         sqlBuilder.append("(?,");
@@ -169,13 +172,14 @@ public class JDBCSQLExecutor {
         }
         sqlBuilder.append(")");
 
-        return new SQLExecutor(sqlBuilder.toString(), param);
+        return new SQLExecutor(sqlBuilder.toString(), param, onCompleteCallback);
     }
 
     private <T extends StorageData> List<SQLExecutor> buildAdditionalInsertExecutor(String tableName,
                                                                                     List<ModelColumn> columns,
                                                                                     T metrics,
-                                                                                    Map<String, Object> objectMap) throws IOException {
+                                                                                    Map<String, Object> objectMap,
+                                                                                    SessionCacheCallback callback) throws IOException {
 
         List<SQLExecutor> sqlExecutors = new ArrayList<>();
         SQLBuilder sqlBuilder = new SQLBuilder("INSERT INTO " + tableName + " VALUES");
@@ -211,17 +215,18 @@ public class JDBCSQLExecutor {
             for (Object object : valueList) {
                 List<Object> paramCopy = new ArrayList<>(param);
                 paramCopy.set(position, object);
-                sqlExecutors.add(new SQLExecutor(sql, paramCopy));
+                sqlExecutors.add(new SQLExecutor(sql, paramCopy, callback));
             }
         } else {
-            sqlExecutors.add(new SQLExecutor(sql, param));
+            sqlExecutors.add(new SQLExecutor(sql, param, callback));
         }
 
         return sqlExecutors;
     }
 
     protected <T extends StorageData> SQLExecutor getUpdateExecutor(String modelName, T metrics,
-                                                                    StorageBuilder<T> storageBuilder) throws IOException {
+                                                                    StorageBuilder<T> storageBuilder,
+                                                                    SessionCacheCallback callback) throws IOException {
         final HashMapConverter.ToStorage toStorage = new HashMapConverter.ToStorage();
         storageBuilder.entity2Storage(metrics, toStorage);
         Map<String, Object> objectMap = toStorage.obtain();
@@ -236,7 +241,8 @@ public class JDBCSQLExecutor {
             if (model.getSqlDBModelExtension().isShardingTable()) {
                 SQLDatabaseModelExtension.Sharding sharding = model.getSqlDBModelExtension().getSharding().orElseThrow(
                     () -> new UnexpectedException("Sharding should not be empty."));
-                if (columnName.equals(sharding.getDataSourceShardingColumn()) || columnName.equals(sharding.getTableShardingColumn())) {
+                if (columnName.equals(sharding.getDataSourceShardingColumn()) || columnName.equals(
+                    sharding.getTableShardingColumn())) {
                     continue;
                 }
             }
@@ -253,6 +259,6 @@ public class JDBCSQLExecutor {
         sqlBuilder.append(" WHERE id = ?");
         param.add(metrics.id());
 
-        return new SQLExecutor(sqlBuilder.toString(), param);
+        return new SQLExecutor(sqlBuilder.toString(), param, callback);
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCUITemplateManagementDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCUITemplateManagementDAO.java
index 7097aad725..9267efd21c 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCUITemplateManagementDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCUITemplateManagementDAO.java
@@ -102,7 +102,7 @@ public class JDBCUITemplateManagementDAO extends JDBCSQLExecutor implements UITe
     public TemplateChangeStatus addTemplate(final DashboardSetting setting) throws IOException {
         final UITemplate uiTemplate = setting.toEntity();
         final SQLExecutor insertExecutor = getInsertExecutor(
-            UITemplate.INDEX_NAME, uiTemplate, new UITemplate.Builder(), new HashMapConverter.ToStorage());
+            UITemplate.INDEX_NAME, uiTemplate, new UITemplate.Builder(), new HashMapConverter.ToStorage(), null);
         try (Connection connection = h2Client.getConnection()) {
             insertExecutor.invoke(connection);
             return TemplateChangeStatus.builder().status(true).id(setting.getId()).build();
@@ -135,7 +135,7 @@ public class JDBCUITemplateManagementDAO extends JDBCSQLExecutor implements UITe
 
     private TemplateChangeStatus executeUpdate(final UITemplate uiTemplate) throws IOException {
         final SQLExecutor updateExecutor = getUpdateExecutor(
-            UITemplate.INDEX_NAME, uiTemplate, new UITemplate.Builder());
+            UITemplate.INDEX_NAME, uiTemplate, new UITemplate.Builder(), null);
         try (Connection connection = h2Client.getConnection()) {
             updateExecutor.invoke(connection);
             return TemplateChangeStatus.builder().status(true).id(uiTemplate.getTemplateId()).build();
diff --git a/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/shardingsphere/ShardingIntegrationTest.java b/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/shardingsphere/ShardingIntegrationTest.java
index c7ac504256..c9776758ec 100644
--- a/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/shardingsphere/ShardingIntegrationTest.java
+++ b/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/shardingsphere/ShardingIntegrationTest.java
@@ -642,7 +642,7 @@ public class ShardingIntegrationTest {
                                                                                     .builder()
                                                                                     .getDeclaredConstructor()
                                                                                     .newInstance());
-                jdbcMetricsDAO.prepareBatchInsert(model, metrics).invoke(conn);
+                jdbcMetricsDAO.prepareBatchInsert(model, metrics, null).invoke(conn);
             }
         }
     }