You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/02/10 12:30:27 UTC
[incubator-skywalking] branch top-sql updated: Finish topN
persistence codes. Not test yet. And query have not added.
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch top-sql
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/top-sql by this push:
new e7e7bcb Finish topN persistence codes. Not test yet. And query have not added.
e7e7bcb is described below
commit e7e7bcb4313017fe413833c0a94b157ff12e713e
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Sun Feb 10 20:30:16 2019 +0800
Finish topN persistence codes. Not test yet. And query have not added.
---
.../server/core/analysis/DispatcherManager.java | 4 ++
.../database/DatabaseStatementDispatcher.java | 2 +-
.../manual/database/TopNDatabaseStatement.java | 4 +-
.../oap/server/core/analysis/topn/TopN.java | 6 +--
.../server/core/analysis/worker/TopNWorker.java | 11 ++++
.../server/core/source/DatabaseSlowStatement.java | 2 +-
.../trace/provider/TraceModuleProvider.java | 4 +-
.../trace/provider/TraceServiceModuleConfig.java | 6 +++
.../trace/provider/parser/SegmentParse.java | 17 +++++--
.../trace/provider/parser/SegmentParseV2.java | 15 ++++--
.../SpanListenerFactory.java => SpanTags.java} | 11 ++--
.../parser/listener/SpanListenerFactory.java | 3 +-
.../listener/endpoint/MultiScopesSpanListener.java | 58 +++++++++++++---------
.../listener/segment/SegmentSpanListener.java | 3 +-
.../service/ServiceMappingSpanListener.java | 3 +-
.../server/receiver/trace/mock/ServiceBMock.java | 3 +-
16 files changed, 98 insertions(+), 54 deletions(-)
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
index 4697792..61602a6 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
@@ -47,6 +47,10 @@ public class DispatcherManager {
}
public void forward(Source source) {
+ if (source == null) {
+ return;
+ }
+
for (SourceDispatcher dispatcher : dispatcherMap.get(source.scope())) {
dispatcher.dispatch(source);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/DatabaseStatementDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/DatabaseStatementDispatcher.java
index 8e8037e..a94d370 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/DatabaseStatementDispatcher.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/DatabaseStatementDispatcher.java
@@ -29,7 +29,7 @@ public class DatabaseStatementDispatcher implements SourceDispatcher<DatabaseSlo
@Override public void dispatch(DatabaseSlowStatement source) {
TopNDatabaseStatement statement = new TopNDatabaseStatement();
statement.setDatabaseServiceId(source.getDatabaseServiceId());
- statement.setDuration(source.getDuration());
+ statement.setLatency(source.getLatency());
statement.setStatement(source.getStatement());
statement.setTimeBucket(source.getTimeBucket());
statement.setTraceId(source.getTraceId());
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java
index 50d82bc..3639ae7 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java
@@ -63,7 +63,7 @@ public class TopNDatabaseStatement extends TopN {
TopNDatabaseStatement statement = new TopNDatabaseStatement();
statement.setStatement((String)dbMap.get(STATEMENT));
statement.setTraceId((String)dbMap.get(TRACE_ID));
- statement.setDuration(((Number)dbMap.get(DURATION)).longValue());
+ statement.setLatency(((Number)dbMap.get(LATENCY)).longValue());
statement.setDatabaseServiceId(((Number)dbMap.get(DATABASE_SERVICE_ID)).intValue());
return statement;
}
@@ -72,7 +72,7 @@ public class TopNDatabaseStatement extends TopN {
Map<String, Object> map = new HashMap<>();
map.put(STATEMENT, storageData.getStatement());
map.put(TRACE_ID, storageData.getTraceId());
- map.put(DURATION, storageData.getDuration());
+ map.put(LATENCY, storageData.getLatency());
map.put(DATABASE_SERVICE_ID, storageData.getDatabaseServiceId());
return map;
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/TopN.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/TopN.java
index 125ac71..1071203 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/TopN.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/TopN.java
@@ -30,15 +30,15 @@ import org.apache.skywalking.oap.server.core.storage.annotation.Column;
*/
public abstract class TopN extends Record implements ComparableStorageData {
public static final String STATEMENT = "statement";
- public static final String DURATION = "duration";
+ public static final String LATENCY = "latency";
public static final String TRACE_ID = "trace_id";
@Getter @Setter @Column(columnName = STATEMENT) private String statement;
- @Getter @Setter @Column(columnName = DURATION) private long duration;
+ @Getter @Setter @Column(columnName = LATENCY) private long latency;
@Getter @Setter @Column(columnName = TRACE_ID) private String traceId;
@Override public int compareTo(Object o) {
TopN target = (TopN)o;
- return (int)(duration - target.duration);
+ return (int)(latency - target.latency);
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
index 0dfbc0f..6ccccd8 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
@@ -38,6 +38,8 @@ public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<Top
private final IRecordDAO recordDAO;
private final String modelName;
private final DataCarrier<TopN> dataCarrier;
+ private long reportCycle;
+ private volatile long lastReportTimestamp;
public TopNWorker(int workerId, String modelName, ModuleManager moduleManager,
int topNSize,
@@ -48,6 +50,9 @@ public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<Top
this.modelName = modelName;
this.dataCarrier = new DataCarrier<>(1, 10000);
this.dataCarrier.consume(new TopNWorker.TopNConsumer(), 1);
+ this.lastReportTimestamp = System.currentTimeMillis();
+ // Top N persistence runs at most once every 10 minutes.
+ this.reportCycle = 10 * 60 * 1000L;
}
@Override void onWork(TopN data) {
@@ -68,6 +73,12 @@ public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<Top
}
@Override public List<Object> prepareBatch(LimitedSizeDataCache<TopN> cache) {
+ long now = System.currentTimeMillis();
+ if (now - lastReportTimestamp <= reportCycle) {
+ return new ArrayList<>(0);
+ }
+ lastReportTimestamp = now;
+
List<Object> batchCollection = new LinkedList<>();
cache.getLast().collection().forEach(record -> {
try {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DatabaseSlowStatement.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DatabaseSlowStatement.java
index 4d56262..c3a7a68 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DatabaseSlowStatement.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DatabaseSlowStatement.java
@@ -29,7 +29,7 @@ import org.apache.skywalking.oap.server.core.source.annotation.SourceType;
public class DatabaseSlowStatement extends Source {
@Getter @Setter private int databaseServiceId;
@Getter @Setter private String statement;
- @Getter @Setter private long duration;
+ @Getter @Setter private long latency;
@Getter @Setter private String traceId;
@Override public Scope scope() {
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
index baa027f..65c9179 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
@@ -73,14 +73,14 @@ public class TraceModuleProvider extends ModuleProvider {
listenerManager.add(new ServiceMappingSpanListener.Factory());
listenerManager.add(new SegmentSpanListener.Factory(moduleConfig.getSampleRate()));
- segmentProducer = new SegmentParse.Producer(getManager(), listenerManager);
+ segmentProducer = new SegmentParse.Producer(getManager(), listenerManager, moduleConfig);
listenerManager = new SegmentParserListenerManager();
listenerManager.add(new MultiScopesSpanListener.Factory());
listenerManager.add(new ServiceMappingSpanListener.Factory());
listenerManager.add(new SegmentSpanListener.Factory(moduleConfig.getSampleRate()));
- segmentProducerV2 = new SegmentParseV2.Producer(getManager(), listenerManager);
+ segmentProducerV2 = new SegmentParseV2.Producer(getManager(), listenerManager, moduleConfig);
this.registerServiceImplementation(ISegmentParserService.class, new SegmentParserServiceImpl(segmentProducerV2));
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceServiceModuleConfig.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceServiceModuleConfig.java
index 5a36953..fec00e2 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceServiceModuleConfig.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceServiceModuleConfig.java
@@ -34,4 +34,10 @@ public class TraceServiceModuleConfig extends ModuleConfig {
* 10000 means 100% sample in default.
*/
@Setter @Getter private int sampleRate = 10000;
+
+ /**
+ * The latency threshold above which a database access is treated as slow.
+ * Unit: milliseconds.
+ */
+ @Setter @Getter private int slowDBAccessThreshold = 200;
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java
index 0a4bd69..7abe9e5 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java
@@ -25,6 +25,7 @@ import org.apache.skywalking.apm.network.language.agent.*;
import org.apache.skywalking.oap.server.library.buffer.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.TimeBucketUtils;
+import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.*;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization.*;
@@ -43,12 +44,14 @@ public class SegmentParse {
private final List<SpanListener> spanListeners;
private final SegmentParserListenerManager listenerManager;
private final SegmentCoreInfo segmentCoreInfo;
+ private final TraceServiceModuleConfig config;
@Setter private SegmentStandardizationWorker standardizationWorker;
private volatile static CounterMetric TRACE_BUFFER_FILE_RETRY;
private volatile static CounterMetric TRACE_BUFFER_FILE_OUT;
private volatile static CounterMetric TRACE_PARSE_ERROR;
- private SegmentParse(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) {
+ private SegmentParse(ModuleManager moduleManager, SegmentParserListenerManager listenerManager,
+ TraceServiceModuleConfig config) {
this.moduleManager = moduleManager;
this.listenerManager = listenerManager;
this.spanListeners = new LinkedList<>();
@@ -56,6 +59,7 @@ public class SegmentParse {
this.segmentCoreInfo.setStartTime(Long.MAX_VALUE);
this.segmentCoreInfo.setEndTime(Long.MIN_VALUE);
this.segmentCoreInfo.setV2(false);
+ this.config = config;
MetricCreator metricCreator = moduleManager.find(TelemetryModule.NAME).provider().getService(MetricCreator.class);
TRACE_BUFFER_FILE_RETRY = metricCreator.createCounter("v5_trace_buffer_file_retry", "The number of retry trace segment from the buffer file, but haven't registered successfully.",
@@ -239,7 +243,7 @@ public class SegmentParse {
}
private void createSpanListeners() {
- listenerManager.getSpanListenerFactories().forEach(spanListenerFactory -> spanListeners.add(spanListenerFactory.create(moduleManager)));
+ listenerManager.getSpanListenerFactories().forEach(spanListenerFactory -> spanListeners.add(spanListenerFactory.create(moduleManager, config)));
}
public enum Source {
@@ -251,20 +255,23 @@ public class SegmentParse {
@Setter private SegmentStandardizationWorker standardizationWorker;
private final ModuleManager moduleManager;
private final SegmentParserListenerManager listenerManager;
+ private final TraceServiceModuleConfig config;
- public Producer(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) {
+ public Producer(ModuleManager moduleManager, SegmentParserListenerManager listenerManager,
+ TraceServiceModuleConfig config) {
this.moduleManager = moduleManager;
this.listenerManager = listenerManager;
+ this.config = config;
}
public void send(UpstreamSegment segment, Source source) {
- SegmentParse segmentParse = new SegmentParse(moduleManager, listenerManager);
+ SegmentParse segmentParse = new SegmentParse(moduleManager, listenerManager, config);
segmentParse.setStandardizationWorker(standardizationWorker);
segmentParse.parse(new BufferData<>(segment), source);
}
@Override public boolean call(BufferData<UpstreamSegment> bufferData) {
- SegmentParse segmentParse = new SegmentParse(moduleManager, listenerManager);
+ SegmentParse segmentParse = new SegmentParse(moduleManager, listenerManager, config);
segmentParse.setStandardizationWorker(standardizationWorker);
boolean parseResult = segmentParse.parse(bufferData, Source.Buffer);
if (parseResult) {
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParseV2.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParseV2.java
index dd28ba4..543b0eb 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParseV2.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParseV2.java
@@ -26,6 +26,7 @@ import org.apache.skywalking.apm.network.language.agent.v2.SegmentObject;
import org.apache.skywalking.oap.server.library.buffer.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.TimeBucketUtils;
+import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.*;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization.*;
@@ -46,12 +47,13 @@ public class SegmentParseV2 {
private final List<SpanListener> spanListeners;
private final SegmentParserListenerManager listenerManager;
private final SegmentCoreInfo segmentCoreInfo;
+ private final TraceServiceModuleConfig config;
@Setter private SegmentStandardizationWorker standardizationWorker;
private volatile static CounterMetric TRACE_BUFFER_FILE_RETRY;
private volatile static CounterMetric TRACE_BUFFER_FILE_OUT;
private volatile static CounterMetric TRACE_PARSE_ERROR;
- private SegmentParseV2(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) {
+ private SegmentParseV2(ModuleManager moduleManager, SegmentParserListenerManager listenerManager, TraceServiceModuleConfig config) {
this.moduleManager = moduleManager;
this.listenerManager = listenerManager;
this.spanListeners = new LinkedList<>();
@@ -59,6 +61,7 @@ public class SegmentParseV2 {
this.segmentCoreInfo.setStartTime(Long.MAX_VALUE);
this.segmentCoreInfo.setEndTime(Long.MIN_VALUE);
this.segmentCoreInfo.setV2(true);
+ this.config = config;
if (TRACE_BUFFER_FILE_RETRY == null) {
MetricCreator metricCreator = moduleManager.find(TelemetryModule.NAME).provider().getService(MetricCreator.class);
@@ -245,7 +248,7 @@ public class SegmentParseV2 {
}
private void createSpanListeners() {
- listenerManager.getSpanListenerFactories().forEach(spanListenerFactory -> spanListeners.add(spanListenerFactory.create(moduleManager)));
+ listenerManager.getSpanListenerFactories().forEach(spanListenerFactory -> spanListeners.add(spanListenerFactory.create(moduleManager, config)));
}
public static class Producer implements DataStreamReader.CallBack<UpstreamSegment> {
@@ -253,20 +256,22 @@ public class SegmentParseV2 {
@Setter private SegmentStandardizationWorker standardizationWorker;
private final ModuleManager moduleManager;
private final SegmentParserListenerManager listenerManager;
+ private final TraceServiceModuleConfig config;
- public Producer(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) {
+ public Producer(ModuleManager moduleManager, SegmentParserListenerManager listenerManager, TraceServiceModuleConfig config) {
this.moduleManager = moduleManager;
this.listenerManager = listenerManager;
+ this.config = config;
}
public void send(UpstreamSegment segment, SegmentSource source) {
- SegmentParseV2 segmentParse = new SegmentParseV2(moduleManager, listenerManager);
+ SegmentParseV2 segmentParse = new SegmentParseV2(moduleManager, listenerManager, config);
segmentParse.setStandardizationWorker(standardizationWorker);
segmentParse.parse(new BufferData<>(segment), source);
}
@Override public boolean call(BufferData<UpstreamSegment> bufferData) {
- SegmentParseV2 segmentParse = new SegmentParseV2(moduleManager, listenerManager);
+ SegmentParseV2 segmentParse = new SegmentParseV2(moduleManager, listenerManager, config);
segmentParse.setStandardizationWorker(standardizationWorker);
boolean parseResult = segmentParse.parse(bufferData, SegmentSource.Buffer);
if (parseResult) {
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SpanTags.java
similarity index 80%
copy from oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java
copy to oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SpanTags.java
index fd8d09f..e9f0c68 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SpanTags.java
@@ -16,13 +16,8 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener;
+package org.apache.skywalking.oap.server.receiver.trace.provider.parser;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
-
-/**
- * @author peng-yongsheng
- */
-public interface SpanListenerFactory {
- SpanListener create(ModuleManager moduleManager);
+public class SpanTags {
+ public static final String DB_STATEMENT = "db.statement";
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java
index fd8d09f..9b4afd6 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java
@@ -19,10 +19,11 @@
package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig;
/**
* @author peng-yongsheng
*/
public interface SpanListenerFactory {
- SpanListener create(ModuleManager moduleManager);
+ SpanListener create(ModuleManager moduleManager, TraceServiceModuleConfig config);
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java
index 590d71e..ebb3468 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java
@@ -18,28 +18,18 @@
package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.endpoint;
-import java.util.LinkedList;
-import java.util.List;
+import java.util.*;
+import org.apache.skywalking.apm.network.common.KeyStringValuePair;
import org.apache.skywalking.apm.network.language.agent.SpanLayer;
-import org.apache.skywalking.oap.server.core.Const;
-import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache;
-import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
-import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
-import org.apache.skywalking.oap.server.core.source.DetectPoint;
-import org.apache.skywalking.oap.server.core.source.EndpointRelation;
-import org.apache.skywalking.oap.server.core.source.RequestType;
-import org.apache.skywalking.oap.server.core.source.SourceReceiver;
+import org.apache.skywalking.oap.server.core.*;
+import org.apache.skywalking.oap.server.core.cache.*;
+import org.apache.skywalking.oap.server.core.source.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.ReferenceDecorator;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SegmentCoreInfo;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SpanDecorator;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.EntrySpanListener;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.ExitSpanListener;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SpanListener;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SpanListenerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SpanTags;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.*;
+import org.slf4j.*;
import static java.util.Objects.nonNull;
@@ -63,16 +53,20 @@ public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListe
private final List<SourceBuilder> entrySourceBuilders;
private final List<SourceBuilder> exitSourceBuilders;
+ private final List<DatabaseSlowStatement> slowDatabaseAccesses;
+ private final TraceServiceModuleConfig config;
private SpanDecorator entrySpanDecorator;
private long minuteTimeBucket;
- private MultiScopesSpanListener(ModuleManager moduleManager) {
+ private MultiScopesSpanListener(ModuleManager moduleManager, TraceServiceModuleConfig config) {
this.sourceReceiver = moduleManager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
this.entrySourceBuilders = new LinkedList<>();
this.exitSourceBuilders = new LinkedList<>();
+ this.slowDatabaseAccesses = new ArrayList<>(10);
this.instanceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInstanceInventoryCache.class);
this.serviceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class);
this.endpointInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(EndpointInventoryCache.class);
+ this.config = config;
}
@Override public boolean containsPoint(Point point) {
@@ -152,6 +146,22 @@ public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListe
sourceBuilder.setComponentId(spanDecorator.getComponentId());
setPublicAttrs(sourceBuilder, spanDecorator);
exitSourceBuilders.add(sourceBuilder);
+
+ if (spanDecorator.getSpanLayer().equals(RequestType.DATABASE)
+ && sourceBuilder.getLatency() > config.getSlowDBAccessThreshold()) {
+ DatabaseSlowStatement statement = new DatabaseSlowStatement();
+ statement.setDatabaseServiceId(sourceBuilder.getDestServiceId());
+ statement.setLatency(sourceBuilder.getLatency());
+ statement.setTraceId(segmentCoreInfo.getSegmentId());
+ for (KeyStringValuePair tag : spanDecorator.getAllTags()) {
+ if (SpanTags.DB_STATEMENT.equals(tag.getKey())) {
+ statement.setStatement(tag.getValue());
+ slowDatabaseAccesses.add(statement);
+ break;
+ }
+ }
+
+ }
}
private void setPublicAttrs(SourceBuilder sourceBuilder, SpanDecorator spanDecorator) {
@@ -215,13 +225,15 @@ public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListe
sourceReceiver.receive(exitSourceBuilder.toDatabaseAccess());
}
});
+
+ slowDatabaseAccesses.forEach(sourceReceiver::receive);
}
public static class Factory implements SpanListenerFactory {
@Override
- public SpanListener create(ModuleManager moduleManager) {
- return new MultiScopesSpanListener(moduleManager);
+ public SpanListener create(ModuleManager moduleManager, TraceServiceModuleConfig config) {
+ return new MultiScopesSpanListener(moduleManager, config);
}
}
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java
index 9add272..644d2d2 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java
@@ -26,6 +26,7 @@ import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.apache.skywalking.oap.server.library.util.TimeBucketUtils;
+import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SegmentCoreInfo;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SpanDecorator;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.EntrySpanListener;
@@ -144,7 +145,7 @@ public class SegmentSpanListener implements FirstSpanListener, EntrySpanListener
this.sampler = new TraceSegmentSampler(segmentSamplingRate);
}
- @Override public SpanListener create(ModuleManager moduleManager) {
+ @Override public SpanListener create(ModuleManager moduleManager, TraceServiceModuleConfig config) {
return new SegmentSpanListener(moduleManager, sampler);
}
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/service/ServiceMappingSpanListener.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/service/ServiceMappingSpanListener.java
index 9e14954..338a1cd 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/service/ServiceMappingSpanListener.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/service/ServiceMappingSpanListener.java
@@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.*;
import org.slf4j.*;
@@ -81,7 +82,7 @@ public class ServiceMappingSpanListener implements EntrySpanListener {
public static class Factory implements SpanListenerFactory {
- @Override public SpanListener create(ModuleManager moduleManager) {
+ @Override public SpanListener create(ModuleManager moduleManager, TraceServiceModuleConfig config) {
return new ServiceMappingSpanListener(moduleManager);
}
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ServiceBMock.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ServiceBMock.java
index 344e049..f9bcfbe 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ServiceBMock.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ServiceBMock.java
@@ -113,9 +113,10 @@ class ServiceBMock {
span.setSpanLayer(SpanLayer.Database);
span.setParentSpanId(0);
span.setStartTime(startTimestamp + 550);
- span.setEndTime(startTimestamp + 1000);
+ span.setEndTime(startTimestamp + 1500);
span.setComponentId(ComponentsDefine.MONGO_DRIVER.getId());
span.setIsError(true);
+ span.addTags(KeyWithStringValue.newBuilder().setKey("db.statement").setValue("select * from database where complex = 1;").build());
if (isPrepare) {
span.setOperationName("mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]");