You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/07/24 05:43:28 UTC

[skywalking] branch master updated: Feature of database session in OAP server. (#3147)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d234a9  Feature of database session in OAP server. (#3147)
8d234a9 is described below

commit 8d234a91e55f4ddaed46ed885a81443f9f2fd0ce
Author: 彭勇升 pengys <pe...@apache.org>
AuthorDate: Wed Jul 24 13:43:19 2019 +0800

    Feature of database session in OAP server. (#3147)
    
    * Feature of database session
    
    * Make it configurable.
    
    * Make the OAP server can't startup.
---
 .../oap/server/core/CoreModuleConfig.java          |  1 +
 .../oap/server/core/CoreModuleProvider.java        | 75 +++++---------------
 .../oap/server/core/analysis/metrics/Metrics.java  |  1 +
 .../server/core/analysis/metrics/PxxMetrics.java   |  2 +-
 .../analysis/worker/MetricsPersistentWorker.java   | 81 ++++++++++++++++------
 .../analysis/worker/MetricsStreamProcessor.java    | 10 +--
 .../core/analysis/worker/PersistenceWorker.java    | 15 ++--
 .../server/core/analysis/worker/TopNWorker.java    | 12 +++-
 .../oap/server/core/storage/IMetricsDAO.java       |  4 +-
 .../oap/server/core/storage/PersistenceTimer.java  | 11 ++-
 .../src/main/assembly/application.yml              |  9 ++-
 .../src/main/resources/application.yml             |  3 +
 .../plugin/elasticsearch/base/MetricsEsDAO.java    | 13 ++--
 .../storage/plugin/jdbc/h2/dao/H2MetricsDAO.java   | 15 ++--
 14 files changed, 133 insertions(+), 119 deletions(-)

diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
index 7d4c3ab..e47171a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
@@ -37,6 +37,7 @@ public class CoreModuleConfig extends ModuleConfig {
     @Setter private int gRPCPort;
     @Setter private int maxConcurrentCallsPerConnection;
     @Setter private int maxMessageSize;
+    @Setter private boolean enableDatabaseSession;
     private final List<String> downsampling;
     /**
      * The period of doing data persistence.
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index f05b776..fa6c48a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -20,65 +20,25 @@ package org.apache.skywalking.oap.server.core;
 
 import java.io.IOException;
 import org.apache.skywalking.oap.server.configuration.api.ConfigurationModule;
-import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
-import org.apache.skywalking.oap.server.core.analysis.StreamAnnotationListener;
+import org.apache.skywalking.oap.server.core.analysis.*;
+import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
 import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
-import org.apache.skywalking.oap.server.core.cache.CacheUpdateTimer;
-import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache;
-import org.apache.skywalking.oap.server.core.cache.NetworkAddressInventoryCache;
-import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
-import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
-import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
-import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
-import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
-import org.apache.skywalking.oap.server.core.config.ComponentLibraryCatalogService;
-import org.apache.skywalking.oap.server.core.config.ConfigService;
-import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
-import org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService;
-import org.apache.skywalking.oap.server.core.oal.rt.OALEngine;
-import org.apache.skywalking.oap.server.core.oal.rt.OALEngineLoader;
-import org.apache.skywalking.oap.server.core.query.AggregationQueryService;
-import org.apache.skywalking.oap.server.core.query.AlarmQueryService;
-import org.apache.skywalking.oap.server.core.query.LogQueryService;
-import org.apache.skywalking.oap.server.core.query.MetadataQueryService;
-import org.apache.skywalking.oap.server.core.query.MetricQueryService;
-import org.apache.skywalking.oap.server.core.query.TopNRecordsQueryService;
-import org.apache.skywalking.oap.server.core.query.TopologyQueryService;
-import org.apache.skywalking.oap.server.core.query.TraceQueryService;
-import org.apache.skywalking.oap.server.core.register.service.EndpointInventoryRegister;
-import org.apache.skywalking.oap.server.core.register.service.IEndpointInventoryRegister;
-import org.apache.skywalking.oap.server.core.register.service.INetworkAddressInventoryRegister;
-import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister;
-import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
-import org.apache.skywalking.oap.server.core.register.service.NetworkAddressInventoryRegister;
-import org.apache.skywalking.oap.server.core.register.service.ServiceInstanceInventoryRegister;
-import org.apache.skywalking.oap.server.core.register.service.ServiceInventoryRegister;
-import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
-import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler;
-import org.apache.skywalking.oap.server.core.remote.client.Address;
-import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
+import org.apache.skywalking.oap.server.core.cache.*;
+import org.apache.skywalking.oap.server.core.cluster.*;
+import org.apache.skywalking.oap.server.core.config.*;
+import org.apache.skywalking.oap.server.core.oal.rt.*;
+import org.apache.skywalking.oap.server.core.query.*;
+import org.apache.skywalking.oap.server.core.register.service.*;
+import org.apache.skywalking.oap.server.core.remote.*;
+import org.apache.skywalking.oap.server.core.remote.client.*;
 import org.apache.skywalking.oap.server.core.remote.health.HealthCheckServiceHandler;
-import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
-import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegisterImpl;
-import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister;
-import org.apache.skywalking.oap.server.core.server.JettyHandlerRegisterImpl;
-import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
-import org.apache.skywalking.oap.server.core.source.SourceReceiver;
-import org.apache.skywalking.oap.server.core.source.SourceReceiverImpl;
+import org.apache.skywalking.oap.server.core.server.*;
+import org.apache.skywalking.oap.server.core.source.*;
 import org.apache.skywalking.oap.server.core.storage.PersistenceTimer;
-import org.apache.skywalking.oap.server.core.storage.model.IModelGetter;
-import org.apache.skywalking.oap.server.core.storage.model.IModelOverride;
-import org.apache.skywalking.oap.server.core.storage.model.IModelSetter;
-import org.apache.skywalking.oap.server.core.storage.model.StorageModels;
+import org.apache.skywalking.oap.server.core.storage.model.*;
 import org.apache.skywalking.oap.server.core.storage.ttl.DataTTLKeeperTimer;
-import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
-import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
-import org.apache.skywalking.oap.server.core.worker.WorkerInstancesService;
-import org.apache.skywalking.oap.server.library.module.ModuleConfig;
-import org.apache.skywalking.oap.server.library.module.ModuleDefine;
-import org.apache.skywalking.oap.server.library.module.ModuleProvider;
-import org.apache.skywalking.oap.server.library.module.ModuleStartException;
-import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
+import org.apache.skywalking.oap.server.core.worker.*;
+import org.apache.skywalking.oap.server.library.module.*;
 import org.apache.skywalking.oap.server.library.server.ServerException;
 import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
 import org.apache.skywalking.oap.server.library.server.jetty.JettyServer;
@@ -96,7 +56,6 @@ public class CoreModuleProvider extends ModuleProvider {
     private final AnnotationScan annotationScan;
     private final StorageModels storageModels;
     private final SourceReceiverImpl receiver;
-    private StreamAnnotationListener streamAnnotationListener;
     private OALEngine oalEngine;
 
     public CoreModuleProvider() {
@@ -120,7 +79,7 @@ public class CoreModuleProvider extends ModuleProvider {
     }
 
     @Override public void prepare() throws ServiceNotProvidedException, ModuleStartException {
-        streamAnnotationListener = new StreamAnnotationListener(getManager());
+        StreamAnnotationListener streamAnnotationListener = new StreamAnnotationListener(getManager());
 
         AnnotationScan scopeScan = new AnnotationScan();
         scopeScan.registerListener(new DefaultScopeDefine.Listener());
@@ -200,6 +159,8 @@ public class CoreModuleProvider extends ModuleProvider {
 
         this.remoteClientManager = new RemoteClientManager(getManager());
         this.registerServiceImplementation(RemoteClientManager.class, remoteClientManager);
+
+        MetricsStreamProcessor.getInstance().setEnableDatabaseSession(moduleConfig.isEnableDatabaseSession());
     }
 
     @Override public void start() throws ModuleStartException {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/Metrics.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/Metrics.java
index 96b59da..25c3125 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/Metrics.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/Metrics.java
@@ -35,6 +35,7 @@ public abstract class Metrics extends StreamData implements StorageData {
     public static final String ENTITY_ID = "entity_id";
 
     @Getter @Setter @Column(columnName = TIME_BUCKET) private long timeBucket;
+    @Getter @Setter private long survivalTime = 0L;
 
     public abstract String id();
 
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/PxxMetrics.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/PxxMetrics.java
index 1cc88c2..3754c62 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/PxxMetrics.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/PxxMetrics.java
@@ -43,7 +43,7 @@ public abstract class PxxMetrics extends Metrics implements IntValueHolder {
     @Getter @Setter @Column(columnName = DETAIL_GROUP) private IntKeyLongValueArray detailGroup;
 
     private final int percentileRank;
-    private Map<Integer, IntKeyLongValue> detailIndex;
+    @Getter private Map<Integer, IntKeyLongValue> detailIndex;
 
     public PxxMetrics(int percentileRank) {
         this.percentileRank = percentileRank;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
index 34e6436..8c2865a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
@@ -18,6 +18,7 @@
 
 package org.apache.skywalking.oap.server.core.analysis.worker;
 
+import java.io.IOException;
 import java.util.*;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
@@ -40,16 +41,20 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
     private static final Logger logger = LoggerFactory.getLogger(MetricsPersistentWorker.class);
 
     private final Model model;
+    private final Map<Metrics, Metrics> databaseSession;
     private final MergeDataCache<Metrics> mergeDataCache;
     private final IMetricsDAO metricsDAO;
     private final AbstractWorker<Metrics> nextAlarmWorker;
     private final AbstractWorker<ExportEvent> nextExportWorker;
     private final DataCarrier<Metrics> dataCarrier;
+    private final boolean enableDatabaseSession;
 
     MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO metricsDAO, AbstractWorker<Metrics> nextAlarmWorker,
-        AbstractWorker<ExportEvent> nextExportWorker) {
+        AbstractWorker<ExportEvent> nextExportWorker, boolean enableDatabaseSession) {
         super(moduleDefineHolder);
         this.model = model;
+        this.databaseSession = new HashMap<>(100);
+        this.enableDatabaseSession = enableDatabaseSession;
         this.mergeDataCache = new MergeDataCache<>();
         this.metricsDAO = metricsDAO;
         this.nextAlarmWorker = nextAlarmWorker;
@@ -83,23 +88,21 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
         return mergeDataCache;
     }
 
-    @Override public void prepareBatch(MergeDataCache<Metrics> cache, List<PrepareRequest> prepareRequests) {
+    @Override public void prepareBatch(Collection<Metrics> lastCollection, List<PrepareRequest> prepareRequests) {
         long start = System.currentTimeMillis();
 
-        Collection<Metrics> collection = cache.getLast().collection();
-
         int i = 0;
+        int batchGetSize = 2000;
         Metrics[] metrics = null;
-        for (Metrics data : collection) {
+        for (Metrics data : lastCollection) {
             if (Objects.nonNull(nextExportWorker)) {
                 ExportEvent event = new ExportEvent(data, ExportEvent.EventType.INCREMENT);
                 nextExportWorker.in(event);
             }
 
-            int batchGetSize = 2000;
             int mod = i % batchGetSize;
             if (mod == 0) {
-                int residual = collection.size() - i;
+                int residual = lastCollection.size() - i;
                 if (residual >= batchGetSize) {
                     metrics = new Metrics[batchGetSize];
                 } else {
@@ -110,23 +113,18 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
 
             if (mod == metrics.length - 1) {
                 try {
-                    Map<String, Metrics> dbMetricsMap = metricsDAO.get(model, metrics);
+                    syncStorageToCache(metrics);
 
                     for (Metrics metric : metrics) {
-                        if (dbMetricsMap.containsKey(metric.id())) {
-                            metric.combine(dbMetricsMap.get(metric.id()));
-                            metric.calculate();
-                            prepareRequests.add(metricsDAO.prepareBatchUpdate(model, metric));
+                        Metrics cacheMetric = databaseSession.get(metric);
+                        if (cacheMetric != null) {
+                            cacheMetric.combine(metric);
+                            cacheMetric.calculate();
+                            prepareRequests.add(metricsDAO.prepareBatchUpdate(model, cacheMetric));
+                            nextWorker(cacheMetric);
                         } else {
                             prepareRequests.add(metricsDAO.prepareBatchInsert(model, metric));
-                        }
-
-                        if (Objects.nonNull(nextAlarmWorker)) {
-                            nextAlarmWorker.in(metric);
-                        }
-                        if (Objects.nonNull(nextExportWorker)) {
-                            ExportEvent event = new ExportEvent(metric, ExportEvent.EventType.TOTAL);
-                            nextExportWorker.in(event);
+                            nextWorker(metric);
                         }
                     }
                 } catch (Throwable t) {
@@ -142,6 +140,16 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
         }
     }
 
+    private void nextWorker(Metrics metric) {
+        if (Objects.nonNull(nextAlarmWorker)) {
+            nextAlarmWorker.in(metric);
+        }
+        if (Objects.nonNull(nextExportWorker)) {
+            ExportEvent event = new ExportEvent(metric, ExportEvent.EventType.TOTAL);
+            nextExportWorker.in(event);
+        }
+    }
+
     @Override public void cacheData(Metrics input) {
         mergeDataCache.writing();
         if (mergeDataCache.containsKey(input)) {
@@ -156,6 +164,39 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
         mergeDataCache.finishWriting();
     }
 
+    private void syncStorageToCache(Metrics[] metrics) throws IOException {
+        if (!enableDatabaseSession) {
+            databaseSession.clear();
+        }
+
+        List<String> notInCacheIds = new ArrayList<>();
+        for (Metrics metric : metrics) {
+            if (!databaseSession.containsKey(metric)) {
+                notInCacheIds.add(metric.id());
+            }
+        }
+
+        if (notInCacheIds.size() > 0) {
+            List<Metrics> metricsList = metricsDAO.multiGet(model, notInCacheIds);
+            for (Metrics metric : metricsList) {
+                databaseSession.put(metric, metric);
+            }
+        }
+    }
+
+    @Override public void endOfRound(long tookTime) {
+        if (enableDatabaseSession) {
+            Iterator<Metrics> iterator = databaseSession.values().iterator();
+            while (iterator.hasNext()) {
+                Metrics metrics = iterator.next();
+                metrics.setSurvivalTime(tookTime + metrics.getSurvivalTime());
+                if (metrics.getSurvivalTime() > 70000) {
+                    iterator.remove();
+                }
+            }
+        }
+    }
+
     private class PersistentConsumer implements IConsumer<Metrics> {
 
         private final MetricsPersistentWorker persistent;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
index e81906e..4ee60ab 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
@@ -19,7 +19,7 @@
 package org.apache.skywalking.oap.server.core.analysis.worker;
 
 import java.util.*;
-import lombok.Getter;
+import lombok.*;
 import org.apache.skywalking.oap.server.core.*;
 import org.apache.skywalking.oap.server.core.analysis.*;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
@@ -39,6 +39,7 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
 
     private Map<Class<? extends Metrics>, MetricsAggregateWorker> entryWorkers = new HashMap<>();
     @Getter private List<MetricsPersistentWorker> persistentWorkers = new ArrayList<>();
+    @Setter @Getter private boolean enableDatabaseSession;
 
     public static MetricsStreamProcessor getInstance() {
         return PROCESSOR;
@@ -100,19 +101,18 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
         entryWorkers.put(metricsClass, aggregateWorker);
     }
 
-    private MetricsPersistentWorker minutePersistentWorker(ModuleDefineHolder moduleDefineHolder,
-        IMetricsDAO metricsDAO, Model model) {
+    private MetricsPersistentWorker minutePersistentWorker(ModuleDefineHolder moduleDefineHolder, IMetricsDAO metricsDAO, Model model) {
         AlarmNotifyWorker alarmNotifyWorker = new AlarmNotifyWorker(moduleDefineHolder);
         ExportWorker exportWorker = new ExportWorker(moduleDefineHolder);
 
-        MetricsPersistentWorker minutePersistentWorker = new MetricsPersistentWorker(moduleDefineHolder, model, metricsDAO, alarmNotifyWorker, exportWorker);
+        MetricsPersistentWorker minutePersistentWorker = new MetricsPersistentWorker(moduleDefineHolder, model, metricsDAO, alarmNotifyWorker, exportWorker, enableDatabaseSession);
         persistentWorkers.add(minutePersistentWorker);
 
         return minutePersistentWorker;
     }
 
     private MetricsPersistentWorker worker(ModuleDefineHolder moduleDefineHolder, IMetricsDAO metricsDAO, Model model) {
-        MetricsPersistentWorker persistentWorker = new MetricsPersistentWorker(moduleDefineHolder, model, metricsDAO, null, null);
+        MetricsPersistentWorker persistentWorker = new MetricsPersistentWorker(moduleDefineHolder, model, metricsDAO, null, null, enableDatabaseSession);
         persistentWorkers.add(persistentWorker);
 
         return persistentWorker;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java
index 166eb9c..88c9a5b 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java
@@ -18,8 +18,8 @@
 
 package org.apache.skywalking.oap.server.core.analysis.worker;
 
-import java.util.List;
-import org.apache.skywalking.oap.server.core.analysis.data.Window;
+import java.util.*;
+import org.apache.skywalking.oap.server.core.analysis.data.*;
 import org.apache.skywalking.oap.server.core.storage.StorageData;
 import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
 import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
@@ -45,6 +45,8 @@ public abstract class PersistenceWorker<INPUT extends StorageData, CACHE extends
 
     public abstract CACHE getCache();
 
+    public abstract void endOfRound(long tookTime);
+
     public boolean flushAndSwitch() {
         boolean isSwitch;
         try {
@@ -57,11 +59,12 @@ public abstract class PersistenceWorker<INPUT extends StorageData, CACHE extends
         return isSwitch;
     }
 
-    public abstract void prepareBatch(CACHE cache, List<PrepareRequest> prepareRequests);
+    public abstract void prepareBatch(Collection<INPUT> lastCollection, List<PrepareRequest> prepareRequests);
 
     public final void buildBatchRequests(List<PrepareRequest> prepareRequests) {
         try {
-            while (getCache().getLast().isWriting()) {
+            SWCollection<INPUT> last = getCache().getLast();
+            while (last.isWriting()) {
                 try {
                     Thread.sleep(10);
                 } catch (InterruptedException e) {
@@ -69,8 +72,8 @@ public abstract class PersistenceWorker<INPUT extends StorageData, CACHE extends
                 }
             }
 
-            if (getCache().getLast().collection() != null) {
-                prepareBatch(getCache(), prepareRequests);
+            if (last.collection() != null) {
+                prepareBatch(last.collection(), prepareRequests);
             }
         } finally {
             getCache().finishReadingLast();
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
index 712e992..8ccde84 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
@@ -18,7 +18,7 @@
 
 package org.apache.skywalking.oap.server.core.analysis.worker;
 
-import java.util.List;
+import java.util.*;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
 import org.apache.skywalking.oap.server.core.analysis.data.LimitedSizeDataCache;
@@ -86,8 +86,8 @@ public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<Top
         return super.flushAndSwitch();
     }
 
-    @Override public void prepareBatch(LimitedSizeDataCache<TopN> cache, List<PrepareRequest> prepareRequests) {
-        cache.getLast().collection().forEach(record -> {
+    @Override public void prepareBatch(Collection<TopN> lastCollection, List<PrepareRequest> prepareRequests) {
+        lastCollection.forEach(record -> {
             try {
                 prepareRequests.add(recordDAO.prepareBatchInsert(model, record));
             } catch (Throwable t) {
@@ -96,6 +96,12 @@ public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<Top
         });
     }
 
+    /**
+     * This method is used to clear the expired cache, but TopN does not use it, so it is intentionally empty.
+     */
+    @Override public void endOfRound(long tookTime) {
+    }
+
     @Override public void in(TopN n) {
         dataCarrier.produce(n);
     }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java
index e67dfa8..f0ff7bd 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java
@@ -19,7 +19,7 @@
 package org.apache.skywalking.oap.server.core.storage;
 
 import java.io.IOException;
-import java.util.Map;
+import java.util.List;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.library.client.request.*;
@@ -29,7 +29,7 @@ import org.apache.skywalking.oap.server.library.client.request.*;
  */
 public interface IMetricsDAO extends DAO {
 
-    Map<String, Metrics> get(Model model, Metrics[] metrics) throws IOException;
+    List<Metrics> multiGet(Model model, List<String> ids) throws IOException;
 
     InsertRequest prepareBatchInsert(Model model, Metrics metrics) throws IOException;
 
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
index cedd074..5a18716 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
@@ -43,6 +43,8 @@ public enum PersistenceTimer {
     private CounterMetrics errorCounter;
     private HistogramMetrics prepareLatency;
     private HistogramMetrics executeLatency;
+    private long lastTime = System.currentTimeMillis();
+    private final List<PrepareRequest> prepareRequests = new ArrayList<>(50000);
 
     PersistenceTimer() {
         this.debug = System.getProperty("debug") != null;
@@ -61,7 +63,7 @@ public enum PersistenceTimer {
             MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
 
         if (!isStarted) {
-            Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
+            Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(
                 new RunnableWithExceptionProtection(() -> extractDataAndSave(batchDAO),
                     t -> logger.error("Extract data and save failure.", t)), 5, moduleConfig.getPersistentPeriod(), TimeUnit.SECONDS);
 
@@ -75,10 +77,10 @@ public enum PersistenceTimer {
         }
 
         long startTime = System.currentTimeMillis();
+
         try {
             HistogramMetrics.Timer timer = prepareLatency.createTimer();
 
-            List<PrepareRequest> prepareRequests = new LinkedList<>();
             try {
                 List<PersistenceWorker> persistenceWorkers = new ArrayList<>();
                 persistenceWorkers.addAll(TopNStreamProcessor.getInstance().getPersistentWorkers());
@@ -92,6 +94,8 @@ public enum PersistenceTimer {
                     if (worker.flushAndSwitch()) {
                         worker.buildBatchRequests(prepareRequests);
                     }
+
+                    worker.endOfRound(System.currentTimeMillis() - lastTime);
                 });
 
                 if (debug) {
@@ -116,6 +120,9 @@ public enum PersistenceTimer {
             if (logger.isDebugEnabled()) {
                 logger.debug("Persistence data save finish");
             }
+
+            prepareRequests.clear();
+            lastTime = System.currentTimeMillis();
         }
 
         if (debug) {
diff --git a/oap-server/server-starter/src/main/assembly/application.yml b/oap-server/server-starter/src/main/assembly/application.yml
index bc4b745..e45666c 100644
--- a/oap-server/server-starter/src/main/assembly/application.yml
+++ b/oap-server/server-starter/src/main/assembly/application.yml
@@ -52,9 +52,9 @@ core:
     gRPCHost: ${SW_CORE_GRPC_HOST:0.0.0.0}
     gRPCPort: ${SW_CORE_GRPC_PORT:11800}
     downsampling:
-    - Hour
-    - Day
-    - Month
+      - Hour
+      - Day
+      - Month
     # Set a timeout on metrics data. After the timeout has expired, the metrics data will automatically be deleted.
     enableDataKeeperExecutor: ${SW_CORE_ENABLE_DATA_KEEPER_EXECUTOR:true} # Turn it off then automatically metrics data delete will be close.
     recordDataTTL: ${SW_CORE_RECORD_DATA_TTL:90} # Unit is minute
@@ -62,6 +62,9 @@ core:
     hourMetricsDataTTL: ${SW_CORE_HOUR_METRIC_DATA_TTL:36} # Unit is hour
     dayMetricsDataTTL: ${SW_CORE_DAY_METRIC_DATA_TTL:45} # Unit is day
     monthMetricsDataTTL: ${SW_CORE_MONTH_METRIC_DATA_TTL:18} # Unit is month
+    # Cache metric data for 1 minute to reduce database queries. If the OAP cluster changes within that minute,
+    # the metrics may not be accurate for that minute.
+    enableDatabaseSession: ${SW_CORE_ENABLE_DATABASE_SESSION:true}
 storage:
 #  elasticsearch:
 #    nameSpace: ${SW_NAMESPACE:""}
diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index ec79843..30d2134 100644
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -62,6 +62,9 @@ core:
     hourMetricsDataTTL: ${SW_CORE_HOUR_METRIC_DATA_TTL:36} # Unit is hour
     dayMetricsDataTTL: ${SW_CORE_DAY_METRIC_DATA_TTL:45} # Unit is day
     monthMetricsDataTTL: ${SW_CORE_MONTH_METRIC_DATA_TTL:18} # Unit is month
+    # Cache metric data for 1 minute to reduce database queries. If the OAP cluster changes within that minute,
+    # the metrics may not be accurate for that minute.
+    enableDatabaseSession: ${SW_CORE_ENABLE_DATABASE_SESSION:true}
 storage:
   elasticsearch:
     nameSpace: ${SW_NAMESPACE:""}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
index e0403c7..7061531 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
@@ -39,18 +39,13 @@ public class MetricsEsDAO extends EsDAO implements IMetricsDAO {
         this.storageBuilder = storageBuilder;
     }
 
-    @Override public Map<String, Metrics> get(Model model, Metrics[] metrics) throws IOException {
-        Map<String, Metrics> result = new HashMap<>();
+    @Override public List<Metrics> multiGet(Model model, List<String> ids) throws IOException {
+        SearchResponse response = getClient().ids(model.getName(), ids.toArray(new String[0]));
 
-        String[] ids = new String[metrics.length];
-        for (int i = 0; i < metrics.length; i++) {
-            ids[i] = metrics[i].id();
-        }
-
-        SearchResponse response = getClient().ids(model.getName(), ids);
+        List<Metrics> result = new ArrayList<>((int)response.getHits().totalHits);
         for (int i = 0; i < response.getHits().totalHits; i++) {
             Metrics source = storageBuilder.map2Data(response.getHits().getAt(i).getSourceAsMap());
-            result.put(source.id(), source);
+            result.add(source);
         }
         return result;
     }
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetricsDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetricsDAO.java
index 7f18b4c..e071c51 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetricsDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetricsDAO.java
@@ -39,18 +39,11 @@ public class H2MetricsDAO extends H2SQLExecutor implements IMetricsDAO {
         this.storageBuilder = storageBuilder;
     }
 
-    @Override public Map<String, Metrics> get(Model model, Metrics[] metrics) throws IOException {
-        Map<String, Metrics> result = new HashMap<>();
-
-        String[] ids = new String[metrics.length];
-        for (int i = 0; i < metrics.length; i++) {
-            ids[i] = metrics[i].id();
-        }
-
-        List<StorageData> storageDataList = getByIDs(h2Client, model.getName(), ids, storageBuilder);
-
+    @Override public List<Metrics> multiGet(Model model, List<String> ids) throws IOException {
+        List<StorageData> storageDataList = getByIDs(h2Client, model.getName(), ids.toArray(new String[0]), storageBuilder);
+        List<Metrics> result = new ArrayList<>(storageDataList.size());
         for (StorageData storageData : storageDataList) {
-            result.put(storageData.id(), (Metrics)storageData);
+            result.add((Metrics)storageData);
         }
         return result;
     }