You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by pe...@apache.org on 2019/02/12 15:13:11 UTC

[incubator-skywalking] branch master updated: Support Top sql (#2239)

This is an automated email from the ASF dual-hosted git repository.

pengys pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 4babd6f  Support Top sql (#2239)
4babd6f is described below

commit 4babd6ff73ee32535f09eb31ecbda736e3052a32
Author: 吴晟 Wu Sheng <wu...@foxmail.com>
AuthorDate: Tue Feb 12 23:13:02 2019 +0800

    Support Top sql (#2239)
    
    * The prototype of topN sql worker.
    
    * Add scope and new manual dispatcher.
    
    * no message
    
    * Finish topN persistence codes. Not test yet. And query have not added.
    
    * Finish the top n database statement persistent.
    
    * Support different slow thresholds for different db types.
    
    * Fix wrong db default threshold
    
    * Finish new query protocol binding.
    
    * Provide query empty implementation and sync ui.
    
    * Finish all codes, hope it works :P
    I will run the tests tonight.
    
    * Fix
    
    * Fix
    
    * Fix a startup issue.
    
    * Add time bucket to persistent.
    
    * Fix wrong query result column name.
    
    * Fix Database query.
    
    * Fix checkstyle.
    
    * Fix wrong order logic, and add a test case to verify, to fix https://github.com/apache/incubator-skywalking/pull/2239#discussion_r255948249
---
 docker/config/application.yml                      |   1 +
 .../skywalking/oap/server/core/CoreModule.java     |   1 +
 .../oap/server/core/CoreModuleProvider.java        |   3 +
 .../server/core/analysis/DispatcherManager.java    |   4 +
 .../core/analysis/data/LimitedSizeDataCache.java   |  51 ++++++++
 .../analysis/data/LimitedSizeDataCollection.java   | 115 ++++++++++++++++++
 .../core/analysis/data/NonMergeDataCollection.java |   4 +-
 .../oap/server/core/analysis/data/Window.java      |  10 ++
 .../database/DatabaseStatementDispatcher.java      |  40 ++++++
 .../manual/database/TopNDatabaseStatement.java     |  82 +++++++++++++
 .../oap/server/core/analysis/topn/TopN.java        |  46 +++++++
 .../core/analysis/topn/annotation/TopNType.java}   |  12 +-
 .../topn/annotation/TopNTypeListener.java}         |  24 +++-
 .../server/core/analysis/worker/TopNProcess.java   |  66 ++++++++++
 .../server/core/analysis/worker/TopNWorker.java    | 134 +++++++++++++++++++++
 .../server/core/query/TopNRecordsQueryService.java |  51 ++++++++
 .../oap/server/core/query/entity/TopNRecord.java}  |  14 ++-
 .../{Scope.java => DatabaseSlowStatement.java}     |  29 +++--
 .../skywalking/oap/server/core/source/Scope.java   |   2 +-
 .../core/storage/ComparableStorageData.java}       |  11 +-
 .../oap/server/core/storage/PersistenceTimer.java  |   1 +
 .../oap/server/core/storage/StorageModule.java     |   3 +-
 .../core/storage/query/ITopNRecordsQueryDAO.java}  |  15 +--
 .../data/LimitedSizeDataCollectionTest.java        |  72 +++++++++++
 .../oap/query/graphql/GraphQLQueryProvider.java    |   2 +
 .../query/graphql/resolver/TopNRecordsQuery.java   |  59 +++++++++
 .../query/graphql/type/TopNRecordsCondition.java}  |  18 +--
 .../src/main/resources/query-protocol              |   2 +-
 ...eModuleConfig.java => DBLatencyThresholds.java} |  40 ++++--
 .../trace/provider/TraceModuleProvider.java        |  21 ++--
 .../trace/provider/TraceServiceModuleConfig.java   |   9 +-
 .../trace/provider/parser/SegmentParse.java        |  17 ++-
 .../trace/provider/parser/SegmentParseV2.java      |  15 ++-
 .../SpanListenerFactory.java => SpanTags.java}     |  11 +-
 .../parser/listener/SpanListenerFactory.java       |   3 +-
 .../listener/endpoint/MultiScopesSpanListener.java |  71 +++++++----
 .../listener/segment/SegmentSpanListener.java      |   3 +-
 .../service/ServiceMappingSpanListener.java        |   3 +-
 .../server/receiver/trace/mock/ServiceBMock.java   |   4 +-
 .../src/main/assembly/application.yml              |   1 +
 .../src/main/resources/application.yml             |   1 +
 .../StorageModuleElasticsearchProvider.java        |   1 +
 .../elasticsearch/query/TopNRecordsQueryEsDAO.java |  66 ++++++++++
 .../storage/plugin/jdbc/h2/H2StorageProvider.java  |   1 +
 .../plugin/jdbc/h2/dao/H2TopNRecordsQueryDAO.java  |  78 ++++++++++++
 .../plugin/jdbc/mysql/MySQLStorageProvider.java    |   1 +
 skywalking-ui                                      |   2 +-
 47 files changed, 1095 insertions(+), 125 deletions(-)

diff --git a/docker/config/application.yml b/docker/config/application.yml
index 1a13b4c..f9b645f 100644
--- a/docker/config/application.yml
+++ b/docker/config/application.yml
@@ -75,6 +75,7 @@ receiver-trace:
     bufferDataMaxFileSize: ${SW_RECEIVER_BUFFER_DATA_MAX_FILE_SIZE:500} # Unit is MB
     bufferFileCleanWhenRestart: ${SW_RECEIVER_BUFFER_FILE_CLEAN_WHEN_RESTART:false}
     sampleRate: ${SW_TRACE_SAMPLE_RATE:10000} # The sample rate precision is 1/10000. 10000 means 100% sample in default.
+    slowDBAccessThreshold: ${SW_SLOW_DB_THRESHOLD:default:200,redis:50} # The slow database access thresholds. Unit ms.
 receiver-jvm:
   default:
 service-mesh:
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
index 1baa250..8eb7302 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
@@ -65,6 +65,7 @@ public class CoreModule extends ModuleDefine {
         classes.add(MetadataQueryService.class);
         classes.add(AggregationQueryService.class);
         classes.add(AlarmQueryService.class);
+        classes.add(TopNRecordsQueryService.class);
     }
 
     private void addServerInterface(List<Class> classes) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index 3eb387e..0106fe7 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.core;
 import java.io.IOException;
 import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorTypeListener;
 import org.apache.skywalking.oap.server.core.analysis.record.annotation.RecordTypeListener;
+import org.apache.skywalking.oap.server.core.analysis.topn.annotation.TopNTypeListener;
 import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
 import org.apache.skywalking.oap.server.core.cache.*;
 import org.apache.skywalking.oap.server.core.cluster.*;
