Posted to notifications@skywalking.apache.org by wu...@apache.org on 2022/11/24 15:56:22 UTC

[skywalking] branch master updated: Enhance cache mechanism in the metric persistent process (#10021)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 2fa821d9f5 Enhance cache mechanism in the metric persistent process (#10021)
2fa821d9f5 is described below

commit 2fa821d9f5b6ace1d510db79d5a9e47fad7f78b3
Author: 吴晟 Wu Sheng <wu...@foxmail.com>
AuthorDate: Thu Nov 24 23:56:16 2022 +0800

    Enhance cache mechanism in the metric persistent process (#10021)
---
 docs/en/changes/changes.md                         |   7 ++
 docs/en/setup/backend/configuration-vocabulary.md  |   1 -
 .../oap/server/core/CoreModuleConfig.java          |   4 -
 .../oap/server/core/CoreModuleProvider.java        |   1 -
 .../analysis/worker/MetricsPersistentWorker.java   | 100 +++++++++++++--------
 .../analysis/worker/MetricsStreamProcessor.java    |  10 +--
 .../oap/server/core/storage/IMetricsDAO.java       |   5 +-
 .../server/core/storage/SessionCacheCallback.java  |  50 +++++++++++
 .../client/elasticsearch/IndexRequestWrapper.java  |  13 ++-
 .../client/elasticsearch/UpdateRequestWrapper.java |  14 ++-
 .../library/client/request/InsertRequest.java      |   1 +
 .../library/client/request/UpdateRequest.java      |   1 +
 .../src/main/resources/application.yml             |   3 -
 .../storage/plugin/banyandb/BanyanDBBatchDAO.java  |  15 ++--
 .../measure/BanyanDBMeasureInsertRequest.java      |   7 ++
 .../measure/BanyanDBMeasureUpdateRequest.java      |   8 ++
 .../banyandb/measure/BanyanDBMetricsDAO.java       |   7 +-
 .../stream/BanyanDBStreamInsertRequest.java        |   5 ++
 .../elasticsearch/base/BatchProcessEsDAO.java      |  16 +++-
 .../base/MetricIndexRequestWrapper.java}           |  35 ++++----
 .../base/MetricIndexUpdateWrapper.java}            |  35 ++++----
 .../plugin/elasticsearch/base/MetricsEsDAO.java    |   9 +-
 .../storage/plugin/jdbc/BatchSQLExecutor.java      |  45 ++++++++--
 .../server/storage/plugin/jdbc/SQLExecutor.java    |  36 +++++---
 .../plugin/jdbc/common/dao/JDBCManagementDAO.java  |   2 +-
 .../plugin/jdbc/common/dao/JDBCMetricsDAO.java     |   9 +-
 .../plugin/jdbc/common/dao/JDBCNoneStreamDAO.java  |   2 +-
 .../plugin/jdbc/common/dao/JDBCRecordDAO.java      |   2 +-
 .../plugin/jdbc/common/dao/JDBCSQLExecutor.java    |  28 +++---
 .../common/dao/JDBCUITemplateManagementDAO.java    |   4 +-
 .../shardingsphere/ShardingIntegrationTest.java    |   2 +-
 31 files changed, 329 insertions(+), 148 deletions(-)
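
The callback flow this commit introduces can be illustrated outside the OAP code base. The following is a minimal, self-contained sketch under stated assumptions, not the actual implementation: the class `SessionCacheSketch`, the `String` keys standing in for `Metrics`, and the `executeInsert`/`executeUpdate` helpers are hypothetical. Only the `onInsertCompleted`/`onUpdateFailure` behavior follows `SessionCacheCallback` as it appears in the diff.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SessionCacheSketch {
    /** Follows the callback in the diff; String stands in for Metrics (assumption). */
    static final class Callback {
        private final Map<String, String> sessionCache;
        private final String metrics;
        // Once any execution sharing this callback fails, the entry must stay out of the cache.
        private volatile boolean isFailed = false;

        Callback(Map<String, String> sessionCache, String metrics) {
            this.sessionCache = sessionCache;
            this.metrics = metrics;
        }

        void onInsertCompleted() {
            if (isFailed) {
                return; // a failed execution already evicted the entry; do not add it back
            }
            sessionCache.put(metrics, metrics);
        }

        void onUpdateFailure() {
            isFailed = true;
            sessionCache.remove(metrics);
        }
    }

    /** Hypothetical executor: a successful insert notifies the callback. */
    static void executeInsert(boolean succeeded, Callback callback) {
        if (succeeded) {
            callback.onInsertCompleted();
        }
    }

    /** Hypothetical executor: an update count of 0 is treated as a failure. */
    static void executeUpdate(int updatedRows, Callback callback) {
        if (updatedRows == 0) {
            callback.onUpdateFailure();
        }
    }

    public static void main(String[] args) {
        Map<String, String> cache = new ConcurrentHashMap<>();
        executeInsert(true, new Callback(cache, "service_cpm_1"));
        System.out.println(cache.containsKey("service_cpm_1")); // prints "true"
        executeUpdate(0, new Callback(cache, "service_cpm_1"));
        System.out.println(cache.containsKey("service_cpm_1")); // prints "false"
    }
}
```

In this sketch the cache entry appears only after the insert has actually executed, so a delayed insert cannot cause a second insert statement to be built for a metric that is already cached, and an update that touched no rows evicts the entry so the next round reloads it from storage.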

diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index d2fdc91c45..cff0a10d93 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -119,6 +119,13 @@
 * Support dynamic config the sampling strategy in network profiling.
 * Zipkin module support BanyanDB storage.
 * Zipkin traces query API, sort the result set by start time by default.
+* Enhance the cache mechanism in the metric persistent process.
+  * Previously, the cache only worked once the metric was accessible (readable) from the database. When the insert
+    execution was delayed due to the scale, the cache lost its efficacy and only took effect for the last update
+    in each minute, considering our 25s period.
+  * Fix ID conflicts for all JDBC storage implementations. Due to the insert delay, the JDBC storage implementation would
+    still generate another new insert statement.
+* [**Breaking Change**] Remove `core/default/enableDatabaseSession` config. 
 * [**Breaking Change**] Add `@BanyanDB.TimestampColumn` to identify `which column in Record` is providing the timestamp(milliseconds) for BanyanDB, 
   since BanyanDB stream requires a timestamp in milliseconds.
   For SQL-Database: add new column `timestamp` for tables `profile_task_log/top_n_database_statement`,
diff --git a/docs/en/setup/backend/configuration-vocabulary.md b/docs/en/setup/backend/configuration-vocabulary.md
index 26c5524106..acfefe90a9 100644
--- a/docs/en/setup/backend/configuration-vocabulary.md
+++ b/docs/en/setup/backend/configuration-vocabulary.md
@@ -26,7 +26,6 @@ The Configuration Vocabulary lists all available configurations provided by `app
 | -                       | -             | l1FlushPeriod                                                                                                                                                            | The period of L1 aggregation flush to L2 aggregation (in milliseconds).                                                                                                                                                                                                               [...]
 | -                       | -             | storageSessionTimeout                                                                                                                                                    | The threshold of session time (in milliseconds). Default value is 70000.                                                                                                                                                                                                              [...]
 | -                       | -             | persistentPeriod                                                                                                                                                         | The period of doing data persistence. Unit is second.Default value is 25s                                                                                                                                                                                                             [...]
-| -                       | -             | enableDatabaseSession                                                                                                                                                    | Cache metrics data for 1 minute to reduce database queries, and if the OAP cluster changes within that minute.                                                                                                                                                                        [...]
 | -                       | -             | topNReportPeriod                                                                                                                                                         | The execution period (in minutes) of top N sampler, which saves sampled data into the storage.                                                                                                                                                                                        [...]
 | -                       | -             | activeExtraModelColumns                                                                                                                                                  | Appends entity names (e.g. service names) into metrics storage entities.                                                                                                                                                                                                              [...]
 | -                       | -             | serviceNameMaxLength                                                                                                                                                     | Maximum length limit of service names.                                                                                                                                                                                                                                                [...]
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
index 51d33db88a..c58a49d821 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
@@ -49,10 +49,6 @@ public class CoreModuleConfig extends ModuleConfig {
      * The period of L1 aggregation flush. Unit is ms.
      */
     private long l1FlushPeriod = 500;
-    /**
-     * Enable database flush session.
-     */
-    private boolean enableDatabaseSession;
     /**
      * The threshold of session time. Unit is ms. Default value is 70s.
      */
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index 27a54bb4ad..2f96358ae6 100755
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -319,7 +319,6 @@ public class CoreModuleProvider extends ModuleProvider {
         }
 
         final MetricsStreamProcessor metricsStreamProcessor = MetricsStreamProcessor.getInstance();
-        metricsStreamProcessor.setEnableDatabaseSession(moduleConfig.isEnableDatabaseSession());
         metricsStreamProcessor.setL1FlushPeriod(moduleConfig.getL1FlushPeriod());
         metricsStreamProcessor.setStorageSessionTimeout(moduleConfig.getStorageSessionTimeout());
         metricsStreamProcessor.setMetricsDataTTL(moduleConfig.getMetricsDataTTL());
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
index 4725f749e2..5cf248ace4 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
@@ -20,27 +20,30 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier;
-import org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool;
-import org.apache.skywalking.oap.server.library.datacarrier.consumer.ConsumerPoolFactory;
-import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer;
 import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.analysis.data.MergableBufferedData;
 import org.apache.skywalking.oap.server.core.analysis.data.ReadWriteSafeCache;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
 import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
+import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
+import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
+import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier;
+import org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool;
+import org.apache.skywalking.oap.server.library.datacarrier.consumer.ConsumerPoolFactory;
+import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
 import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
 import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
@@ -58,13 +61,23 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
     private static long SESSION_TIMEOUT_OFFSITE_COUNTER = 0;
 
     private final Model model;
-    private final Map<Metrics, Metrics> context;
+    /**
+     * The session cache holds the latest metrics in memory.
+     * There are two ways for metrics to enter the cache:
+     * 1. The metrics are read from the database through {@link #loadFromStorage(List)}.
+     * 2. The built {@link InsertRequest} is executed successfully.
+     *
+     * There are two cases in which metrics are removed from the cache:
+     * 1. The metrics expired.
+     * 2. The built {@link UpdateRequest} failed to execute, which could be caused by
+     * (1) a database error, or (2) no data being updated, such as an update count of 0 in JDBC.
+     */
+    private final Map<Metrics, Metrics> sessionCache;
     private final IMetricsDAO metricsDAO;
     private final Optional<AbstractWorker<Metrics>> nextAlarmWorker;
     private final Optional<AbstractWorker<ExportEvent>> nextExportWorker;
     private final DataCarrier<Metrics> dataCarrier;
     private final Optional<MetricsTransWorker> transWorker;
-    private final boolean enableDatabaseSession;
     private final boolean supportUpdate;
     private long sessionTimeout;
     private CounterMetrics aggregationCounter;
@@ -85,12 +98,15 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
 
     MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO metricsDAO,
                             AbstractWorker<Metrics> nextAlarmWorker, AbstractWorker<ExportEvent> nextExportWorker,
-                            MetricsTransWorker transWorker, boolean enableDatabaseSession, boolean supportUpdate,
+                            MetricsTransWorker transWorker, boolean supportUpdate,
                             long storageSessionTimeout, int metricsDataTTL, MetricStreamKind kind) {
         super(moduleDefineHolder, new ReadWriteSafeCache<>(new MergableBufferedData(), new MergableBufferedData()));
         this.model = model;
-        this.context = new HashMap<>(100);
-        this.enableDatabaseSession = enableDatabaseSession;
+        // Because the cache is updated by the final storage implementation,
+        // the map/cache could be updated concurrently.
+        // Use ConcurrentHashMap in order to avoid HashMap deadlock.
+        // Since 9.3.0
+        this.sessionCache = new ConcurrentHashMap<>(100);
         this.metricsDAO = metricsDAO;
         this.nextAlarmWorker = Optional.ofNullable(nextAlarmWorker);
         this.nextExportWorker = Optional.ofNullable(nextExportWorker);
@@ -140,14 +156,13 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
     MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder,
                             Model model,
                             IMetricsDAO metricsDAO,
-                            boolean enableDatabaseSession,
                             boolean supportUpdate,
                             long storageSessionTimeout,
                             int metricsDataTTL,
                             MetricStreamKind kind) {
         this(moduleDefineHolder, model, metricsDAO,
              null, null, null,
-             enableDatabaseSession, supportUpdate, storageSessionTimeout, metricsDataTTL, kind
+             supportUpdate, storageSessionTimeout, metricsDataTTL, kind
         );
         // For a down-sampling metrics, we prolong the session timeout for 4 times, nearly 5 minutes.
         // And add offset according to worker creation sequence, to avoid context clear overlap,
@@ -192,12 +207,12 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
             metricsList.add(data);
 
             if (metricsList.size() == batchSize) {
-                flushDataToStorage(metricsList, prepareRequests);
+                prepareFlushDataToStorage(metricsList, prepareRequests);
             }
         }
 
         if (metricsList.size() > 0) {
-            flushDataToStorage(metricsList, prepareRequests);
+            prepareFlushDataToStorage(metricsList, prepareRequests);
         }
 
         if (prepareRequests.size() > 0) {
@@ -209,14 +224,20 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
         return prepareRequests;
     }
 
-    private void flushDataToStorage(List<Metrics> metricsList,
-                                    List<PrepareRequest> prepareRequests) {
+    /**
+     * Build the given prepareRequests to prepare the database flush.
+     *
+     * @param metricsList     the metrics from the last read of the in-memory aggregated cache.
+     * @param prepareRequests the results for final execution.
+     */
+    private void prepareFlushDataToStorage(List<Metrics> metricsList,
+                                           List<PrepareRequest> prepareRequests) {
         try {
             loadFromStorage(metricsList);
 
             long timestamp = System.currentTimeMillis();
             for (Metrics metrics : metricsList) {
-                Metrics cachedMetrics = context.get(metrics);
+                Metrics cachedMetrics = sessionCache.get(metrics);
                 if (cachedMetrics != null) {
                     /*
                      * If the metrics is not supportUpdate, defined through MetricsExtension#supportUpdate,
@@ -233,12 +254,22 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
                         continue;
                     }
                     cachedMetrics.calculate();
-                    prepareRequests.add(metricsDAO.prepareBatchUpdate(model, cachedMetrics));
+                    prepareRequests.add(
+                        metricsDAO.prepareBatchUpdate(
+                            model,
+                            cachedMetrics,
+                            new SessionCacheCallback(sessionCache, cachedMetrics)
+                        ));
                     nextWorker(cachedMetrics);
                     cachedMetrics.setLastUpdateTimestamp(timestamp);
                 } else {
                     metrics.calculate();
-                    prepareRequests.add(metricsDAO.prepareBatchInsert(model, metrics));
+                    prepareRequests.add(
+                        metricsDAO.prepareBatchInsert(
+                            model,
+                            metrics,
+                            new SessionCacheCallback(sessionCache, metrics)
+                        ));
                     nextWorker(metrics);
                     metrics.setLastUpdateTimestamp(timestamp);
                 }
@@ -263,7 +294,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
     }
 
     /**
-     * Load data from the storage, if {@link #enableDatabaseSession} == true, only load data when the id doesn't exist.
+     * Load data from the storage, only load data when the id doesn't exist.
      */
     private void loadFromStorage(List<Metrics> metrics) {
         final long currentTimeMillis = System.currentTimeMillis();
@@ -271,9 +302,9 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
             List<Metrics> notInCacheMetrics =
                 metrics.stream()
                        .filter(m -> {
-                           final Metrics cachedValue = context.get(m);
-                           // Not cached or session disabled, the metric could be tagged `not in cache`.
-                           if (cachedValue == null || !enableDatabaseSession) {
+                           final Metrics cachedValue = sessionCache.get(m);
+                           // Not cached, so the metric is tagged `not in cache`.
+                           if (cachedValue == null) {
                                return true;
                            }
                            // The metric is in the cache, but still we have to check
@@ -286,7 +317,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
 
                                if (metricsDAO.isExpiredCache(model, cachedValue, currentTimeMillis, metricsDataTTL)) {
                                    // The expired metrics should be removed from the context and tagged `not in cache` directly.
-                                   context.remove(m);
+                                   sessionCache.remove(m);
                                    return true;
                                }
                            }
@@ -298,12 +329,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
                 return;
             }
 
-            final List<Metrics> dbMetrics = metricsDAO.multiGet(model, notInCacheMetrics);
-            if (!enableDatabaseSession) {
-                // Clear the cache only after results from DB are returned successfully.
-                context.clear();
-            }
-            dbMetrics.forEach(m -> context.put(m, m));
+            metricsDAO.multiGet(model, notInCacheMetrics).forEach(m -> sessionCache.put(m, m));
         } catch (final Exception e) {
             log.error("Failed to load metrics for merging", e);
         }
@@ -311,15 +337,13 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
 
     @Override
     public void endOfRound() {
-        if (enableDatabaseSession) {
-            Iterator<Metrics> iterator = context.values().iterator();
-            long timestamp = System.currentTimeMillis();
-            while (iterator.hasNext()) {
-                Metrics metrics = iterator.next();
+        Iterator<Metrics> iterator = sessionCache.values().iterator();
+        long timestamp = System.currentTimeMillis();
+        while (iterator.hasNext()) {
+            Metrics metrics = iterator.next();
 
-                if (metrics.isExpired(timestamp, sessionTimeout)) {
-                    iterator.remove();
-                }
+            if (metrics.isExpired(timestamp, sessionTimeout)) {
+                iterator.remove();
             }
         }
     }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
index 7b849f88b1..de4c48a2c7 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
@@ -76,12 +76,6 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
     @Setter
     @Getter
     private long l1FlushPeriod = 500;
-    /**
-     * Hold and forward CoreModuleConfig#enableDatabaseSession to the persistent worker.
-     */
-    @Setter
-    @Getter
-    private boolean enableDatabaseSession;
     /**
      * The threshold of session time. Unit is ms. Default value is 70s.
      */
@@ -219,7 +213,7 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
 
         MetricsPersistentWorker minutePersistentWorker = new MetricsPersistentWorker(
             moduleDefineHolder, model, metricsDAO, alarmNotifyWorker, exportWorker, transWorker,
-            enableDatabaseSession, supportUpdate, storageSessionTimeout, metricsDataTTL, kind
+            supportUpdate, storageSessionTimeout, metricsDataTTL, kind
         );
         persistentWorkers.add(minutePersistentWorker);
 
@@ -233,7 +227,7 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
                                                        MetricStreamKind kind) {
         MetricsPersistentWorker persistentWorker = new MetricsPersistentWorker(
             moduleDefineHolder, model, metricsDAO,
-            enableDatabaseSession, supportUpdate, storageSessionTimeout, metricsDataTTL, kind
+            supportUpdate, storageSessionTimeout, metricsDataTTL, kind
         );
         persistentWorkers.add(persistentWorker);
 
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java
index 4320a83cd9..1baee65e4c 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java
@@ -47,7 +47,7 @@ public interface IMetricsDAO extends DAO {
      * @return InsertRequest should follow the database client driver datatype, in order to make sure it could be
      * executed ASAP.
      */
-    InsertRequest prepareBatchInsert(Model model, Metrics metrics) throws IOException;
+    InsertRequest prepareBatchInsert(Model model, Metrics metrics, SessionCacheCallback callback) throws IOException;
 
     /**
      * Transfer the given metrics to an executable update statement.
@@ -55,7 +55,7 @@ public interface IMetricsDAO extends DAO {
      * @return UpdateRequest should follow the database client driver datatype, in order to make sure it could be
      * executed ASAP.
      */
-    UpdateRequest prepareBatchUpdate(Model model, Metrics metrics) throws IOException;
+    UpdateRequest prepareBatchUpdate(Model model, Metrics metrics, SessionCacheCallback callback) throws IOException;
 
     /**
      * Calculate the expired status of the metric by given current timestamp, metric and TTL.
@@ -72,4 +72,5 @@ public interface IMetricsDAO extends DAO {
         // If the cached metric is older than the TTL indicated.
         return currentTimeMillis - metricTimestamp > TimeUnit.DAYS.toMillis(ttl);
     }
+
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/SessionCacheCallback.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/SessionCacheCallback.java
new file mode 100644
index 0000000000..e417b71529
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/SessionCacheCallback.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.storage;
+
+import java.util.Map;
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+
+/**
+ * SessionCacheCallback provides a bridge for storage implementations to report execution results to the session cache.
+ */
+@RequiredArgsConstructor
+public class SessionCacheCallback {
+    private final Map<Metrics, Metrics> sessionCache;
+    private final Metrics metrics;
+    /**
+     * In some cases, this callback could be shared by multiple executions, such as SQLExecutor#additionalSQLs.
+     * This flag makes sure that, once one of the generated executions fails, the whole metric is removed
+     * from the cache and is not added back. As those are executed in batch mode, the sequence is uncertain.
+     */
+    private volatile boolean isFailed = false;
+
+    public void onInsertCompleted() {
+        if (isFailed) {
+            return;
+        }
+        sessionCache.put(metrics, metrics);
+    }
+
+    public void onUpdateFailure() {
+        isFailed = true;
+        sessionCache.remove(metrics);
+    }
+}
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java
index 3e2cbf7ad0..e6bd63e28f 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java
@@ -24,7 +24,7 @@ import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 
 @Getter
 public class IndexRequestWrapper implements InsertRequest {
-    private final IndexRequest request;
+    protected IndexRequest request;
 
     public IndexRequestWrapper(String index, String type, String id,
                                Map<String, ?> source) {
@@ -35,4 +35,15 @@ public class IndexRequestWrapper implements InsertRequest {
                               .doc(source)
                               .build();
     }
+
+    /**
+     * Expose an empty constructor for lazy initialization.
+     */
+    protected IndexRequestWrapper() {
+
+    }
+
+    @Override
+    public void onInsertCompleted() {
+    }
 }
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/UpdateRequestWrapper.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/UpdateRequestWrapper.java
index 3241aa84e4..dbd9c8c1d2 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/UpdateRequestWrapper.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/UpdateRequestWrapper.java
@@ -23,7 +23,7 @@ import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
 
 @Getter
 public class UpdateRequestWrapper implements UpdateRequest {
-    private final org.apache.skywalking.library.elasticsearch.requests.UpdateRequest request;
+    protected org.apache.skywalking.library.elasticsearch.requests.UpdateRequest request;
 
     public UpdateRequestWrapper(String index, String type, String id,
                                 Map<String, Object> source) {
@@ -34,4 +34,16 @@ public class UpdateRequestWrapper implements UpdateRequest {
                                                                                     .doc(source)
                                                                                     .build();
     }
+
+    /**
+     * Expose an empty constructor for lazy initialization.
+     */
+    protected UpdateRequestWrapper() {
+
+    }
+
+    @Override
+    public void onUpdateFailure() {
+
+    }
 }
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/request/InsertRequest.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/request/InsertRequest.java
index a9dbdab1af..91ded6a487 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/request/InsertRequest.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/request/InsertRequest.java
@@ -18,4 +18,5 @@
 package org.apache.skywalking.oap.server.library.client.request;
 
 public interface InsertRequest extends PrepareRequest {
+    void onInsertCompleted();
 }
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/request/UpdateRequest.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/request/UpdateRequest.java
index ea914913c9..2895216971 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/request/UpdateRequest.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/request/UpdateRequest.java
@@ -18,4 +18,5 @@
 package org.apache.skywalking.oap.server.library.client.request;
 
 public interface UpdateRequest extends PrepareRequest {
+    void onUpdateFailure();
 }
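Note on the two hooks added to InsertRequest and UpdateRequest: the patch only shows the hook signatures and where they are called, not what the metrics worker does in response. The sketch below is one possible way a caller could react to them, assuming a simple in-memory cache keyed by metric id. InsertHook, UpdateHook and DemoSessionCache are hypothetical names for illustration and are not SkyWalking classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for the two hooks; the real interfaces also extend PrepareRequest.
interface InsertHook {
    void onInsertCompleted();
}

interface UpdateHook {
    void onUpdateFailure();
}

// Hypothetical cache: maps a metric id to "is this entry known to be in storage?".
class DemoSessionCache {
    private final Map<String, Boolean> persisted = new HashMap<>();

    void put(String id) {
        persisted.put(id, false);
    }

    boolean contains(String id) {
        return persisted.containsKey(id);
    }

    boolean isPersisted(String id) {
        return persisted.getOrDefault(id, false);
    }

    // Assumption: a completed insert means the cached entry now matches storage.
    InsertHook insertHook(String id) {
        return () -> persisted.computeIfPresent(id, (k, v) -> true);
    }

    // Assumption: a failed update means the cached entry can no longer be trusted.
    UpdateHook updateHook(String id) {
        return () -> persisted.remove(id);
    }
}

class CallbackSketch {
    public static void main(String[] args) {
        DemoSessionCache cache = new DemoSessionCache();
        cache.put("m1");
        cache.insertHook("m1").onInsertCompleted();
        System.out.println("persisted after insert: " + cache.isPersisted("m1"));
        cache.updateHook("m1").onUpdateFailure();
        System.out.println("cached after update failure: " + cache.contains("m1"));
    }
}
```

The hooks take no arguments, so in this sketch each one captures the metric id when the request is prepared.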
diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index 009247151d..b4e507dc30 100644
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -102,9 +102,6 @@ core:
     storageSessionTimeout: ${SW_CORE_STORAGE_SESSION_TIMEOUT:70000}
     # The period of doing data persistence. Unit is second.Default value is 25s
     persistentPeriod: ${SW_CORE_PERSISTENT_PERIOD:25}
-    # Cache metrics data for 1 minute to reduce database queries, and if the OAP cluster changes within that minute,
-    # the metrics may not be accurate within that minute.
-    enableDatabaseSession: ${SW_CORE_ENABLE_DATABASE_SESSION:true}
     topNReportPeriod: ${SW_CORE_TOPN_REPORT_PERIOD:10} # top_n record worker report cycle, unit is minute
     # Extra model column are the column defined by in the codes, These columns of model are not required logically in aggregation or further query,
     # and it will cause more load for memory, network of OAP and storage.
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBBatchDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBBatchDAO.java
index 9564de27da..1e08880d22 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBBatchDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBBatchDAO.java
@@ -18,6 +18,9 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.banyandb;
 
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
 import org.apache.skywalking.banyandb.v1.client.MeasureBulkWriteProcessor;
 import org.apache.skywalking.banyandb.v1.client.StreamBulkWriteProcessor;
 import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
@@ -29,10 +32,6 @@ import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDB
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBMeasureUpdateRequest;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBStreamInsertRequest;
 
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
-
 public class BanyanDBBatchDAO extends AbstractDAO<BanyanDBStorageClient> implements IBatchDAO {
     private static final Object STREAM_SYNCHRONIZER = new Object();
     private static final Object MEASURE_SYNCHRONIZER = new Object();
@@ -69,7 +68,13 @@ public class BanyanDBBatchDAO extends AbstractDAO<BanyanDBStorageClient> impleme
                 if (r instanceof BanyanDBStreamInsertRequest) {
                     return getStreamBulkWriteProcessor().add(((BanyanDBStreamInsertRequest) r).getStreamWrite());
                 } else if (r instanceof BanyanDBMeasureInsertRequest) {
-                    return getMeasureBulkWriteProcessor().add(((BanyanDBMeasureInsertRequest) r).getMeasureWrite());
+                    return getMeasureBulkWriteProcessor().add(((BanyanDBMeasureInsertRequest) r).getMeasureWrite())
+                                                         .whenComplete((v, throwable) -> {
+                                                             if (throwable == null) {
+                                                                 // Insert completed
+                                                                 ((BanyanDBMeasureInsertRequest) r).onInsertCompleted();
+                                                             }
+                                                         });
                 } else if (r instanceof BanyanDBMeasureUpdateRequest) {
                     return getMeasureBulkWriteProcessor().add(((BanyanDBMeasureUpdateRequest) r).getMeasureWrite());
                 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureInsertRequest.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureInsertRequest.java
index 3f965bff6f..bbfe883372 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureInsertRequest.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureInsertRequest.java
@@ -21,10 +21,17 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import org.apache.skywalking.banyandb.v1.client.MeasureWrite;
+import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 
 @RequiredArgsConstructor
 @Getter
 public class BanyanDBMeasureInsertRequest implements InsertRequest {
     private final MeasureWrite measureWrite;
+    private final SessionCacheCallback callback;
+
+    @Override
+    public void onInsertCompleted() {
+        callback.onInsertCompleted();
+    }
 }
\ No newline at end of file
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureUpdateRequest.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureUpdateRequest.java
index 2ff1fae015..763537a034 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureUpdateRequest.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureUpdateRequest.java
@@ -21,10 +21,18 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import org.apache.skywalking.banyandb.v1.client.MeasureWrite;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
 
 @RequiredArgsConstructor
 @Getter
 public class BanyanDBMeasureUpdateRequest implements UpdateRequest {
     private final MeasureWrite measureWrite;
+
+    @Override
+    public void onUpdateFailure() {
+        // BanyanDB measure update is equivalent to insert.
+        // If something goes wrong, then it is a code bug or server-side is not available
+        throw new UnexpectedException("Should not report onUpdateFailure when measure update.");
+    }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java
index 0b24eb0e29..dbef1af363 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java
@@ -26,6 +26,7 @@ import org.apache.skywalking.banyandb.v1.client.MeasureWrite;
 import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
+import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
@@ -77,7 +78,7 @@ public class BanyanDBMetricsDAO extends AbstractBanyanDBDAO implements IMetricsD
     }
 
     @Override
-    public InsertRequest prepareBatchInsert(Model model, Metrics metrics) throws IOException {
+    public InsertRequest prepareBatchInsert(Model model, Metrics metrics, SessionCacheCallback callback) throws IOException {
         log.info("prepare to insert {}", model.getName());
         MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(model);
         if (schema == null) {
@@ -89,11 +90,11 @@ public class BanyanDBMetricsDAO extends AbstractBanyanDBDAO implements IMetricsD
         final BanyanDBConverter.MeasureToStorage toStorage = new BanyanDBConverter.MeasureToStorage(schema, measureWrite);
         storageBuilder.entity2Storage(metrics, toStorage);
         toStorage.acceptID(metrics.id());
-        return new BanyanDBMeasureInsertRequest(toStorage.obtain());
+        return new BanyanDBMeasureInsertRequest(toStorage.obtain(), callback);
     }
 
     @Override
-    public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics) throws IOException {
+    public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics, SessionCacheCallback callback) throws IOException {
         log.info("prepare to update {}", model.getName());
         MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(model);
         if (schema == null) {
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java
index b04e4422d5..15dbc52c53 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java
@@ -27,4 +27,9 @@ import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 @Getter
 public class BanyanDBStreamInsertRequest implements InsertRequest {
     private final StreamWrite streamWrite;
+
+    @Override
+    public void onInsertCompleted() {
+
+    }
 }
\ No newline at end of file
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
index dc6c47bb19..76e44d67fd 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
@@ -75,9 +75,21 @@ public class BatchProcessEsDAO extends EsDAO implements IBatchDAO {
         if (CollectionUtils.isNotEmpty(prepareRequests)) {
             return CompletableFuture.allOf(prepareRequests.stream().map(prepareRequest -> {
                 if (prepareRequest instanceof InsertRequest) {
-                    return bulkProcessor.add(((IndexRequestWrapper) prepareRequest).getRequest());
+                    return bulkProcessor.add(((IndexRequestWrapper) prepareRequest).getRequest())
+                        .whenComplete((v, throwable) -> {
+                            if (throwable == null) {
+                                // Insert completed
+                                ((IndexRequestWrapper) prepareRequest).onInsertCompleted();
+                            }
+                        });
                 } else {
-                    return bulkProcessor.add(((UpdateRequestWrapper) prepareRequest).getRequest());
+                    return bulkProcessor.add(((UpdateRequestWrapper) prepareRequest).getRequest())
+                        .whenComplete((v, throwable) -> {
+                            if (throwable != null) {
+                                // Update failure
+                                ((UpdateRequestWrapper) prepareRequest).onUpdateFailure();
+                            }
+                        });
                 }
             }).toArray(CompletableFuture[]::new));
         }
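Note on the whenComplete usage above: the insert branch fires its hook only when the future completes without a throwable, and the update branch fires only when a throwable is present. A minimal standalone sketch of that pattern, using only java.util.concurrent, could look like the following. WhenCompleteSketch, onSuccess and onFailure are hypothetical helper names and are not part of the patch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class WhenCompleteSketch {
    // Run the hook only if the future completed normally (mirrors the insert branch).
    static <T> CompletableFuture<T> onSuccess(CompletableFuture<T> future, Runnable hook) {
        return future.whenComplete((value, throwable) -> {
            if (throwable == null) {
                hook.run();
            }
        });
    }

    // Run the hook only if the future completed exceptionally (mirrors the update branch).
    static <T> CompletableFuture<T> onFailure(CompletableFuture<T> future, Runnable hook) {
        return future.whenComplete((value, throwable) -> {
            if (throwable != null) {
                hook.run();
            }
        });
    }

    public static void main(String[] args) {
        AtomicInteger inserts = new AtomicInteger();
        AtomicInteger updateFailures = new AtomicInteger();

        CompletableFuture<String> ok = CompletableFuture.completedFuture("done");
        CompletableFuture<String> broken = new CompletableFuture<>();
        broken.completeExceptionally(new IllegalStateException("bulk write failed"));

        onSuccess(ok, inserts::incrementAndGet);
        onSuccess(broken, inserts::incrementAndGet);
        onFailure(ok, updateFailures::incrementAndGet);
        onFailure(broken, updateFailures::incrementAndGet);

        System.out.println("insert hooks fired: " + inserts.get());
        System.out.println("update-failure hooks fired: " + updateFailures.get());
    }
}
```

whenComplete returns a new stage that keeps the original result or exception, so the surrounding CompletableFuture.allOf still sees failures after the hook has run.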
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricIndexRequestWrapper.java
similarity index 50%
copy from oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java
copy to oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricIndexRequestWrapper.java
index 3e2cbf7ad0..ce5cc6ec91 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricIndexRequestWrapper.java
@@ -13,26 +13,29 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
+ *
  */
 
-package org.apache.skywalking.oap.server.library.client.elasticsearch;
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
+
+import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
+import org.apache.skywalking.oap.server.library.client.elasticsearch.IndexRequestWrapper;
 
-import java.util.Map;
-import lombok.Getter;
-import org.apache.skywalking.library.elasticsearch.requests.IndexRequest;
-import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+/**
+ * MetricIndexRequestWrapper wraps the built request wrapper with a new callback.
+ */
+public class MetricIndexRequestWrapper extends IndexRequestWrapper {
+    private final SessionCacheCallback callback;
 
-@Getter
-public class IndexRequestWrapper implements InsertRequest {
-    private final IndexRequest request;
+    public MetricIndexRequestWrapper(IndexRequestWrapper requestWrapper, SessionCacheCallback callback) {
+        this.request = requestWrapper.getRequest();
+        this.callback = callback;
+    }
 
-    public IndexRequestWrapper(String index, String type, String id,
-                               Map<String, ?> source) {
-        request = IndexRequest.builder()
-                              .index(index)
-                              .type(type)
-                              .id(id)
-                              .doc(source)
-                              .build();
+    @Override
+    public void onInsertCompleted() {
+        if (callback != null) {
+            callback.onInsertCompleted();
+        }
     }
 }
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricIndexUpdateWrapper.java
similarity index 50%
copy from oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java
copy to oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricIndexUpdateWrapper.java
index 3e2cbf7ad0..34216534b9 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricIndexUpdateWrapper.java
@@ -13,26 +13,29 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
+ *
  */
 
-package org.apache.skywalking.oap.server.library.client.elasticsearch;
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
+
+import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
+import org.apache.skywalking.oap.server.library.client.elasticsearch.UpdateRequestWrapper;
 
-import java.util.Map;
-import lombok.Getter;
-import org.apache.skywalking.library.elasticsearch.requests.IndexRequest;
-import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+/**
+ * MetricIndexUpdateWrapper wraps the built request wrapper with a new callback.
+ */
+public class MetricIndexUpdateWrapper extends UpdateRequestWrapper {
+    private final SessionCacheCallback callback;
 
-@Getter
-public class IndexRequestWrapper implements InsertRequest {
-    private final IndexRequest request;
+    public MetricIndexUpdateWrapper(UpdateRequestWrapper requestWrapper, SessionCacheCallback callback) {
+        this.request = requestWrapper.getRequest();
+        this.callback = callback;
+    }
 
-    public IndexRequestWrapper(String index, String type, String id,
-                               Map<String, ?> source) {
-        request = IndexRequest.builder()
-                              .index(index)
-                              .type(type)
-                              .id(id)
-                              .doc(source)
-                              .build();
+    @Override
+    public void onUpdateFailure() {
+        if (callback != null) {
+            callback.onUpdateFailure();
+        }
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
index 0363669475..12319dae1a 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
@@ -32,6 +32,7 @@ import org.apache.skywalking.oap.server.core.analysis.DownSampling;
 import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
+import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
 import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
@@ -97,24 +98,24 @@ public class MetricsEsDAO extends EsDAO implements IMetricsDAO {
     }
 
     @Override
-    public InsertRequest prepareBatchInsert(Model model, Metrics metrics) {
+    public InsertRequest prepareBatchInsert(Model model, Metrics metrics, SessionCacheCallback callback) {
         final ElasticSearchConverter.ToStorage toStorage = new ElasticSearchConverter.ToStorage(model.getName());
         storageBuilder.entity2Storage(metrics, toStorage);
         Map<String, Object> builder = IndexController.INSTANCE.appendTableColumn(model, toStorage.obtain());
         String modelName = TimeSeriesUtils.writeIndexName(model, metrics.getTimeBucket());
         String id = IndexController.INSTANCE.generateDocId(model, metrics.id());
-        return getClient().prepareInsert(modelName, id, builder);
+        return new MetricIndexRequestWrapper(getClient().prepareInsert(modelName, id, builder), callback);
     }
 
     @Override
-    public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics) {
+    public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics, SessionCacheCallback callback) {
         final ElasticSearchConverter.ToStorage toStorage = new ElasticSearchConverter.ToStorage(model.getName());
         storageBuilder.entity2Storage(metrics, toStorage);
         Map<String, Object> builder =
             IndexController.INSTANCE.appendTableColumn(model, toStorage.obtain());
         String modelName = TimeSeriesUtils.writeIndexName(model, metrics.getTimeBucket());
         String id = IndexController.INSTANCE.generateDocId(model, metrics.id());
-        return getClient().prepareUpdate(modelName, id, builder);
+        return new MetricIndexUpdateWrapper(getClient().prepareUpdate(modelName, id, builder), callback);
     }
 
     @Override
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/BatchSQLExecutor.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/BatchSQLExecutor.java
index a3d4848463..4061ae7405 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/BatchSQLExecutor.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/BatchSQLExecutor.java
@@ -18,15 +18,17 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.jdbc;
 
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
 import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.List;
 
 /**
  * A Batch SQL executor.
@@ -45,32 +47,59 @@ public class BatchSQLExecutor implements InsertRequest, UpdateRequest {
             return;
         }
         String sql = prepareRequests.get(0).toString();
+        List<PrepareRequest> bulkRequest = new ArrayList<>(maxBatchSqlSize);
         try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
             int pendingCount = 0;
             for (int k = 0; k < prepareRequests.size(); k++) {
                 SQLExecutor sqlExecutor = (SQLExecutor) prepareRequests.get(k);
                 sqlExecutor.setParameters(preparedStatement);
                 preparedStatement.addBatch();
+                bulkRequest.add(sqlExecutor);
                 if (k > 0 && k % maxBatchSqlSize == 0) {
-                    executeBatch(preparedStatement, maxBatchSqlSize, sql);
+                    executeBatch(preparedStatement, maxBatchSqlSize, sql, bulkRequest);
+                    bulkRequest.clear();
                     pendingCount = 0;
                 } else {
                     pendingCount++;
                 }
             }
             if (pendingCount > 0) {
-                executeBatch(preparedStatement, pendingCount, sql);
+                executeBatch(preparedStatement, pendingCount, sql, bulkRequest);
+                bulkRequest.clear();
             }
         }
     }
 
-    private void executeBatch(PreparedStatement preparedStatement, int pendingCount, String sql) throws SQLException {
+    private void executeBatch(PreparedStatement preparedStatement,
+                              int pendingCount,
+                              String sql,
+                              List<PrepareRequest> bulkRequest) throws SQLException {
         long start = System.currentTimeMillis();
-        preparedStatement.executeBatch();
+        final int[] executeBatchResults = preparedStatement.executeBatch();
+        boolean isInsert = bulkRequest.get(0) instanceof InsertRequest;
+        for (int i = 0; i < executeBatchResults.length; i++) {
+            if (executeBatchResults[i] == 1 && isInsert) {
+                // Insert successfully.
+                ((InsertRequest) bulkRequest.get(i)).onInsertCompleted();
+            } else if (executeBatchResults[i] == 0 && !isInsert) {
+                // Update Failure.
+                ((UpdateRequest) bulkRequest.get(i)).onUpdateFailure();
+            }
+        }
         if (log.isDebugEnabled()) {
             long end = System.currentTimeMillis();
             long cost = end - start;
             log.debug("execute batch sql, batch size: {}, cost:{}ms, sql: {}", pendingCount, cost, sql);
         }
     }
+
+    @Override
+    public void onInsertCompleted() {
+        throw new UnexpectedException("BatchSQLExecutor.onInsertCompleted should not be called");
+    }
+
+    @Override
+    public void onUpdateFailure() {
+        throw new UnexpectedException("BatchSQLExecutor.onUpdateFailure should not be called");
+    }
 }
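Note on the executeBatch result handling above: the loop treats an update count of 1 on an insert batch as "insert completed" and an update count of 0 on an update batch as "update failure". The sketch below isolates that dispatch so the mapping can be read and exercised without a database. BatchResultSketch and Req are hypothetical names and are not the real SQLExecutor. The SUCCESS_NO_INFO case is included only to show that, under the same conditions, such a result fires neither hook; whether a given JDBC driver returns it for these statements is not something this patch states.

```java
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

class BatchResultSketch {
    // Hypothetical request type that records which hook was invoked.
    static class Req {
        boolean insertCompleted;
        boolean updateFailed;

        void onInsertCompleted() {
            insertCompleted = true;
        }

        void onUpdateFailure() {
            updateFailed = true;
        }
    }

    // Same conditions as the loop in executeBatch: results[i] belongs to batch.get(i).
    static void dispatch(int[] results, List<Req> batch, boolean isInsert) {
        for (int i = 0; i < results.length; i++) {
            if (results[i] == 1 && isInsert) {
                batch.get(i).onInsertCompleted();
            } else if (results[i] == 0 && !isInsert) {
                batch.get(i).onUpdateFailure();
            }
        }
    }

    public static void main(String[] args) {
        List<Req> batch = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            batch.add(new Req());
        }
        // One row affected, zero rows affected, and "succeeded, count unknown".
        int[] results = {1, 0, Statement.SUCCESS_NO_INFO};
        dispatch(results, batch, false);
        for (Req r : batch) {
            System.out.println("updateFailed=" + r.updateFailed);
        }
    }
}
```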
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/SQLExecutor.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/SQLExecutor.java
index 26c19ee276..266350eb45 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/SQLExecutor.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/SQLExecutor.java
@@ -25,34 +25,30 @@ import java.util.ArrayList;
 import java.util.List;
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * A SQL executor.
  */
 @EqualsAndHashCode(of = "sql")
+@RequiredArgsConstructor
+@Slf4j
 public class SQLExecutor implements InsertRequest, UpdateRequest {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(SQLExecutor.class);
-
-    private String sql;
-    private List<Object> param;
+    private final String sql;
+    private final List<Object> param;
+    private final SessionCacheCallback callback;
     @Getter
     private List<SQLExecutor> additionalSQLs;
 
-    public SQLExecutor(String sql, List<Object> param) {
-        this.sql = sql;
-        this.param = param;
-    }
-
     public void invoke(Connection connection) throws SQLException {
         PreparedStatement preparedStatement = connection.prepareStatement(sql);
         setParameters(preparedStatement);
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("execute sql in batch: {}, parameters: {}", sql, param);
+        if (log.isDebugEnabled()) {
+            log.debug("execute sql in batch: {}, parameters: {}", sql, param);
         }
         preparedStatement.execute();
         if (additionalSQLs != null) {
@@ -79,4 +75,16 @@ public class SQLExecutor implements InsertRequest, UpdateRequest {
         }
         additionalSQLs.addAll(sqlExecutors);
     }
+
+    @Override
+    public void onInsertCompleted() {
+        if (callback != null)
+            callback.onInsertCompleted();
+    }
+
+    @Override
+    public void onUpdateFailure() {
+        if (callback != null)
+            callback.onUpdateFailure();
+    }
 }
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCManagementDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCManagementDAO.java
index 50becff534..2aea8f6ef3 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCManagementDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCManagementDAO.java
@@ -48,7 +48,7 @@ public class JDBCManagementDAO extends JDBCSQLExecutor implements IManagementDAO
             }
 
             SQLExecutor insertExecutor = getInsertExecutor(model.getName(), storageData, storageBuilder,
-                                                           new HashMapConverter.ToStorage());
+                                                           new HashMapConverter.ToStorage(), null);
             insertExecutor.invoke(connection);
         } catch (IOException | SQLException e) {
             throw new IOException(e.getMessage(), e);
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCMetricsDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCMetricsDAO.java
index 3294d80720..5170362df7 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCMetricsDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCMetricsDAO.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
+import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
 import org.apache.skywalking.oap.server.core.storage.StorageData;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
@@ -49,12 +50,12 @@ public class JDBCMetricsDAO extends JDBCSQLExecutor implements IMetricsDAO {
     }
 
     @Override
-    public SQLExecutor prepareBatchInsert(Model model, Metrics metrics) throws IOException {
-        return getInsertExecutor(model.getName(), metrics, storageBuilder, new HashMapConverter.ToStorage());
+    public SQLExecutor prepareBatchInsert(Model model, Metrics metrics, SessionCacheCallback callback) throws IOException {
+        return getInsertExecutor(model.getName(), metrics, storageBuilder, new HashMapConverter.ToStorage(), callback);
     }
 
     @Override
-    public SQLExecutor prepareBatchUpdate(Model model, Metrics metrics) throws IOException {
-        return getUpdateExecutor(model.getName(), metrics, storageBuilder);
+    public SQLExecutor prepareBatchUpdate(Model model, Metrics metrics, SessionCacheCallback callback) throws IOException {
+        return getUpdateExecutor(model.getName(), metrics, storageBuilder, callback);
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCNoneStreamDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCNoneStreamDAO.java
index aaf903b76b..7a87059e08 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCNoneStreamDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCNoneStreamDAO.java
@@ -41,7 +41,7 @@ public class JDBCNoneStreamDAO extends JDBCSQLExecutor implements INoneStreamDAO
     @Override
     public void insert(Model model, NoneStream noneStream) throws IOException {
         try (Connection connection = jdbcClient.getConnection()) {
-            SQLExecutor insertExecutor = getInsertExecutor(model.getName(), noneStream, storageBuilder, new HashMapConverter.ToStorage());
+            SQLExecutor insertExecutor = getInsertExecutor(model.getName(), noneStream, storageBuilder, new HashMapConverter.ToStorage(), null);
             insertExecutor.invoke(connection);
         } catch (IOException | SQLException e) {
             throw new IOException(e.getMessage(), e);
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCRecordDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCRecordDAO.java
index 1ca36a69fe..f382f746e1 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCRecordDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCRecordDAO.java
@@ -35,6 +35,6 @@ public class JDBCRecordDAO extends JDBCSQLExecutor implements IRecordDAO {
 
     @Override
     public InsertRequest prepareBatchInsert(Model model, Record record) throws IOException {
-        return getInsertExecutor(model.getName(), record, storageBuilder, new HashMapConverter.ToStorage());
+        return getInsertExecutor(model.getName(), record, storageBuilder, new HashMapConverter.ToStorage(), null);
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCSQLExecutor.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCSQLExecutor.java
index 12321e3629..c15f4768ae 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCSQLExecutor.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCSQLExecutor.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
 import org.apache.skywalking.oap.server.core.storage.StorageData;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
@@ -116,7 +117,8 @@ public class JDBCSQLExecutor {
 
     protected <T extends StorageData> SQLExecutor getInsertExecutor(String modelName, T metrics,
                                                                     StorageBuilder<T> storageBuilder,
-                                                                    Convert2Storage<Map<String, Object>> converter) throws IOException {
+                                                                    Convert2Storage<Map<String, Object>> converter,
+                                                                    SessionCacheCallback callback) throws IOException {
         Model model = TableMetaInfo.get(modelName);
         storageBuilder.entity2Storage(metrics, converter);
         Map<String, Object> objectMap = converter.obtain();
@@ -126,7 +128,7 @@ public class JDBCSQLExecutor {
             mainEntity.put(column.getColumnName().getName(), objectMap.get(column.getColumnName().getName()));
         });
         SQLExecutor sqlExecutor = buildInsertExecutor(
-            modelName, model.getColumns(), metrics, mainEntity);
+            modelName, model.getColumns(), metrics, mainEntity, callback);
         //build additional table sql
         for (SQLDatabaseModelExtension.AdditionalTable additionalTable : model.getSqlDBModelExtension()
                                                                               .getAdditionalTables()
@@ -137,7 +139,7 @@ public class JDBCSQLExecutor {
             });
 
             List<SQLExecutor> additionalSQLExecutors = buildAdditionalInsertExecutor(
-                additionalTable.getName(), additionalTable.getColumns(), metrics, additionalEntity
+                additionalTable.getName(), additionalTable.getColumns(), metrics, additionalEntity, callback
             );
             sqlExecutor.appendAdditionalSQLs(additionalSQLExecutors);
         }
@@ -147,7 +149,8 @@ public class JDBCSQLExecutor {
     private <T extends StorageData> SQLExecutor buildInsertExecutor(String tableName,
                                                                     List<ModelColumn> columns,
                                                                     T metrics,
-                                                                    Map<String, Object> objectMap) throws IOException {
+                                                                    Map<String, Object> objectMap,
+                                                                    SessionCacheCallback onCompleteCallback) throws IOException {
         SQLBuilder sqlBuilder = new SQLBuilder("INSERT INTO " + tableName + " VALUES");
         List<Object> param = new ArrayList<>();
         sqlBuilder.append("(?,");
@@ -169,13 +172,14 @@ public class JDBCSQLExecutor {
         }
         sqlBuilder.append(")");
 
-        return new SQLExecutor(sqlBuilder.toString(), param);
+        return new SQLExecutor(sqlBuilder.toString(), param, onCompleteCallback);
     }
 
     private <T extends StorageData> List<SQLExecutor> buildAdditionalInsertExecutor(String tableName,
                                                                                     List<ModelColumn> columns,
                                                                                     T metrics,
-                                                                                    Map<String, Object> objectMap) throws IOException {
+                                                                                    Map<String, Object> objectMap,
+                                                                                    SessionCacheCallback callback) throws IOException {
 
         List<SQLExecutor> sqlExecutors = new ArrayList<>();
         SQLBuilder sqlBuilder = new SQLBuilder("INSERT INTO " + tableName + " VALUES");
@@ -211,17 +215,18 @@ public class JDBCSQLExecutor {
             for (Object object : valueList) {
                 List<Object> paramCopy = new ArrayList<>(param);
                 paramCopy.set(position, object);
-                sqlExecutors.add(new SQLExecutor(sql, paramCopy));
+                sqlExecutors.add(new SQLExecutor(sql, paramCopy, callback));
             }
         } else {
-            sqlExecutors.add(new SQLExecutor(sql, param));
+            sqlExecutors.add(new SQLExecutor(sql, param, callback));
         }
 
         return sqlExecutors;
     }
 
     protected <T extends StorageData> SQLExecutor getUpdateExecutor(String modelName, T metrics,
-                                                                    StorageBuilder<T> storageBuilder) throws IOException {
+                                                                    StorageBuilder<T> storageBuilder,
+                                                                    SessionCacheCallback callback) throws IOException {
         final HashMapConverter.ToStorage toStorage = new HashMapConverter.ToStorage();
         storageBuilder.entity2Storage(metrics, toStorage);
         Map<String, Object> objectMap = toStorage.obtain();
@@ -236,7 +241,8 @@ public class JDBCSQLExecutor {
             if (model.getSqlDBModelExtension().isShardingTable()) {
                 SQLDatabaseModelExtension.Sharding sharding = model.getSqlDBModelExtension().getSharding().orElseThrow(
                     () -> new UnexpectedException("Sharding should not be empty."));
-                if (columnName.equals(sharding.getDataSourceShardingColumn()) || columnName.equals(sharding.getTableShardingColumn())) {
+                if (columnName.equals(sharding.getDataSourceShardingColumn()) || columnName.equals(
+                    sharding.getTableShardingColumn())) {
                     continue;
                 }
             }
@@ -253,6 +259,6 @@ public class JDBCSQLExecutor {
         sqlBuilder.append(" WHERE id = ?");
         param.add(metrics.id());
 
-        return new SQLExecutor(sqlBuilder.toString(), param);
+        return new SQLExecutor(sqlBuilder.toString(), param, callback);
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCUITemplateManagementDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCUITemplateManagementDAO.java
index 7097aad725..9267efd21c 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCUITemplateManagementDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCUITemplateManagementDAO.java
@@ -102,7 +102,7 @@ public class JDBCUITemplateManagementDAO extends JDBCSQLExecutor implements UITe
     public TemplateChangeStatus addTemplate(final DashboardSetting setting) throws IOException {
         final UITemplate uiTemplate = setting.toEntity();
         final SQLExecutor insertExecutor = getInsertExecutor(
-            UITemplate.INDEX_NAME, uiTemplate, new UITemplate.Builder(), new HashMapConverter.ToStorage());
+            UITemplate.INDEX_NAME, uiTemplate, new UITemplate.Builder(), new HashMapConverter.ToStorage(), null);
         try (Connection connection = h2Client.getConnection()) {
             insertExecutor.invoke(connection);
             return TemplateChangeStatus.builder().status(true).id(setting.getId()).build();
@@ -135,7 +135,7 @@ public class JDBCUITemplateManagementDAO extends JDBCSQLExecutor implements UITe
 
     private TemplateChangeStatus executeUpdate(final UITemplate uiTemplate) throws IOException {
         final SQLExecutor updateExecutor = getUpdateExecutor(
-            UITemplate.INDEX_NAME, uiTemplate, new UITemplate.Builder());
+            UITemplate.INDEX_NAME, uiTemplate, new UITemplate.Builder(), null);
         try (Connection connection = h2Client.getConnection()) {
             updateExecutor.invoke(connection);
             return TemplateChangeStatus.builder().status(true).id(uiTemplate.getTemplateId()).build();
diff --git a/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/shardingsphere/ShardingIntegrationTest.java b/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/shardingsphere/ShardingIntegrationTest.java
index c7ac504256..c9776758ec 100644
--- a/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/shardingsphere/ShardingIntegrationTest.java
+++ b/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/shardingsphere/ShardingIntegrationTest.java
@@ -642,7 +642,7 @@ public class ShardingIntegrationTest {
                                                                                     .builder()
                                                                                     .getDeclaredConstructor()
                                                                                     .newInstance());
-                jdbcMetricsDAO.prepareBatchInsert(model, metrics).invoke(conn);
+                jdbcMetricsDAO.prepareBatchInsert(model, metrics, null).invoke(conn);
             }
         }
     }