You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/02/10 12:30:27 UTC

[incubator-skywalking] branch top-sql updated: Finish topN persistence code. Not tested yet, and the query has not been added.

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch top-sql
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/top-sql by this push:
     new e7e7bcb  Finish topN persistence code. Not tested yet, and the query has not been added.
e7e7bcb is described below

commit e7e7bcb4313017fe413833c0a94b157ff12e713e
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Sun Feb 10 20:30:16 2019 +0800

    Finish topN persistence code. Not tested yet, and the query has not been added.
---
 .../server/core/analysis/DispatcherManager.java    |  4 ++
 .../database/DatabaseStatementDispatcher.java      |  2 +-
 .../manual/database/TopNDatabaseStatement.java     |  4 +-
 .../oap/server/core/analysis/topn/TopN.java        |  6 +--
 .../server/core/analysis/worker/TopNWorker.java    | 11 ++++
 .../server/core/source/DatabaseSlowStatement.java  |  2 +-
 .../trace/provider/TraceModuleProvider.java        |  4 +-
 .../trace/provider/TraceServiceModuleConfig.java   |  6 +++
 .../trace/provider/parser/SegmentParse.java        | 17 +++++--
 .../trace/provider/parser/SegmentParseV2.java      | 15 ++++--
 .../SpanListenerFactory.java => SpanTags.java}     | 11 ++--
 .../parser/listener/SpanListenerFactory.java       |  3 +-
 .../listener/endpoint/MultiScopesSpanListener.java | 58 +++++++++++++---------
 .../listener/segment/SegmentSpanListener.java      |  3 +-
 .../service/ServiceMappingSpanListener.java        |  3 +-
 .../server/receiver/trace/mock/ServiceBMock.java   |  3 +-
 16 files changed, 98 insertions(+), 54 deletions(-)

diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
index 4697792..61602a6 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
@@ -47,6 +47,10 @@ public class DispatcherManager {
     }
 
     public void forward(Source source) {
+        if (source == null) {
+            return;
+        }
+
         for (SourceDispatcher dispatcher : dispatcherMap.get(source.scope())) {
             dispatcher.dispatch(source);
         }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/DatabaseStatementDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/DatabaseStatementDispatcher.java
index 8e8037e..a94d370 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/DatabaseStatementDispatcher.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/DatabaseStatementDispatcher.java
@@ -29,7 +29,7 @@ public class DatabaseStatementDispatcher implements SourceDispatcher<DatabaseSlo
     @Override public void dispatch(DatabaseSlowStatement source) {
         TopNDatabaseStatement statement = new TopNDatabaseStatement();
         statement.setDatabaseServiceId(source.getDatabaseServiceId());
-        statement.setDuration(source.getDuration());
+        statement.setLatency(source.getLatency());
         statement.setStatement(source.getStatement());
         statement.setTimeBucket(source.getTimeBucket());
         statement.setTraceId(source.getTraceId());
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java
index 50d82bc..3639ae7 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java
@@ -63,7 +63,7 @@ public class TopNDatabaseStatement extends TopN {
             TopNDatabaseStatement statement = new TopNDatabaseStatement();
             statement.setStatement((String)dbMap.get(STATEMENT));
             statement.setTraceId((String)dbMap.get(TRACE_ID));
-            statement.setDuration(((Number)dbMap.get(DURATION)).longValue());
+            statement.setLatency(((Number)dbMap.get(LATENCY)).longValue());
             statement.setDatabaseServiceId(((Number)dbMap.get(DATABASE_SERVICE_ID)).intValue());
             return statement;
         }
@@ -72,7 +72,7 @@ public class TopNDatabaseStatement extends TopN {
             Map<String, Object> map = new HashMap<>();
             map.put(STATEMENT, storageData.getStatement());
             map.put(TRACE_ID, storageData.getTraceId());
-            map.put(DURATION, storageData.getDuration());
+            map.put(LATENCY, storageData.getLatency());
             map.put(DATABASE_SERVICE_ID, storageData.getDatabaseServiceId());
             return map;
         }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/TopN.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/TopN.java
index 125ac71..1071203 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/TopN.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/TopN.java
@@ -30,15 +30,15 @@ import org.apache.skywalking.oap.server.core.storage.annotation.Column;
  */
 public abstract class TopN extends Record implements ComparableStorageData {
     public static final String STATEMENT = "statement";
-    public static final String DURATION = "duration";
+    public static final String LATENCY = "latency";
     public static final String TRACE_ID = "trace_id";
 
     @Getter @Setter @Column(columnName = STATEMENT) private String statement;
-    @Getter @Setter @Column(columnName = DURATION) private long duration;
+    @Getter @Setter @Column(columnName = LATENCY) private long latency;
     @Getter @Setter @Column(columnName = TRACE_ID) private String traceId;
 
     @Override public int compareTo(Object o) {
         TopN target = (TopN)o;
-        return (int)(duration - target.duration);
+        return (int)(latency - target.latency);
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
index 0dfbc0f..6ccccd8 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
@@ -38,6 +38,8 @@ public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<Top
     private final IRecordDAO recordDAO;
     private final String modelName;
     private final DataCarrier<TopN> dataCarrier;
+    private long reportCycle;
+    private volatile long lastReportTimestamp;
 
     public TopNWorker(int workerId, String modelName, ModuleManager moduleManager,
         int topNSize,
@@ -48,6 +50,9 @@ public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<Top
         this.modelName = modelName;
         this.dataCarrier = new DataCarrier<>(1, 10000);
         this.dataCarrier.consume(new TopNWorker.TopNConsumer(), 1);
+        this.lastReportTimestamp = System.currentTimeMillis();
+        // Top N records are persisted at most once every 10 minutes.
+        this.reportCycle = 10 * 60 * 1000L;
     }
 
     @Override void onWork(TopN data) {
@@ -68,6 +73,12 @@ public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<Top
     }
 
     @Override public List<Object> prepareBatch(LimitedSizeDataCache<TopN> cache) {
+        long now = System.currentTimeMillis();
+        if (now - lastReportTimestamp <= reportCycle) {
+            return new ArrayList<>(0);
+        }
+        lastReportTimestamp = now;
+
         List<Object> batchCollection = new LinkedList<>();
         cache.getLast().collection().forEach(record -> {
             try {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DatabaseSlowStatement.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DatabaseSlowStatement.java
index 4d56262..c3a7a68 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DatabaseSlowStatement.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DatabaseSlowStatement.java
@@ -29,7 +29,7 @@ import org.apache.skywalking.oap.server.core.source.annotation.SourceType;
 public class DatabaseSlowStatement extends Source {
     @Getter @Setter private int databaseServiceId;
     @Getter @Setter private String statement;
-    @Getter @Setter private long duration;
+    @Getter @Setter private long latency;
     @Getter @Setter private String traceId;
 
     @Override public Scope scope() {
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
index baa027f..65c9179 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
@@ -73,14 +73,14 @@ public class TraceModuleProvider extends ModuleProvider {
         listenerManager.add(new ServiceMappingSpanListener.Factory());
         listenerManager.add(new SegmentSpanListener.Factory(moduleConfig.getSampleRate()));
 
-        segmentProducer = new SegmentParse.Producer(getManager(), listenerManager);
+        segmentProducer = new SegmentParse.Producer(getManager(), listenerManager, moduleConfig);
 
         listenerManager = new SegmentParserListenerManager();
         listenerManager.add(new MultiScopesSpanListener.Factory());
         listenerManager.add(new ServiceMappingSpanListener.Factory());
         listenerManager.add(new SegmentSpanListener.Factory(moduleConfig.getSampleRate()));
 
-        segmentProducerV2 = new SegmentParseV2.Producer(getManager(), listenerManager);
+        segmentProducerV2 = new SegmentParseV2.Producer(getManager(), listenerManager, moduleConfig);
 
         this.registerServiceImplementation(ISegmentParserService.class, new SegmentParserServiceImpl(segmentProducerV2));
     }
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceServiceModuleConfig.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceServiceModuleConfig.java
index 5a36953..fec00e2 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceServiceModuleConfig.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceServiceModuleConfig.java
@@ -34,4 +34,10 @@ public class TraceServiceModuleConfig extends ModuleConfig {
      * 10000 means 100% sample in default.
      */
     @Setter @Getter private int sampleRate = 10000;
+
+    /**
+     * The latency threshold above which a database access is treated as slow.
+     * Unit: milliseconds.
+     */
+    @Setter @Getter private int slowDBAccessThreshold = 200;
 }
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java
index 0a4bd69..7abe9e5 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java
@@ -25,6 +25,7 @@ import org.apache.skywalking.apm.network.language.agent.*;
 import org.apache.skywalking.oap.server.library.buffer.*;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.apache.skywalking.oap.server.library.util.TimeBucketUtils;
+import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig;
 import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*;
 import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.*;
 import org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization.*;
@@ -43,12 +44,14 @@ public class SegmentParse {
     private final List<SpanListener> spanListeners;
     private final SegmentParserListenerManager listenerManager;
     private final SegmentCoreInfo segmentCoreInfo;
+    private final TraceServiceModuleConfig config;
     @Setter private SegmentStandardizationWorker standardizationWorker;
     private volatile static CounterMetric TRACE_BUFFER_FILE_RETRY;
     private volatile static CounterMetric TRACE_BUFFER_FILE_OUT;
     private volatile static CounterMetric TRACE_PARSE_ERROR;
 
-    private SegmentParse(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) {
+    private SegmentParse(ModuleManager moduleManager, SegmentParserListenerManager listenerManager,
+        TraceServiceModuleConfig config) {
         this.moduleManager = moduleManager;
         this.listenerManager = listenerManager;
         this.spanListeners = new LinkedList<>();
@@ -56,6 +59,7 @@ public class SegmentParse {
         this.segmentCoreInfo.setStartTime(Long.MAX_VALUE);
         this.segmentCoreInfo.setEndTime(Long.MIN_VALUE);
         this.segmentCoreInfo.setV2(false);
+        this.config = config;
 
         MetricCreator metricCreator = moduleManager.find(TelemetryModule.NAME).provider().getService(MetricCreator.class);
         TRACE_BUFFER_FILE_RETRY = metricCreator.createCounter("v5_trace_buffer_file_retry", "The number of retry trace segment from the buffer file, but haven't registered successfully.",
@@ -239,7 +243,7 @@ public class SegmentParse {
     }
 
     private void createSpanListeners() {
-        listenerManager.getSpanListenerFactories().forEach(spanListenerFactory -> spanListeners.add(spanListenerFactory.create(moduleManager)));
+        listenerManager.getSpanListenerFactories().forEach(spanListenerFactory -> spanListeners.add(spanListenerFactory.create(moduleManager, config)));
     }
 
     public enum Source {
@@ -251,20 +255,23 @@ public class SegmentParse {
         @Setter private SegmentStandardizationWorker standardizationWorker;
         private final ModuleManager moduleManager;
         private final SegmentParserListenerManager listenerManager;
+        private final TraceServiceModuleConfig config;
 
-        public Producer(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) {
+        public Producer(ModuleManager moduleManager, SegmentParserListenerManager listenerManager,
+            TraceServiceModuleConfig config) {
             this.moduleManager = moduleManager;
             this.listenerManager = listenerManager;
+            this.config = config;
         }
 
         public void send(UpstreamSegment segment, Source source) {
-            SegmentParse segmentParse = new SegmentParse(moduleManager, listenerManager);
+            SegmentParse segmentParse = new SegmentParse(moduleManager, listenerManager, config);
             segmentParse.setStandardizationWorker(standardizationWorker);
             segmentParse.parse(new BufferData<>(segment), source);
         }
 
         @Override public boolean call(BufferData<UpstreamSegment> bufferData) {
-            SegmentParse segmentParse = new SegmentParse(moduleManager, listenerManager);
+            SegmentParse segmentParse = new SegmentParse(moduleManager, listenerManager, config);
             segmentParse.setStandardizationWorker(standardizationWorker);
             boolean parseResult = segmentParse.parse(bufferData, Source.Buffer);
             if (parseResult) {
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParseV2.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParseV2.java
index dd28ba4..543b0eb 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParseV2.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParseV2.java
@@ -26,6 +26,7 @@ import org.apache.skywalking.apm.network.language.agent.v2.SegmentObject;
 import org.apache.skywalking.oap.server.library.buffer.*;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.apache.skywalking.oap.server.library.util.TimeBucketUtils;
+import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig;
 import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*;
 import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.*;
 import org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization.*;
@@ -46,12 +47,13 @@ public class SegmentParseV2 {
     private final List<SpanListener> spanListeners;
     private final SegmentParserListenerManager listenerManager;
     private final SegmentCoreInfo segmentCoreInfo;
+    private final TraceServiceModuleConfig config;
     @Setter private SegmentStandardizationWorker standardizationWorker;
     private volatile static CounterMetric TRACE_BUFFER_FILE_RETRY;
     private volatile static CounterMetric TRACE_BUFFER_FILE_OUT;
     private volatile static CounterMetric TRACE_PARSE_ERROR;
 
-    private SegmentParseV2(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) {
+    private SegmentParseV2(ModuleManager moduleManager, SegmentParserListenerManager listenerManager, TraceServiceModuleConfig config) {
         this.moduleManager = moduleManager;
         this.listenerManager = listenerManager;
         this.spanListeners = new LinkedList<>();
@@ -59,6 +61,7 @@ public class SegmentParseV2 {
         this.segmentCoreInfo.setStartTime(Long.MAX_VALUE);
         this.segmentCoreInfo.setEndTime(Long.MIN_VALUE);
         this.segmentCoreInfo.setV2(true);
+        this.config = config;
 
         if (TRACE_BUFFER_FILE_RETRY == null) {
             MetricCreator metricCreator = moduleManager.find(TelemetryModule.NAME).provider().getService(MetricCreator.class);
@@ -245,7 +248,7 @@ public class SegmentParseV2 {
     }
 
     private void createSpanListeners() {
-        listenerManager.getSpanListenerFactories().forEach(spanListenerFactory -> spanListeners.add(spanListenerFactory.create(moduleManager)));
+        listenerManager.getSpanListenerFactories().forEach(spanListenerFactory -> spanListeners.add(spanListenerFactory.create(moduleManager, config)));
     }
 
     public static class Producer implements DataStreamReader.CallBack<UpstreamSegment> {
@@ -253,20 +256,22 @@ public class SegmentParseV2 {
         @Setter private SegmentStandardizationWorker standardizationWorker;
         private final ModuleManager moduleManager;
         private final SegmentParserListenerManager listenerManager;
+        private final TraceServiceModuleConfig config;
 
-        public Producer(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) {
+        public Producer(ModuleManager moduleManager, SegmentParserListenerManager listenerManager, TraceServiceModuleConfig config) {
             this.moduleManager = moduleManager;
             this.listenerManager = listenerManager;
+            this.config = config;
         }
 
         public void send(UpstreamSegment segment, SegmentSource source) {
-            SegmentParseV2 segmentParse = new SegmentParseV2(moduleManager, listenerManager);
+            SegmentParseV2 segmentParse = new SegmentParseV2(moduleManager, listenerManager, config);
             segmentParse.setStandardizationWorker(standardizationWorker);
             segmentParse.parse(new BufferData<>(segment), source);
         }
 
         @Override public boolean call(BufferData<UpstreamSegment> bufferData) {
-            SegmentParseV2 segmentParse = new SegmentParseV2(moduleManager, listenerManager);
+            SegmentParseV2 segmentParse = new SegmentParseV2(moduleManager, listenerManager, config);
             segmentParse.setStandardizationWorker(standardizationWorker);
             boolean parseResult = segmentParse.parse(bufferData, SegmentSource.Buffer);
             if (parseResult) {
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SpanTags.java
similarity index 80%
copy from oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java
copy to oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SpanTags.java
index fd8d09f..e9f0c68 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SpanTags.java
@@ -16,13 +16,8 @@
  *
  */
 
-package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener;
+package org.apache.skywalking.oap.server.receiver.trace.provider.parser;
 
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
-
-/**
- * @author peng-yongsheng
- */
-public interface SpanListenerFactory {
-    SpanListener create(ModuleManager moduleManager);
+public class SpanTags {
+    public static final String DB_STATEMENT = "db.statement";
 }
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java
index fd8d09f..9b4afd6 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java
@@ -19,10 +19,11 @@
 package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener;
 
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig;
 
 /**
  * @author peng-yongsheng
  */
 public interface SpanListenerFactory {
-    SpanListener create(ModuleManager moduleManager);
+    SpanListener create(ModuleManager moduleManager, TraceServiceModuleConfig config);
 }
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java
index 590d71e..ebb3468 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java
@@ -18,28 +18,18 @@
 
 package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.endpoint;
 
-import java.util.LinkedList;
-import java.util.List;
+import java.util.*;
+import org.apache.skywalking.apm.network.common.KeyStringValuePair;
 import org.apache.skywalking.apm.network.language.agent.SpanLayer;
-import org.apache.skywalking.oap.server.core.Const;
-import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache;
-import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
-import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
-import org.apache.skywalking.oap.server.core.source.DetectPoint;
-import org.apache.skywalking.oap.server.core.source.EndpointRelation;
-import org.apache.skywalking.oap.server.core.source.RequestType;
-import org.apache.skywalking.oap.server.core.source.SourceReceiver;
+import org.apache.skywalking.oap.server.core.*;
+import org.apache.skywalking.oap.server.core.cache.*;
+import org.apache.skywalking.oap.server.core.source.*;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.ReferenceDecorator;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SegmentCoreInfo;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SpanDecorator;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.EntrySpanListener;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.ExitSpanListener;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SpanListener;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SpanListenerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SpanTags;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.*;
+import org.slf4j.*;
 
 import static java.util.Objects.nonNull;
 
@@ -63,16 +53,20 @@ public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListe
 
     private final List<SourceBuilder> entrySourceBuilders;
     private final List<SourceBuilder> exitSourceBuilders;
+    private final List<DatabaseSlowStatement> slowDatabaseAccesses;
+    private final TraceServiceModuleConfig config;
     private SpanDecorator entrySpanDecorator;
     private long minuteTimeBucket;
 
-    private MultiScopesSpanListener(ModuleManager moduleManager) {
+    private MultiScopesSpanListener(ModuleManager moduleManager, TraceServiceModuleConfig config) {
         this.sourceReceiver = moduleManager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
         this.entrySourceBuilders = new LinkedList<>();
         this.exitSourceBuilders = new LinkedList<>();
+        this.slowDatabaseAccesses = new ArrayList<>(10);
         this.instanceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInstanceInventoryCache.class);
         this.serviceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class);
         this.endpointInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(EndpointInventoryCache.class);
+        this.config = config;
     }
 
     @Override public boolean containsPoint(Point point) {
@@ -152,6 +146,22 @@ public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListe
         sourceBuilder.setComponentId(spanDecorator.getComponentId());
         setPublicAttrs(sourceBuilder, spanDecorator);
         exitSourceBuilders.add(sourceBuilder);
+
+        if (spanDecorator.getSpanLayer().equals(RequestType.DATABASE)
+            && sourceBuilder.getLatency() > config.getSlowDBAccessThreshold()) {
+            DatabaseSlowStatement statement = new DatabaseSlowStatement();
+            statement.setDatabaseServiceId(sourceBuilder.getDestServiceId());
+            statement.setLatency(sourceBuilder.getLatency());
+            statement.setTraceId(segmentCoreInfo.getSegmentId());
+            for (KeyStringValuePair tag : spanDecorator.getAllTags()) {
+                if (SpanTags.DB_STATEMENT.equals(tag.getKey())) {
+                    statement.setStatement(tag.getValue());
+                    slowDatabaseAccesses.add(statement);
+                    break;
+                }
+            }
+
+        }
     }
 
     private void setPublicAttrs(SourceBuilder sourceBuilder, SpanDecorator spanDecorator) {
@@ -215,13 +225,15 @@ public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListe
                 sourceReceiver.receive(exitSourceBuilder.toDatabaseAccess());
             }
         });
+
+        slowDatabaseAccesses.forEach(sourceReceiver::receive);
     }
 
     public static class Factory implements SpanListenerFactory {
 
         @Override
-        public SpanListener create(ModuleManager moduleManager) {
-            return new MultiScopesSpanListener(moduleManager);
+        public SpanListener create(ModuleManager moduleManager, TraceServiceModuleConfig config) {
+            return new MultiScopesSpanListener(moduleManager, config);
         }
     }
 }
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java
index 9add272..644d2d2 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java
@@ -26,6 +26,7 @@ import org.apache.skywalking.oap.server.core.source.SourceReceiver;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.apache.skywalking.oap.server.library.util.BooleanUtils;
 import org.apache.skywalking.oap.server.library.util.TimeBucketUtils;
+import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig;
 import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SegmentCoreInfo;
 import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SpanDecorator;
 import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.EntrySpanListener;
@@ -144,7 +145,7 @@ public class SegmentSpanListener implements FirstSpanListener, EntrySpanListener
             this.sampler = new TraceSegmentSampler(segmentSamplingRate);
         }
 
-        @Override public SpanListener create(ModuleManager moduleManager) {
+        @Override public SpanListener create(ModuleManager moduleManager, TraceServiceModuleConfig config) {
             return new SegmentSpanListener(moduleManager, sampler);
         }
     }
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/service/ServiceMappingSpanListener.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/service/ServiceMappingSpanListener.java
index 9e14954..338a1cd 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/service/ServiceMappingSpanListener.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/service/ServiceMappingSpanListener.java
@@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
 import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig;
 import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*;
 import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.*;
 import org.slf4j.*;
@@ -81,7 +82,7 @@ public class ServiceMappingSpanListener implements EntrySpanListener {
 
     public static class Factory implements SpanListenerFactory {
 
-        @Override public SpanListener create(ModuleManager moduleManager) {
+        @Override public SpanListener create(ModuleManager moduleManager, TraceServiceModuleConfig config) {
             return new ServiceMappingSpanListener(moduleManager);
         }
     }
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ServiceBMock.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ServiceBMock.java
index 344e049..f9bcfbe 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ServiceBMock.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ServiceBMock.java
@@ -113,9 +113,10 @@ class ServiceBMock {
         span.setSpanLayer(SpanLayer.Database);
         span.setParentSpanId(0);
         span.setStartTime(startTimestamp + 550);
-        span.setEndTime(startTimestamp + 1000);
+        span.setEndTime(startTimestamp + 1500);
         span.setComponentId(ComponentsDefine.MONGO_DRIVER.getId());
         span.setIsError(true);
+        span.addTags(KeyWithStringValue.newBuilder().setKey("db.statement").setValue("select * from database where complex = 1;").build());
 
         if (isPrepare) {
             span.setOperationName("mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]");