@@ -130,12 +131,14 @@ public class CoreModuleProvider extends ModuleProvider {
         this.registerServiceImplementation(MetadataQueryService.class, new MetadataQueryService(getManager()));
         this.registerServiceImplementation(AggregationQueryService.class, new AggregationQueryService(getManager()));
         this.registerServiceImplementation(AlarmQueryService.class, new AlarmQueryService(getManager()));
+        this.registerServiceImplementation(TopNRecordsQueryService.class, new TopNRecordsQueryService(getManager()));
 
         annotationScan.registerListener(storageAnnotationListener);
         annotationScan.registerListener(streamAnnotationListener);
         annotationScan.registerListener(new IndicatorTypeListener(getManager()));
         annotationScan.registerListener(new InventoryTypeListener(getManager()));
         annotationScan.registerListener(new RecordTypeListener(getManager()));
+        annotationScan.registerListener(new TopNTypeListener(getManager()));
 
         this.remoteClientManager = new RemoteClientManager(getManager());
         this.registerServiceImplementation(RemoteClientManager.class, remoteClientManager);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
index 4697792..61602a6 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
@@ -47,6 +47,10 @@ public class DispatcherManager {
     }
 
     public void forward(Source source) {
+        if (source == null) {
+            return;
+        }
+
         for (SourceDispatcher dispatcher : dispatcherMap.get(source.scope())) {
             dispatcher.dispatch(source);
         }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCache.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCache.java
new file mode 100644
index 0000000..33cfe1d
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCache.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.data;
+
+import org.apache.skywalking.oap.server.core.storage.*;
+
+public class LimitedSizeDataCache<STORAGE_DATA extends ComparableStorageData> extends Window<STORAGE_DATA> implements DataCache {
+
+    private SWCollection<STORAGE_DATA> limitedSizeDataCollection;
+    private final int limitSize;
+
+    public LimitedSizeDataCache(int limitSize) {
+        super(false);
+        this.limitSize = limitSize;
+        init();
+    }
+
+    @Override public SWCollection<STORAGE_DATA> collectionInstance() {
+        return new LimitedSizeDataCollection<>(limitSize);
+    }
+
+    public void add(STORAGE_DATA data) {
+        limitedSizeDataCollection.put(data);
+    }
+
+    @Override public void writing() {
+        limitedSizeDataCollection = getCurrentAndWriting();
+    }
+
+    @Override public void finishWriting() {
+        limitedSizeDataCollection.finishWriting();
+        limitedSizeDataCollection = null;
+    }
+}
+
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollection.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollection.java
new file mode 100644
index 0000000..12cd96c
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollection.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.data;
+
+import java.util.*;
+import org.apache.skywalking.oap.server.core.storage.ComparableStorageData;
+
+public class LimitedSizeDataCollection<STORAGE_DATA extends ComparableStorageData> implements SWCollection<STORAGE_DATA> {
+
+    private final HashMap<STORAGE_DATA, LinkedList<STORAGE_DATA>> data;
+    private final int limitedSize;
+    private volatile boolean writing;
+    private volatile boolean reading;
+
+    LimitedSizeDataCollection(int limitedSize) {
+        this.data = new HashMap<>();
+        this.writing = false;
+        this.reading = false;
+        this.limitedSize = limitedSize;
+    }
+
+    public void finishWriting() {
+        writing = false;
+    }
+
+    @Override public void writing() {
+        writing = true;
+    }
+
+    @Override public boolean isWriting() {
+        return writing;
+    }
+
+    @Override public void finishReading() {
+        reading = false;
+    }
+
+    @Override public void reading() {
+        reading = true;
+    }
+
+    @Override public boolean isReading() {
+        return reading;
+    }
+
+    @Override public int size() {
+        return data.size();
+    }
+
+    @Override public void clear() {
+        data.clear();
+    }
+
+    @Override public boolean containsKey(STORAGE_DATA key) {
+        throw new UnsupportedOperationException("Limited size data collection doesn't support containsKey operation.");
+    }
+
+    @Override public STORAGE_DATA get(STORAGE_DATA key) {
+        throw new UnsupportedOperationException("Limited size data collection doesn't support get operation.");
+    }
+
+    @Override public void put(STORAGE_DATA value) {
+        LinkedList<STORAGE_DATA> storageDataList = this.data.get(value);
+        if (storageDataList == null) {
+            storageDataList = new LinkedList<>();
+            data.put(value, storageDataList);
+        }
+
+        if (storageDataList.size() < limitedSize) {
+            storageDataList.add(value);
+            return;
+        }
+
+        for (int i = 0; i < storageDataList.size(); i++) {
+            STORAGE_DATA storageData = storageDataList.get(i);
+            if (value.compareTo(storageData) <= 0) {
+                if (i == 0) {
+                    // input value is less than the smallest in top N list, ignore
+                } else {
+                    // Remove the smallest in top N list
+                    // add the current value into the right position
+                    storageDataList.add(i, value);
+                    storageDataList.removeFirst();
+                }
+                return;
+            }
+        }
+
+        // Add the value as biggest in top N list
+        storageDataList.addLast(value);
+        storageDataList.removeFirst();
+    }
+
+    @Override public Collection<STORAGE_DATA> collection() {
+        List<STORAGE_DATA> collection = new ArrayList<>();
+        data.values().forEach(e -> e.forEach(collection::add));
+        return collection;
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCollection.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCollection.java
index 1d7a53c..bba2fc0 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCollection.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCollection.java
@@ -69,11 +69,11 @@ public class NonMergeDataCollection<STORAGE_DATA extends StorageData> implements
     }
 
     @Override public boolean containsKey(STORAGE_DATA key) {
-        throw new UnsupportedOperationException("Close merge data collection not support containsKey operation.");
+        throw new UnsupportedOperationException("Non merge data collection doesn't support containsKey operation.");
     }
 
     @Override public STORAGE_DATA get(STORAGE_DATA key) {
-        throw new UnsupportedOperationException("Close merge data collection not support get operation.");
+        throw new UnsupportedOperationException("Non merge data collection doesn't support get operation.");
     }
 
     @Override public void put(STORAGE_DATA value) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Window.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Window.java
index 6f9486d..ff2ca6d 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Window.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Window.java
@@ -33,6 +33,16 @@ public abstract class Window<DATA> {
     private SWCollection<DATA> windowDataB;
 
     Window() {
+        this(true);
+    }
+
+    Window(boolean autoInit) {
+        if (autoInit) {
+            init();
+        }
+    }
+
+    protected void init() {
         this.windowDataA = collectionInstance();
         this.windowDataB = collectionInstance();
         this.pointer = windowDataA;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/DatabaseStatementDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/DatabaseStatementDispatcher.java
new file mode 100644
index 0000000..f36cca5
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/DatabaseStatementDispatcher.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.manual.database;
+
+import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
+import org.apache.skywalking.oap.server.core.analysis.worker.TopNProcess;
+import org.apache.skywalking.oap.server.core.source.DatabaseSlowStatement;
+
+/**
+ * @author wusheng
+ */
+public class DatabaseStatementDispatcher implements SourceDispatcher<DatabaseSlowStatement> {
+    @Override public void dispatch(DatabaseSlowStatement source) {
+        TopNDatabaseStatement statement = new TopNDatabaseStatement();
+        statement.setId(source.getId());
+        statement.setServiceId(source.getDatabaseServiceId());
+        statement.setLatency(source.getLatency());
+        statement.setStatement(source.getStatement());
+        statement.setTimeBucket(source.getTimeBucket());
+        statement.setTraceId(source.getTraceId());
+
+        TopNProcess.INSTANCE.in(statement);
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java
new file mode 100644
index 0000000..2456486
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.manual.database;
+
+import java.util.*;
+import lombok.*;
+import org.apache.skywalking.oap.server.core.*;
+import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
+import org.apache.skywalking.oap.server.core.analysis.topn.annotation.TopNType;
+import org.apache.skywalking.oap.server.core.source.Scope;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
+import org.apache.skywalking.oap.server.core.storage.annotation.*;
+
+/**
+ * Database TopN statement, including Database SQL statement, mongoDB and Redis commands.
+ *
+ * @author wusheng
+ */
+@TopNType
+@StorageEntity(name = TopNDatabaseStatement.INDEX_NAME, builder = TopNDatabaseStatement.Builder.class, source = Scope.DatabaseSlowStatement)
+public class TopNDatabaseStatement extends TopN {
+    public static final String INDEX_NAME = "top_n_database_statement";
+
+
+    @Setter private String id;
+
+    @Override public String id() {
+        return id;
+    }
+
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+        TopNDatabaseStatement statement = (TopNDatabaseStatement)o;
+        return getServiceId() == statement.getServiceId();
+    }
+
+    @Override public int hashCode() {
+        return Objects.hash(getServiceId());
+    }
+
+    public static class Builder implements StorageBuilder<TopNDatabaseStatement> {
+
+        @Override public TopNDatabaseStatement map2Data(Map<String, Object> dbMap) {
+            TopNDatabaseStatement statement = new TopNDatabaseStatement();
+            statement.setStatement((String)dbMap.get(STATEMENT));
+            statement.setTraceId((String)dbMap.get(TRACE_ID));
+            statement.setLatency(((Number)dbMap.get(LATENCY)).longValue());
+            statement.setServiceId(((Number)dbMap.get(SERVICE_ID)).intValue());
+            statement.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue());
+            return statement;
+        }
+
+        @Override public Map<String, Object> data2Map(TopNDatabaseStatement storageData) {
+            Map<String, Object> map = new HashMap<>();
+            map.put(STATEMENT, storageData.getStatement());
+            map.put(TRACE_ID, storageData.getTraceId());
+            map.put(LATENCY, storageData.getLatency());
+            map.put(SERVICE_ID, storageData.getServiceId());
+            map.put(TIME_BUCKET, storageData.getTimeBucket());
+            return map;
+        }
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/TopN.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/TopN.java
new file mode 100644
index 0000000..fae8298
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/TopN.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.topn;
+
+import lombok.*;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
+import org.apache.skywalking.oap.server.core.storage.ComparableStorageData;
+import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+
+/**
+ * TopN data.
+ *
+ * @author wusheng
+ */
+public abstract class TopN extends Record implements ComparableStorageData {
+    public static final String STATEMENT = "statement";
+    public static final String LATENCY = "latency";
+    public static final String TRACE_ID = "trace_id";
+    public static final String SERVICE_ID = "service_id";
+
+    @Getter @Setter @Column(columnName = STATEMENT) private String statement;
+    @Getter @Setter @Column(columnName = LATENCY) private long latency;
+    @Getter @Setter @Column(columnName = TRACE_ID) private String traceId;
+    @Getter @Setter @Column(columnName = SERVICE_ID) private int serviceId;
+
+    @Override public int compareTo(Object o) {
+        TopN target = (TopN)o;
+        return (int)(latency - target.latency);
+    }
+}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/annotation/TopNType.java
similarity index 74%
copy from oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/annotation/TopNType.java
index fd8d09f..82731b6 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/annotation/TopNType.java
@@ -16,13 +16,11 @@
  *
  */
 
-package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener;
+package org.apache.skywalking.oap.server.core.analysis.topn.annotation;
 
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import java.lang.annotation.*;
 
-/**
- * @author peng-yongsheng
- */
-public interface SpanListenerFactory {
-    SpanListener create(ModuleManager moduleManager);
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface TopNType {
 }
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/annotation/TopNTypeListener.java
similarity index 55%
copy from oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/annotation/TopNTypeListener.java
index fd8d09f..d668cdd 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/annotation/TopNTypeListener.java
@@ -16,13 +16,29 @@
  *
  */
 
-package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener;
+package org.apache.skywalking.oap.server.core.analysis.topn.annotation;
 
+import java.lang.annotation.Annotation;
+import org.apache.skywalking.oap.server.core.analysis.worker.TopNProcess;
+import org.apache.skywalking.oap.server.core.annotation.AnnotationListener;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
 
 /**
- * @author peng-yongsheng
+ * @author wusheng
  */
-public interface SpanListenerFactory {
-    SpanListener create(ModuleManager moduleManager);
+public class TopNTypeListener implements AnnotationListener {
+
+    private final ModuleManager moduleManager;
+
+    public TopNTypeListener(ModuleManager moduleManager) {
+        this.moduleManager = moduleManager;
+    }
+
+    @Override public Class<? extends Annotation> annotation() {
+        return TopNType.class;
+    }
+
+    @Override public void notify(Class aClass) {
+        TopNProcess.INSTANCE.create(moduleManager, aClass);
+    }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNProcess.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNProcess.java
new file mode 100644
index 0000000..059c46e
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNProcess.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.worker;
+
+import java.util.*;
+import lombok.Getter;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.analysis.manual.database.TopNDatabaseStatement;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
+import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
+import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntityAnnotationUtils;
+import org.apache.skywalking.oap.server.core.worker.*;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+
+/**
+ * TopN is a special process, which hold a certain size of windows,
+ * and cache all top N records, save to the persistence in low frequence.
+ *
+ * @author wusheng
+ */
+public enum TopNProcess {
+    INSTANCE;
+
+    @Getter private List<TopNWorker> persistentWorkers = new ArrayList<>();
+    private Map<Class<? extends Record>, TopNWorker> workers = new HashMap<>();
+
+    public void create(ModuleManager moduleManager, Class<? extends TopN> topNClass) {
+        String modelName = StorageEntityAnnotationUtils.getModelName(topNClass);
+        Class<? extends StorageBuilder> builderClass = StorageEntityAnnotationUtils.getBuilder(topNClass);
+
+        StorageDAO storageDAO = moduleManager.find(StorageModule.NAME).provider().getService(StorageDAO.class);
+        IRecordDAO recordDAO;
+        try {
+            recordDAO = storageDAO.newRecordDao(builderClass.newInstance());
+        } catch (InstantiationException | IllegalAccessException e) {
+            throw new UnexpectedException("");
+        }
+
+        TopNWorker persistentWorker = new TopNWorker(WorkerIdGenerator.INSTANCES.generate(), modelName, moduleManager,
+            50, recordDAO);
+        WorkerInstances.INSTANCES.put(persistentWorker.getWorkerId(), persistentWorker);
+        persistentWorkers.add(persistentWorker);
+        workers.put(topNClass, persistentWorker);
+    }
+
+    public void in(TopNDatabaseStatement statement) {
+        workers.get(statement.getClass()).in(statement);
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
new file mode 100644
index 0000000..5d6304e
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.worker;
+
+import java.util.*;
+import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
+import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
+import org.apache.skywalking.oap.server.core.analysis.data.LimitedSizeDataCache;
+import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
+import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.slf4j.*;
+
+/**
+ * Top N worker is a persistence worker, but no
+ *
+ * @author wusheng
+ */
+public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<TopN>> {
+    private static final Logger logger = LoggerFactory.getLogger(TopNWorker.class);
+    private final LimitedSizeDataCache<TopN> limitedSizeDataCache;
+    private final IRecordDAO recordDAO;
+    private final String modelName;
+    private final DataCarrier<TopN> dataCarrier;
+    private long reportCycle;
+    private volatile long lastReportTimestamp;
+
+    public TopNWorker(int workerId, String modelName, ModuleManager moduleManager,
+        int topNSize,
+        IRecordDAO recordDAO) {
+        super(moduleManager, workerId, -1);
+        this.limitedSizeDataCache = new LimitedSizeDataCache<>(topNSize);
+        this.recordDAO = recordDAO;
+        this.modelName = modelName;
+        this.dataCarrier = new DataCarrier<>(1, 10000);
+        this.dataCarrier.consume(new TopNWorker.TopNConsumer(), 1);
+        this.lastReportTimestamp = System.currentTimeMillis();
+        // Top N persistent only works per 10 minutes.
+        this.reportCycle = 10 * 60 * 1000L;
+    }
+
+    @Override void onWork(TopN data) {
+        limitedSizeDataCache.writing();
+        try {
+            limitedSizeDataCache.add(data);
+        } finally {
+            limitedSizeDataCache.finishWriting();
+        }
+    }
+
+    /**
+     * TopN is not following the batch size trigger mode. The memory cost of this worker is limited always.
+     *
+     * `onWork` method has been override, so this method would never be executed. No need to implement this method,
+     */
+    @Override public void cacheData(TopN data) {
+
+    }
+
+    @Override public LimitedSizeDataCache<TopN> getCache() {
+        return limitedSizeDataCache;
+    }
+
+    /**
+     * The top N worker persistent cycle is much less than the others, override `flushAndSwitch` to extend the execute
+     * time windows.
+     *
+     * Switch and persistent attempt happens based on reportCycle.
+     *
+     * @return
+     */
+    @Override public boolean flushAndSwitch() {
+        long now = System.currentTimeMillis();
+        if (now - lastReportTimestamp <= reportCycle) {
+            return false;
+        }
+        lastReportTimestamp = now;
+        return super.flushAndSwitch();
+    }
+
+    @Override public List<Object> prepareBatch(LimitedSizeDataCache<TopN> cache) {
+        List<Object> batchCollection = new LinkedList<>();
+        cache.getLast().collection().forEach(record -> {
+            try {
+                batchCollection.add(recordDAO.prepareBatchInsert(modelName, record));
+            } catch (Throwable t) {
+                logger.error(t.getMessage(), t);
+            }
+        });
+        return batchCollection;
+    }
+
+    @Override public void in(TopN n) {
+        dataCarrier.produce(n);
+    }
+
+    private class TopNConsumer implements IConsumer<TopN> {
+        @Override public void init() {
+
+        }
+
+        @Override public void consume(List<TopN> data) {
+            /**
+             * TopN is not following the batch size trigger mode.
+             * No need to implement this method, the memory size is limited always.
+             */
+            data.forEach(row -> onWork(row));
+        }
+
+        @Override public void onError(List<TopN> data, Throwable t) {
+            logger.error(t.getMessage(), t);
+        }
+
+        @Override public void onExit() {
+
+        }
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopNRecordsQueryService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopNRecordsQueryService.java
new file mode 100644
index 0000000..3d07aa5
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopNRecordsQueryService.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.query;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.skywalking.oap.server.core.query.entity.*;
+import org.apache.skywalking.oap.server.core.storage.StorageModule;
+import org.apache.skywalking.oap.server.core.storage.query.*;
+import org.apache.skywalking.oap.server.library.module.*;
+import org.apache.skywalking.oap.server.library.module.Service;
+
+/**
+ * @author wusheng
+ */
+public class TopNRecordsQueryService implements Service {
+    private final ModuleManager moduleManager;
+    private ITopNRecordsQueryDAO topNRecordsQueryDAO;
+
+    public TopNRecordsQueryService(ModuleManager manager) {
+        this.moduleManager = manager;
+    }
+
+    private ITopNRecordsQueryDAO getTopNRecordsQueryDAO() {
+        if (topNRecordsQueryDAO == null) {
+            this.topNRecordsQueryDAO = moduleManager.find(StorageModule.NAME).provider().getService(ITopNRecordsQueryDAO.class);
+        }
+        return topNRecordsQueryDAO;
+    }
+
+    public List<TopNRecord> getTopNRecords(long startSecondTB, long endSecondTB, String metricName, int serviceId,
+        int topN, Order order) throws IOException {
+        return getTopNRecordsQueryDAO().getTopNRecords(startSecondTB, endSecondTB, metricName, serviceId, topN, order);
+    }
+}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/TopNRecord.java
similarity index 75%
copy from oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/TopNRecord.java
index fd8d09f..e8baa5d 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/TopNRecord.java
@@ -16,13 +16,17 @@
  *
  */
 
-package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener;
+package org.apache.skywalking.oap.server.core.query.entity;
 
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import lombok.*;
 
 /**
- * @author peng-yongsheng
+ * @author wusheng
  */
-public interface SpanListenerFactory {
-    SpanListener create(ModuleManager moduleManager);
+@Setter
+@Getter
+public class TopNRecord {
+    private String statement;
+    private long latency;
+    private String traceId;
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Scope.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DatabaseSlowStatement.java
similarity index 58%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Scope.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DatabaseSlowStatement.java
index d162f3e..254a5e7 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Scope.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DatabaseSlowStatement.java
@@ -18,18 +18,27 @@
 
 package org.apache.skywalking.oap.server.core.source;
 
+import lombok.*;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.source.annotation.SourceType;
+
 /**
- * @author peng-yongsheng, wusheng
+ * @author wusheng
  */
-public enum Scope {
-    All, Service, ServiceInstance, Endpoint, ServiceRelation, ServiceInstanceRelation, EndpointRelation, NetworkAddress,
-    ServiceInstanceJVMCPU, ServiceInstanceJVMMemory, ServiceInstanceJVMMemoryPool, ServiceInstanceJVMGC,
-    Segment, Alarm, ServiceInventory, ServiceInstanceInventory, EndpointInventory, DatabaseAccess;
+@SourceType
+public class DatabaseSlowStatement extends Source {
+    @Getter @Setter private String id;
+    @Getter @Setter private int databaseServiceId;
+    @Getter @Setter private String statement;
+    @Getter @Setter private long latency;
+    @Getter @Setter private String traceId;
+
+    @Override public Scope scope() {
+        return Scope.DatabaseSlowStatement;
+    }
 
-    public static Scope valueOf(int ordinal) {
-        if (ordinal < 0 || ordinal >= values().length) {
-            throw new IndexOutOfBoundsException("Invalid ordinal");
-        }
-        return values()[ordinal];
+    @Override public String getEntityId() {
+        return Const.EMPTY_STRING;
     }
+
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Scope.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Scope.java
index d162f3e..fd66299 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Scope.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Scope.java
@@ -24,7 +24,7 @@ package org.apache.skywalking.oap.server.core.source;
 public enum Scope {
     All, Service, ServiceInstance, Endpoint, ServiceRelation, ServiceInstanceRelation, EndpointRelation, NetworkAddress,
     ServiceInstanceJVMCPU, ServiceInstanceJVMMemory, ServiceInstanceJVMMemoryPool, ServiceInstanceJVMGC,
-    Segment, Alarm, ServiceInventory, ServiceInstanceInventory, EndpointInventory, DatabaseAccess;
+    Segment, Alarm, ServiceInventory, ServiceInstanceInventory, EndpointInventory, DatabaseAccess, DatabaseSlowStatement;
 
     public static Scope valueOf(int ordinal) {
         if (ordinal < 0 || ordinal >= values().length) {
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ComparableStorageData.java
similarity index 75%
copy from oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ComparableStorageData.java
index fd8d09f..8e1a639 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ComparableStorageData.java
@@ -16,13 +16,12 @@
  *
  */
 
-package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener;
-
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
+package org.apache.skywalking.oap.server.core.storage;
 
 /**
- * @author peng-yongsheng
+ * Storage data with comparable capability.
+ *
+ * @author wusheng
  */
-public interface SpanListenerFactory {
-    SpanListener create(ModuleManager moduleManager);
+public interface ComparableStorageData extends StorageData, Comparable {
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
index 367bd9e..9bdbfd1 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
@@ -84,6 +84,7 @@ public enum PersistenceTimer {
                 List<PersistenceWorker> persistenceWorkers = new ArrayList<>();
                 persistenceWorkers.addAll(IndicatorProcess.INSTANCE.getPersistentWorkers());
                 persistenceWorkers.addAll(RecordProcess.INSTANCE.getPersistentWorkers());
+                persistenceWorkers.addAll(TopNProcess.INSTANCE.getPersistentWorkers());
 
                 persistenceWorkers.forEach(worker -> {
                     if (logger.isDebugEnabled()) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
index 02a896e..6072912 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
@@ -39,6 +39,7 @@ public class StorageModule extends ModuleDefine {
             IHistoryDeleteDAO.class,
             IServiceInventoryCacheDAO.class, IServiceInstanceInventoryCacheDAO.class,
             IEndpointInventoryCacheDAO.class, INetworkAddressInventoryCacheDAO.class,
-            ITopologyQueryDAO.class, IMetricQueryDAO.class, ITraceQueryDAO.class, IMetadataQueryDAO.class, IAggregationQueryDAO.class, IAlarmQueryDAO.class};
+            ITopologyQueryDAO.class, IMetricQueryDAO.class, ITraceQueryDAO.class, IMetadataQueryDAO.class, IAggregationQueryDAO.class, IAlarmQueryDAO.class,
+            ITopNRecordsQueryDAO.class};
     }
 }
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITopNRecordsQueryDAO.java
similarity index 64%
copy from oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITopNRecordsQueryDAO.java
index fd8d09f..4b20e53 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITopNRecordsQueryDAO.java
@@ -16,13 +16,14 @@
  *
  */
 
-package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener;
+package org.apache.skywalking.oap.server.core.storage.query;
 
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import java.io.IOException;
+import java.util.List;
+import org.apache.skywalking.oap.server.core.query.entity.*;
+import org.apache.skywalking.oap.server.library.module.Service;
 
-/**
- * @author peng-yongsheng
- */
-public interface SpanListenerFactory {
-    SpanListener create(ModuleManager moduleManager);
+public interface ITopNRecordsQueryDAO extends Service {
+    List<TopNRecord> getTopNRecords(long startSecondTB, long endSecondTB, String metricName, int serviceId,
+        int topN, Order order) throws IOException;
 }
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollectionTest.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollectionTest.java
new file mode 100644
index 0000000..852c884
--- /dev/null
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollectionTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.data;
+
+import java.util.Objects;
+import org.apache.skywalking.oap.server.core.storage.ComparableStorageData;
+import org.junit.*;
+
+/**
+ * @author wusheng
+ */
+public class LimitedSizeDataCollectionTest {
+    @Test
+    public void testPut() {
+        LimitedSizeDataCollection<MockStorageData> collection = new LimitedSizeDataCollection<>(5);
+        collection.put(new MockStorageData(1));
+        collection.put(new MockStorageData(3));
+        collection.put(new MockStorageData(5));
+        collection.put(new MockStorageData(7));
+        collection.put(new MockStorageData(9));
+
+        MockStorageData income = new MockStorageData(4);
+        collection.put(income);
+
+        int[] expected = new int[] {3, 4, 5, 7, 9};
+        int i = 0;
+        for (MockStorageData data : collection.collection()) {
+            Assert.assertEquals(expected[i++], data.latency);
+        }
+    }
+
+    private class MockStorageData implements ComparableStorageData {
+        private long latency;
+
+        public MockStorageData(long latency) {
+            this.latency = latency;
+        }
+
+        @Override public int compareTo(Object o) {
+            MockStorageData target = (MockStorageData)o;
+            return (int)(latency - target.latency);
+        }
+
+        @Override public String id() {
+            return null;
+        }
+
+        @Override public boolean equals(Object o) {
+            return true;
+        }
+
+        @Override public int hashCode() {
+            return Objects.hash(1);
+        }
+    }
+}
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/GraphQLQueryProvider.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/GraphQLQueryProvider.java
index c834daf..d940141 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/GraphQLQueryProvider.java
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/GraphQLQueryProvider.java
@@ -66,6 +66,8 @@ public class GraphQLQueryProvider extends ModuleProvider {
             .resolvers(new AggregationQuery(getManager()))
             .file("query-protocol/alarm.graphqls")
             .resolvers(new AlarmQuery(getManager()))
+            .file("query-protocol/top-n-records.graphqls")
+            .resolvers(new TopNRecordsQuery(getManager()))
             .build()
             .makeExecutableSchema();
         this.graphQL = GraphQL.newGraphQL(schema).build();
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopNRecordsQuery.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopNRecordsQuery.java
new file mode 100644
index 0000000..6354521
--- /dev/null
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopNRecordsQuery.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.query.graphql.resolver;
+
+import com.coxautodev.graphql.tools.GraphQLQueryResolver;
+import java.io.IOException;
+import java.util.List;
+import org.apache.skywalking.oap.query.graphql.type.TopNRecordsCondition;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.query.*;
+import org.apache.skywalking.oap.server.core.query.entity.*;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+
+/**
+ * @author wusheng
+ */
+public class TopNRecordsQuery implements GraphQLQueryResolver {
+    private final ModuleManager moduleManager;
+    private TopNRecordsQueryService topNRecordsQueryService;
+
+    public TopNRecordsQuery(ModuleManager moduleManager) {
+        this.moduleManager = moduleManager;
+    }
+
+    private TopNRecordsQueryService getTopNRecordsQueryService() {
+        if (topNRecordsQueryService == null) {
+            this.topNRecordsQueryService = moduleManager.find(CoreModule.NAME).provider().getService(TopNRecordsQueryService.class);
+        }
+        return topNRecordsQueryService;
+    }
+
+    public List<TopNRecord> getTopNRecords(TopNRecordsCondition condition) throws IOException {
+        long startSecondTB = DurationUtils.INSTANCE.startTimeDurationToSecondTimeBucket(condition.getDuration().getStep(), condition.getDuration().getStart());
+        long endSecondTB = DurationUtils.INSTANCE.endTimeDurationToSecondTimeBucket(condition.getDuration().getStep(), condition.getDuration().getEnd());
+
+        String metricName = condition.getMetricName();
+        Order order = condition.getOrder();
+        int topN = condition.getTopN();
+        int serviceId = condition.getServiceId();
+
+        return getTopNRecordsQueryService().getTopNRecords(startSecondTB, endSecondTB, metricName, serviceId, topN, order);
+    }
+}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/TopNRecordsCondition.java
similarity index 71%
copy from oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java
copy to oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/TopNRecordsCondition.java
index fd8d09f..d698f1a 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/TopNRecordsCondition.java
@@ -16,13 +16,17 @@
  *
  */
 
-package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener;
+package org.apache.skywalking.oap.query.graphql.type;
 
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import lombok.*;
+import org.apache.skywalking.oap.server.core.query.entity.Order;
 
-/**
- * @author peng-yongsheng
- */
-public interface SpanListenerFactory {
-    SpanListener create(ModuleManager moduleManager);
+@Getter
+@Setter
+public class TopNRecordsCondition {
+    private int serviceId;
+    private String metricName;
+    private int topN;
+    private Order order;
+    private Duration duration;
 }
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol b/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
index 85b81e2..6f11e3b 160000
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
@@ -1 +1 @@
-Subproject commit 85b81e2e34efb0b670d039154feca336c9203700
+Subproject commit 6f11e3b829bba4d3532477e968291cf657f0ac0b
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceServiceModuleConfig.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/DBLatencyThresholds.java
similarity index 50%
copy from oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceServiceModuleConfig.java
copy to oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/DBLatencyThresholds.java
index 5a36953..b66c890 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceServiceModuleConfig.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/DBLatencyThresholds.java
@@ -18,20 +18,34 @@
 
 package org.apache.skywalking.oap.server.receiver.trace.provider;
 
-import lombok.*;
-import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+import java.util.*;
 
 /**
- * @author peng-yongsheng
+ * @author wusheng
  */
-public class TraceServiceModuleConfig extends ModuleConfig {
-    @Setter @Getter private String bufferPath;
-    @Setter @Getter private int bufferOffsetMaxFileSize;
-    @Setter @Getter private int bufferDataMaxFileSize;
-    @Setter @Getter private boolean bufferFileCleanWhenRestart;
-    /**
-     * The sample rate precision is 1/10000.
-     * 10000 means 100% sample in default.
-     */
-    @Setter @Getter private int sampleRate = 10000;
+public class DBLatencyThresholds {
+    private Map<String, Integer> thresholds;
+
+    DBLatencyThresholds(String config) {
+        thresholds = new HashMap<>();
+        String[] settings = config.split(",");
+        for (String setting : settings) {
+            String[] typeValue = setting.split(":");
+            if (typeValue.length == 2) {
+                thresholds.put(typeValue[0].toLowerCase(), Integer.parseInt(typeValue[1]));
+            }
+        }
+        if (!thresholds.containsKey("default")) {
+            thresholds.put("default", 10000);
+        }
+    }
+
+    public int getThreshold(String type) {
+        type = type.toLowerCase();
+        if (thresholds.containsKey(type)) {
+            return thresholds.get(type);
+        } else {
+            return thresholds.get("default");
+        }
+    }
 }
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
index baa027f..367702d 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
@@ -20,22 +20,13 @@ package org.apache.skywalking.oap.server.receiver.trace.provider;
 
 import java.io.IOException;
 import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
-import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister;
-import org.apache.skywalking.oap.server.library.module.ModuleConfig;
-import org.apache.skywalking.oap.server.library.module.ModuleDefine;
-import org.apache.skywalking.oap.server.library.module.ModuleProvider;
-import org.apache.skywalking.oap.server.library.module.ModuleStartException;
-import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
+import org.apache.skywalking.oap.server.core.server.*;
+import org.apache.skywalking.oap.server.library.module.*;
 import org.apache.skywalking.oap.server.receiver.trace.module.TraceModule;
 import org.apache.skywalking.oap.server.receiver.trace.provider.handler.v5.grpc.TraceSegmentServiceHandler;
 import org.apache.skywalking.oap.server.receiver.trace.provider.handler.v5.rest.TraceSegmentServletHandler;
 import org.apache.skywalking.oap.server.receiver.trace.provider.handler.v6.grpc.TraceSegmentReportServiceHandler;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.ISegmentParserService;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParse;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParseV2;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParserListenerManager;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParserServiceImpl;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.*;
 import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.endpoint.MultiScopesSpanListener;
 import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.segment.SegmentSpanListener;
 import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.service.ServiceMappingSpanListener;
@@ -68,19 +59,21 @@ public class TraceModuleProvider extends ModuleProvider {
     }
 
     @Override public void prepare() throws ServiceNotProvidedException {
+        moduleConfig.setDbLatencyThresholds(new DBLatencyThresholds(moduleConfig.getSlowDBAccessThreshold()));
+
         SegmentParserListenerManager listenerManager = new SegmentParserListenerManager();
         listenerManager.add(new MultiScopesSpanListener.Factory());
         listenerManager.add(new ServiceMappingSpanListener.Factory());
         listenerManager.add(new SegmentSpanListener.Factory(moduleConfig.getSampleRate()));
 
-        segmentProducer = new SegmentParse.Producer(getManager(), listenerManager);
+        segmentProducer = new SegmentParse.Producer(getManager(), listenerManager, moduleConfig);
 
         listenerManager = new SegmentParserListenerManager();
         listenerManager.add(new MultiScopesSpanListener.Factory());
         listenerManager.add(new ServiceMappingSpanListener.Factory());
         listenerManager.add(new SegmentSpanListener.Factory(moduleConfig.getSampleRate()));
 
-        segmentProducerV2 = new SegmentParseV2.Producer(getManager(), listenerManager);
+        segmentProducerV2 = new SegmentParseV2.Producer(getManager(), listenerManager, moduleConfig);
 
         this.registerServiceImplementation(ISegmentParserService.class, new SegmentParserServiceImpl(segmentProducerV2));
     }
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceServiceModuleConfig.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceServiceModuleConfig.java
index 5a36953..8ebeec6 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceServiceModuleConfig.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceServiceModuleConfig.java
@@ -30,8 +30,13 @@ public class TraceServiceModuleConfig extends ModuleConfig {
     @Setter @Getter private int bufferDataMaxFileSize;
     @Setter @Getter private boolean bufferFileCleanWhenRestart;
     /**
-     * The sample rate precision is 1/10000.
-     * 10000 means 100% sample in default.
+     * The sample rate precision is 1/10000. 10000 means 100% sample in default.
      */
     @Setter @Getter private int sampleRate = 10000;
+
+    /**
+     * The threshold used to check the slow database access. Unit, millisecond.
+     */
+    @Setter @Getter private String slowDBAccessThreshold = "default:200";
+    @Setter @Getter private DBLatencyThresholds dbLatencyThresholds;
 }
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java
index 0a4bd69..7abe9e5 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java
@@ -25,6 +25,7 @@ import org.apache.skywalking.apm.network.language.agent.*;
 import org.apache.skywalking.oap.server.library.buffer.*;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.apache.skywalking.oap.server.library.util.TimeBucketUtils;
+import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig;
 import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*;
 import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.*;
 import org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization.*;
@@ -43,12 +44,14 @@ public class SegmentParse {
     private final List<SpanListener> spanListeners;
     private final SegmentParserListenerManager listenerManager;
     private final SegmentCoreInfo segmentCoreInfo;
+    private final TraceServiceModuleConfig config;
     @Setter private SegmentStandardizationWorker standardizationWorker;
     private volatile static CounterMetric TRACE_BUFFER_FILE_RETRY;
     private volatile static CounterMetric TRACE_BUFFER_FILE_OUT;
     private volatile static CounterMetric TRACE_PARSE_ERROR;
 
-    private SegmentParse(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) {
+    private SegmentParse(ModuleManager moduleManager, SegmentParserListenerManager listenerManager,
+        TraceServiceModuleConfig config) {
         this.moduleManager = moduleManager;
         this.listenerManager = listenerManager;
         this.spanListeners = new LinkedList<>();
@@ -56,6 +59,7 @@ public class SegmentParse {
         this.segmentCoreInfo.setStartTime(Long.MAX_VALUE);
         this.segmentCoreInfo.setEndTime(Long.MIN_VALUE);
         this.segmentCoreInfo.setV2(false);
+        this.config = config;
 
         MetricCreator metricCreator = moduleManager.find(TelemetryModule.NAME).provider().getService(MetricCreator.class);
         TRACE_BUFFER_FILE_RETRY = metricCreator.createCounter("v5_trace_buffer_file_retry", "The number of retry trace segment from the buffer file, but haven't registered successfully.",
@@ -239,7 +243,7 @@ public class SegmentParse {
     }
 
     private void createSpanListeners() {
-        listenerManager.getSpanListenerFactories().forEach(spanListenerFactory -> spanListeners.add(spanListenerFactory.create(moduleManager)));
+        listenerManager.getSpanListenerFactories().forEach(spanListenerFactory -> spanListeners.add(spanListenerFactory.create(moduleManager, config)));
     }
 
     public enum Source {
@@ -251,20 +255,23 @@ public class SegmentParse {
         @Setter private SegmentStandardizationWorker standardizationWorker;
         private final ModuleManager moduleManager;
         private final SegmentParserListenerManager listenerManager;
+        private final TraceServiceModuleConfig config;
 
-        public Producer(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) {
+        public Producer(ModuleManager moduleManager, SegmentParserListenerManager listenerManager,
+            TraceServiceModuleConfig config) {
             this.moduleManager = moduleManager;
             this.listenerManager = listenerManager;
+            this.config = config;
         }
 
         public void send(UpstreamSegment segment, Source source) {
-            SegmentParse segmentParse = new SegmentParse(moduleManager, listenerManager);
+            SegmentParse segmentParse = new SegmentParse(moduleManager, listenerManager, config);
             segmentParse.setStandardizationWorker(standardizationWorker);
             segmentParse.parse(new BufferData<>(segment), source);
         }
 
         @Override public boolean call(BufferData<UpstreamSegment> bufferData) {
-            SegmentParse segmentParse = new SegmentParse(moduleManager, listenerManager);
+            SegmentParse segmentParse = new SegmentParse(moduleManager, listenerManager, config);
             segmentParse.setStandardizationWorker(standardizationWorker);
             boolean parseResult = segmentParse.parse(bufferData, Source.Buffer);
             if (parseResult) {
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParseV2.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParseV2.java
index dd28ba4..543b0eb 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParseV2.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParseV2.java
@@ -26,6 +26,7 @@ import org.apache.skywalking.apm.network.language.agent.v2.SegmentObject;
 import org.apache.skywalking.oap.server.library.buffer.*;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.apache.skywalking.oap.server.library.util.TimeBucketUtils;
+import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig;
 import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*;
 import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.*;
 import org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization.*;
@@ -46,12 +47,13 @@ public class SegmentParseV2 {
     private final List<SpanListener> spanListeners;
     private final SegmentParserListenerManager listenerManager;
     private final SegmentCoreInfo segmentCoreInfo;
+    private final TraceServiceModuleConfig config;
     @Setter private SegmentStandardizationWorker standardizationWorker;
     private volatile static CounterMetric TRACE_BUFFER_FILE_RETRY;
     private volatile static CounterMetric TRACE_BUFFER_FILE_OUT;
     private volatile static CounterMetric TRACE_PARSE_ERROR;
 
-    private SegmentParseV2(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) {
+    private SegmentParseV2(ModuleManager moduleManager, SegmentParserListenerManager listenerManager, TraceServiceModuleConfig config) {
         this.moduleManager = moduleManager;
         this.listenerManager = listenerManager;
         this.spanListeners = new LinkedList<>();
@@ -59,6 +61,7 @@ public class SegmentParseV2 {
         this.segmentCoreInfo.setStartTime(Long.MAX_VALUE);
         this.segmentCoreInfo.setEndTime(Long.MIN_VALUE);
         this.segmentCoreInfo.setV2(true);
+        this.config = config;
 
         if (TRACE_BUFFER_FILE_RETRY == null) {
             MetricCreator metricCreator = moduleManager.find(TelemetryModule.NAME).provider().getService(MetricCreator.class);
@@ -245,7 +248,7 @@ public class SegmentParseV2 {
     }
 
     private void createSpanListeners() {
-        listenerManager.getSpanListenerFactories().forEach(spanListenerFactory -> spanListeners.add(spanListenerFactory.create(moduleManager)));
+        listenerManager.getSpanListenerFactories().forEach(spanListenerFactory -> spanListeners.add(spanListenerFactory.create(moduleManager, config)));
     }
 
     public static class Producer implements DataStreamReader.CallBack<UpstreamSegment> {
@@ -253,20 +256,22 @@ public class SegmentParseV2 {
         @Setter private SegmentStandardizationWorker standardizationWorker;
         private final ModuleManager moduleManager;
         private final SegmentParserListenerManager listenerManager;
+        private final TraceServiceModuleConfig config;
 
-        public Producer(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) {
+        public Producer(ModuleManager moduleManager, SegmentParserListenerManager listenerManager, TraceServiceModuleConfig config) {
             this.moduleManager = moduleManager;
             this.listenerManager = listenerManager;
+            this.config = config;
         }
 
         public void send(UpstreamSegment segment, SegmentSource source) {
-            SegmentParseV2 segmentParse = new SegmentParseV2(moduleManager, listenerManager);
+            SegmentParseV2 segmentParse = new SegmentParseV2(moduleManager, listenerManager, config);
             segmentParse.setStandardizationWorker(standardizationWorker);
             segmentParse.parse(new BufferData<>(segment), source);
         }
 
         @Override public boolean call(BufferData<UpstreamSegment> bufferData) {
-            SegmentParseV2 segmentParse = new SegmentParseV2(moduleManager, listenerManager);
+            SegmentParseV2 segmentParse = new SegmentParseV2(moduleManager, listenerManager, config);
             segmentParse.setStandardizationWorker(standardizationWorker);
             boolean parseResult = segmentParse.parse(bufferData, SegmentSource.Buffer);
             if (parseResult) {
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SpanTags.java
similarity index 80%
copy from oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java
copy to oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SpanTags.java
index fd8d09f..0f9fef0 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SpanTags.java
@@ -16,13 +16,10 @@
  *
  */
 
-package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener;
+package org.apache.skywalking.oap.server.receiver.trace.provider.parser;
 
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
+public class SpanTags {
+    public static final String DB_STATEMENT = "db.statement";
 
-/**
- * @author peng-yongsheng
- */
-public interface SpanListenerFactory {
-    SpanListener create(ModuleManager moduleManager);
+    public static final String DB_TYPE = "db.type";
 }
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java
index fd8d09f..9b4afd6 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java
@@ -19,10 +19,11 @@
 package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener;
 
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig;
 
 /**
  * @author peng-yongsheng
  */
 public interface SpanListenerFactory {
-    SpanListener create(ModuleManager moduleManager);
+    SpanListener create(ModuleManager moduleManager, TraceServiceModuleConfig config);
 }
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java
index 590d71e..0330e77 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java
@@ -18,28 +18,19 @@
 
 package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.endpoint;
 
-import java.util.LinkedList;
-import java.util.List;
+import java.util.*;
+import org.apache.skywalking.apm.network.common.KeyStringValuePair;
 import org.apache.skywalking.apm.network.language.agent.SpanLayer;
-import org.apache.skywalking.oap.server.core.Const;
-import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache;
-import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
-import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
-import org.apache.skywalking.oap.server.core.source.DetectPoint;
-import org.apache.skywalking.oap.server.core.source.EndpointRelation;
-import org.apache.skywalking.oap.server.core.source.RequestType;
-import org.apache.skywalking.oap.server.core.source.SourceReceiver;
+import org.apache.skywalking.oap.server.core.*;
+import org.apache.skywalking.oap.server.core.cache.*;
+import org.apache.skywalking.oap.server.core.source.*;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.ReferenceDecorator;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SegmentCoreInfo;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SpanDecorator;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.EntrySpanListener;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.ExitSpanListener;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SpanListener;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SpanListenerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.skywalking.oap.server.library.util.TimeBucketUtils;
+import org.apache.skywalking.oap.server.receiver.trace.provider.*;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SpanTags;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.*;
+import org.slf4j.*;
 
 import static java.util.Objects.nonNull;
 
@@ -63,16 +54,20 @@ public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListe
 
     private final List<SourceBuilder> entrySourceBuilders;
     private final List<SourceBuilder> exitSourceBuilders;
+    private final List<DatabaseSlowStatement> slowDatabaseAccesses;
+    private final TraceServiceModuleConfig config;
     private SpanDecorator entrySpanDecorator;
     private long minuteTimeBucket;
 
-    private MultiScopesSpanListener(ModuleManager moduleManager) {
+    private MultiScopesSpanListener(ModuleManager moduleManager, TraceServiceModuleConfig config) {
         this.sourceReceiver = moduleManager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
         this.entrySourceBuilders = new LinkedList<>();
         this.exitSourceBuilders = new LinkedList<>();
+        this.slowDatabaseAccesses = new ArrayList<>(10);
         this.instanceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInstanceInventoryCache.class);
         this.serviceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class);
         this.endpointInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(EndpointInventoryCache.class);
+        this.config = config;
     }
 
     @Override public boolean containsPoint(Point point) {
@@ -152,6 +147,34 @@ public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListe
         sourceBuilder.setComponentId(spanDecorator.getComponentId());
         setPublicAttrs(sourceBuilder, spanDecorator);
         exitSourceBuilders.add(sourceBuilder);
+
+        if (sourceBuilder.getType().equals(RequestType.DATABASE)) {
+            boolean isSlowDBAccess = false;
+
+            DatabaseSlowStatement statement = new DatabaseSlowStatement();
+            statement.setId(segmentCoreInfo.getSegmentId() + "-" + spanDecorator.getSpanId());
+            statement.setDatabaseServiceId(sourceBuilder.getDestServiceId());
+            statement.setLatency(sourceBuilder.getLatency());
+            statement.setTimeBucket(TimeBucketUtils.INSTANCE.getSecondTimeBucket(segmentCoreInfo.getStartTime()));
+            statement.setTraceId(segmentCoreInfo.getSegmentId());
+            for (KeyStringValuePair tag : spanDecorator.getAllTags()) {
+                if (SpanTags.DB_STATEMENT.equals(tag.getKey())) {
+                    statement.setStatement(tag.getValue());
+
+                } else if (SpanTags.DB_TYPE.equals(tag.getKey())) {
+                    String dbType = tag.getValue();
+                    DBLatencyThresholds thresholds = config.getDbLatencyThresholds();
+                    int threshold = thresholds.getThreshold(dbType);
+                    if (sourceBuilder.getLatency() > threshold) {
+                        isSlowDBAccess = true;
+                    }
+                }
+            }
+
+            if (isSlowDBAccess) {
+                slowDatabaseAccesses.add(statement);
+            }
+        }
     }
 
     private void setPublicAttrs(SourceBuilder sourceBuilder, SpanDecorator spanDecorator) {
@@ -215,13 +238,15 @@ public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListe
                 sourceReceiver.receive(exitSourceBuilder.toDatabaseAccess());
             }
         });
+
+        slowDatabaseAccesses.forEach(sourceReceiver::receive);
     }
 
     public static class Factory implements SpanListenerFactory {
 
         @Override
-        public SpanListener create(ModuleManager moduleManager) {
-            return new MultiScopesSpanListener(moduleManager);
+        public SpanListener create(ModuleManager moduleManager, TraceServiceModuleConfig config) {
+            return new MultiScopesSpanListener(moduleManager, config);
         }
     }
 }
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java
index 9add272..644d2d2 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java
@@ -26,6 +26,7 @@ import org.apache.skywalking.oap.server.core.source.SourceReceiver;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.apache.skywalking.oap.server.library.util.BooleanUtils;
 import org.apache.skywalking.oap.server.library.util.TimeBucketUtils;
+import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig;
 import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SegmentCoreInfo;
 import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SpanDecorator;
 import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.EntrySpanListener;
@@ -144,7 +145,7 @@ public class SegmentSpanListener implements FirstSpanListener, EntrySpanListener
             this.sampler = new TraceSegmentSampler(segmentSamplingRate);
         }
 
-        @Override public SpanListener create(ModuleManager moduleManager) {
+        @Override public SpanListener create(ModuleManager moduleManager, TraceServiceModuleConfig config) {
             return new SegmentSpanListener(moduleManager, sampler);
         }
     }
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/service/ServiceMappingSpanListener.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/service/ServiceMappingSpanListener.java
index 9e14954..338a1cd 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/service/ServiceMappingSpanListener.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/service/ServiceMappingSpanListener.java
@@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
 import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig;
 import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*;
 import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.*;
 import org.slf4j.*;
@@ -81,7 +82,7 @@ public class ServiceMappingSpanListener implements EntrySpanListener {
 
     public static class Factory implements SpanListenerFactory {
 
-        @Override public SpanListener create(ModuleManager moduleManager) {
+        @Override public SpanListener create(ModuleManager moduleManager, TraceServiceModuleConfig config) {
             return new ServiceMappingSpanListener(moduleManager);
         }
     }
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ServiceBMock.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ServiceBMock.java
index 344e049..9496815 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ServiceBMock.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ServiceBMock.java
@@ -113,9 +113,11 @@ class ServiceBMock {
         span.setSpanLayer(SpanLayer.Database);
         span.setParentSpanId(0);
         span.setStartTime(startTimestamp + 550);
-        span.setEndTime(startTimestamp + 1000);
+        span.setEndTime(startTimestamp + 1500);
         span.setComponentId(ComponentsDefine.MONGO_DRIVER.getId());
         span.setIsError(true);
+        span.addTags(KeyWithStringValue.newBuilder().setKey("db.statement").setValue("select * from database where complex = 1;").build());
+        span.addTags(KeyWithStringValue.newBuilder().setKey("db.type").setValue("mongodb").build());
 
         if (isPrepare) {
             span.setOperationName("mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]");
diff --git a/oap-server/server-starter/src/main/assembly/application.yml b/oap-server/server-starter/src/main/assembly/application.yml
index 746de62..840777d 100644
--- a/oap-server/server-starter/src/main/assembly/application.yml
+++ b/oap-server/server-starter/src/main/assembly/application.yml
@@ -75,6 +75,7 @@ receiver-trace:
     bufferDataMaxFileSize: ${SW_RECEIVER_BUFFER_DATA_MAX_FILE_SIZE:500} # Unit is MB
     bufferFileCleanWhenRestart: ${SW_RECEIVER_BUFFER_FILE_CLEAN_WHEN_RESTART:false}
     sampleRate: ${SW_TRACE_SAMPLE_RATE:10000} # The sample rate precision is 1/10000. 10000 means 100% sample in default.
+    slowDBAccessThreshold: ${SW_SLOW_DB_THRESHOLD:default:200,redis:50} # The slow database access thresholds. Unit ms.
 receiver-jvm:
   default:
 #service-mesh:
diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index 5b53d73..28c7b97 100644
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -75,6 +75,7 @@ receiver-trace:
     bufferDataMaxFileSize: ${SW_RECEIVER_BUFFER_DATA_MAX_FILE_SIZE:500} # Unit is MB
     bufferFileCleanWhenRestart: ${SW_RECEIVER_BUFFER_FILE_CLEAN_WHEN_RESTART:false}
     sampleRate: ${SW_TRACE_SAMPLE_RATE:10000} # The sample rate precision is 1/10000. 10000 means 100% sample in default.
+    slowDBAccessThreshold: ${SW_SLOW_DB_THRESHOLD:default:200,redis:50} # The slow database access thresholds. Unit ms.
 receiver-jvm:
   default:
 service-mesh:
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
index 8f31dba..cf10758 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
@@ -80,6 +80,7 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
         this.registerServiceImplementation(IMetadataQueryDAO.class, new MetadataQueryEsDAO(elasticSearchClient));
         this.registerServiceImplementation(IAggregationQueryDAO.class, new AggregationQueryEsDAO(elasticSearchClient));
         this.registerServiceImplementation(IAlarmQueryDAO.class, new AlarmQueryEsDAO(elasticSearchClient));
+        this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new TopNRecordsQueryEsDAO(elasticSearchClient));
     }
 
     @Override
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopNRecordsQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopNRecordsQueryEsDAO.java
new file mode 100644
index 0000000..d5f23d5
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopNRecordsQueryEsDAO.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query;
+
+import java.io.IOException;
+import java.util.*;
+import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
+import org.apache.skywalking.oap.server.core.query.entity.*;
+import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO;
+import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.index.query.*;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.sort.SortOrder;
+
+/**
+ * @author wusheng
+ */
+public class TopNRecordsQueryEsDAO extends EsDAO implements ITopNRecordsQueryDAO {
+    public TopNRecordsQueryEsDAO(ElasticSearchClient client) {
+        super(client);
+    }
+
+    @Override
+    public List<TopNRecord> getTopNRecords(long startSecondTB, long endSecondTB, String metricName, int serviceId,
+        int topN, Order order) throws IOException {
+        SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
+        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
+        boolQueryBuilder.must().add(QueryBuilders.rangeQuery(TopN.TIME_BUCKET).gte(startSecondTB).lte(endSecondTB));
+        boolQueryBuilder.must().add(QueryBuilders.termQuery(TopN.SERVICE_ID, serviceId));
+
+        sourceBuilder.query(boolQueryBuilder);
+        sourceBuilder.size(topN).sort(TopN.LATENCY, order.equals(Order.DES) ? SortOrder.DESC : SortOrder.ASC);
+        SearchResponse response = getClient().search(metricName, sourceBuilder);
+
+        List<TopNRecord> results = new ArrayList<>();
+
+        for (SearchHit searchHit : response.getHits().getHits()) {
+            TopNRecord record = new TopNRecord();
+            record.setStatement((String)searchHit.getSourceAsMap().get(TopN.STATEMENT));
+            record.setTraceId((String)searchHit.getSourceAsMap().get(TopN.TRACE_ID));
+            record.setLatency(((Number)searchHit.getSourceAsMap().get(TopN.LATENCY)).longValue());
+            results.add(record);
+        }
+
+        return results;
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
index a76fb49..addd6a0 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
@@ -86,6 +86,7 @@ public class H2StorageProvider extends ModuleProvider {
         this.registerServiceImplementation(IAggregationQueryDAO.class, new H2AggregationQueryDAO(h2Client));
         this.registerServiceImplementation(IAlarmQueryDAO.class, new H2AlarmQueryDAO(h2Client));
         this.registerServiceImplementation(IHistoryDeleteDAO.class, new H2HistoryDeleteDAO(h2Client));
+        this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new H2TopNRecordsQueryDAO(h2Client));
     }
 
     @Override public void start() throws ServiceNotProvidedException, ModuleStartException {
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TopNRecordsQueryDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TopNRecordsQueryDAO.java
new file mode 100644
index 0000000..2ab2231
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TopNRecordsQueryDAO.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
+
+import java.io.IOException;
+import java.sql.*;
+import java.util.*;
+import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
+import org.apache.skywalking.oap.server.core.query.entity.*;
+import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO;
+import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
+
+/**
+ * @author wusheng
+ */
+public class H2TopNRecordsQueryDAO implements ITopNRecordsQueryDAO {
+    private JDBCHikariCPClient h2Client;
+
+    public H2TopNRecordsQueryDAO(JDBCHikariCPClient h2Client) {
+        this.h2Client = h2Client;
+    }
+
+    @Override
+    public List<TopNRecord> getTopNRecords(long startSecondTB, long endSecondTB, String metricName, int serviceId,
+        int topN, Order order) throws IOException {
+        StringBuilder sql = new StringBuilder("select * from " + metricName + " where ");
+        List<Object> parameters = new ArrayList<>(10);
+
+        sql.append(" service_id = ? ");
+        parameters.add(serviceId);
+
+        sql.append(" and ").append(TopN.TIME_BUCKET).append(" >= ?");
+        parameters.add(startSecondTB);
+        sql.append(" and ").append(TopN.TIME_BUCKET).append(" <= ?");
+        parameters.add(endSecondTB);
+
+        sql.append(" order by ").append(TopN.LATENCY);
+        if (order.equals(Order.DES)) {
+            sql.append(" desc ");
+        } else {
+            sql.append(" asc ");
+        }
+        sql.append(" limit ").append(topN);
+
+        List<TopNRecord> results = new ArrayList<>();
+        try (Connection connection = h2Client.getConnection()) {
+            try (ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), parameters.toArray(new Object[0]))) {
+                while (resultSet.next()) {
+                    TopNRecord record = new TopNRecord();
+                    record.setStatement(resultSet.getString(TopN.STATEMENT));
+                    record.setTraceId(resultSet.getString(TopN.TRACE_ID));
+                    record.setLatency(resultSet.getLong(TopN.LATENCY));
+                    results.add(record);
+                }
+            }
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+
+        return results;
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java
index fda7d31..6f2ac94 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java
@@ -92,6 +92,7 @@ public class MySQLStorageProvider extends ModuleProvider {
         this.registerServiceImplementation(IAggregationQueryDAO.class, new MySQLAggregationQueryDAO(mysqlClient));
         this.registerServiceImplementation(IAlarmQueryDAO.class, new MySQLAlarmQueryDAO(mysqlClient));
         this.registerServiceImplementation(IHistoryDeleteDAO.class, new H2HistoryDeleteDAO(mysqlClient));
+        this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new H2TopNRecordsQueryDAO(mysqlClient));
     }
 
     @Override public void start() throws ServiceNotProvidedException, ModuleStartException {
diff --git a/skywalking-ui b/skywalking-ui
index d5e46c0..ed64821 160000
--- a/skywalking-ui
+++ b/skywalking-ui
@@ -1 +1 @@
-Subproject commit d5e46c075cb30e3de42aeaaa9e839a758d3fc029
+Subproject commit ed64821de4fe1e524ec069660d18d7efb1b1061f