You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by pe...@apache.org on 2019/02/12 15:13:11 UTC
[incubator-skywalking] branch master updated: Support Top sql
(#2239)
This is an automated email from the ASF dual-hosted git repository.
pengys pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 4babd6f Support Top sql (#2239)
4babd6f is described below
commit 4babd6ff73ee32535f09eb31ecbda736e3052a32
Author: 吴晟 Wu Sheng <wu...@foxmail.com>
AuthorDate: Tue Feb 12 23:13:02 2019 +0800
Support Top sql (#2239)
* The prototype of topN sql worker.
* Add scope and new manual dispatcher.
* no message
* Finish topN persistence codes. Not test yet. And query have not added.
* Finish the top n database statement persistent.
* Support different slow thresholds for different db types.
* Fix wrong db default threshold
* Finish new query protocol binding.
* Provide query empty implementation and sync ui.
* Finish all codes, hope it works :P
I will run the tests tonight.
* Fix
* Fix
* Fix a startup issue.
* Add time bucket to persistent.
* Fix wrong query result column name.
* Fix Database query.
* Fix checkstyle.
* Fix wrong order logic, and add a test case to verify, to fix https://github.com/apache/incubator-skywalking/pull/2239#discussion_r255948249
---
docker/config/application.yml | 1 +
.../skywalking/oap/server/core/CoreModule.java | 1 +
.../oap/server/core/CoreModuleProvider.java | 3 +
.../server/core/analysis/DispatcherManager.java | 4 +
.../core/analysis/data/LimitedSizeDataCache.java | 51 ++++++++
.../analysis/data/LimitedSizeDataCollection.java | 115 ++++++++++++++++++
.../core/analysis/data/NonMergeDataCollection.java | 4 +-
.../oap/server/core/analysis/data/Window.java | 10 ++
.../database/DatabaseStatementDispatcher.java | 40 ++++++
.../manual/database/TopNDatabaseStatement.java | 82 +++++++++++++
.../oap/server/core/analysis/topn/TopN.java | 46 +++++++
.../core/analysis/topn/annotation/TopNType.java} | 12 +-
.../topn/annotation/TopNTypeListener.java} | 24 +++-
.../server/core/analysis/worker/TopNProcess.java | 66 ++++++++++
.../server/core/analysis/worker/TopNWorker.java | 134 +++++++++++++++++++++
.../server/core/query/TopNRecordsQueryService.java | 51 ++++++++
.../oap/server/core/query/entity/TopNRecord.java} | 14 ++-
.../{Scope.java => DatabaseSlowStatement.java} | 29 +++--
.../skywalking/oap/server/core/source/Scope.java | 2 +-
.../core/storage/ComparableStorageData.java} | 11 +-
.../oap/server/core/storage/PersistenceTimer.java | 1 +
.../oap/server/core/storage/StorageModule.java | 3 +-
.../core/storage/query/ITopNRecordsQueryDAO.java} | 15 +--
.../data/LimitedSizeDataCollectionTest.java | 72 +++++++++++
.../oap/query/graphql/GraphQLQueryProvider.java | 2 +
.../query/graphql/resolver/TopNRecordsQuery.java | 59 +++++++++
.../query/graphql/type/TopNRecordsCondition.java} | 18 +--
.../src/main/resources/query-protocol | 2 +-
...eModuleConfig.java => DBLatencyThresholds.java} | 40 ++++--
.../trace/provider/TraceModuleProvider.java | 21 ++--
.../trace/provider/TraceServiceModuleConfig.java | 9 +-
.../trace/provider/parser/SegmentParse.java | 17 ++-
.../trace/provider/parser/SegmentParseV2.java | 15 ++-
.../SpanListenerFactory.java => SpanTags.java} | 11 +-
.../parser/listener/SpanListenerFactory.java | 3 +-
.../listener/endpoint/MultiScopesSpanListener.java | 71 +++++++----
.../listener/segment/SegmentSpanListener.java | 3 +-
.../service/ServiceMappingSpanListener.java | 3 +-
.../server/receiver/trace/mock/ServiceBMock.java | 4 +-
.../src/main/assembly/application.yml | 1 +
.../src/main/resources/application.yml | 1 +
.../StorageModuleElasticsearchProvider.java | 1 +
.../elasticsearch/query/TopNRecordsQueryEsDAO.java | 66 ++++++++++
.../storage/plugin/jdbc/h2/H2StorageProvider.java | 1 +
.../plugin/jdbc/h2/dao/H2TopNRecordsQueryDAO.java | 78 ++++++++++++
.../plugin/jdbc/mysql/MySQLStorageProvider.java | 1 +
skywalking-ui | 2 +-
47 files changed, 1095 insertions(+), 125 deletions(-)
diff --git a/docker/config/application.yml b/docker/config/application.yml
index 1a13b4c..f9b645f 100644
--- a/docker/config/application.yml
+++ b/docker/config/application.yml
@@ -75,6 +75,7 @@ receiver-trace:
bufferDataMaxFileSize: ${SW_RECEIVER_BUFFER_DATA_MAX_FILE_SIZE:500} # Unit is MB
bufferFileCleanWhenRestart: ${SW_RECEIVER_BUFFER_FILE_CLEAN_WHEN_RESTART:false}
sampleRate: ${SW_TRACE_SAMPLE_RATE:10000} # The sample rate precision is 1/10000. 10000 means 100% sample in default.
+ slowDBAccessThreshold: ${SW_SLOW_DB_THRESHOLD:default:200,redis:50} # The slow database access thresholds. Unit ms.
receiver-jvm:
default:
service-mesh:
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
index 1baa250..8eb7302 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
@@ -65,6 +65,7 @@ public class CoreModule extends ModuleDefine {
classes.add(MetadataQueryService.class);
classes.add(AggregationQueryService.class);
classes.add(AlarmQueryService.class);
+ classes.add(TopNRecordsQueryService.class);
}
private void addServerInterface(List<Class> classes) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index 3eb387e..0106fe7 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.core;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorTypeListener;
import org.apache.skywalking.oap.server.core.analysis.record.annotation.RecordTypeListener;
+import org.apache.skywalking.oap.server.core.analysis.topn.annotation.TopNTypeListener;
import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.cluster.*;
@@ -130,12 +131,14 @@ public class CoreModuleProvider extends ModuleProvider {
this.registerServiceImplementation(MetadataQueryService.class, new MetadataQueryService(getManager()));
this.registerServiceImplementation(AggregationQueryService.class, new AggregationQueryService(getManager()));
this.registerServiceImplementation(AlarmQueryService.class, new AlarmQueryService(getManager()));
+ this.registerServiceImplementation(TopNRecordsQueryService.class, new TopNRecordsQueryService(getManager()));
annotationScan.registerListener(storageAnnotationListener);
annotationScan.registerListener(streamAnnotationListener);
annotationScan.registerListener(new IndicatorTypeListener(getManager()));
annotationScan.registerListener(new InventoryTypeListener(getManager()));
annotationScan.registerListener(new RecordTypeListener(getManager()));
+ annotationScan.registerListener(new TopNTypeListener(getManager()));
this.remoteClientManager = new RemoteClientManager(getManager());
this.registerServiceImplementation(RemoteClientManager.class, remoteClientManager);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
index 4697792..61602a6 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
@@ -47,6 +47,10 @@ public class DispatcherManager {
}
public void forward(Source source) {
+ if (source == null) {
+ return;
+ }
+
for (SourceDispatcher dispatcher : dispatcherMap.get(source.scope())) {
dispatcher.dispatch(source);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCache.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCache.java
new file mode 100644
index 0000000..33cfe1d
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCache.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.data;
+
+import org.apache.skywalking.oap.server.core.storage.*;
+
+/**
+ * A {@link Window} whose collections are {@link LimitedSizeDataCollection} instances created with a
+ * fixed size limit.
+ *
+ * @param <STORAGE_DATA> the comparable storage data type held by this cache
+ */
+public class LimitedSizeDataCache<STORAGE_DATA extends ComparableStorageData> extends Window<STORAGE_DATA> implements DataCache {
+
+    private final int limitSize;
+    /** The collection currently being written to; {@code null} outside of writing()/finishWriting(). */
+    private SWCollection<STORAGE_DATA> writingCollection;
+
+    /**
+     * @param limitSize the size limit handed to every collection created by {@link #collectionInstance()}
+     */
+    public LimitedSizeDataCache(int limitSize) {
+        // Disable the automatic init of the parent: collectionInstance() reads limitSize,
+        // so the field has to be assigned before init() creates the window collections.
+        super(false);
+        this.limitSize = limitSize;
+        init();
+    }
+
+    /**
+     * @return a new, empty collection using this cache's size limit
+     */
+    @Override public SWCollection<STORAGE_DATA> collectionInstance() {
+        return new LimitedSizeDataCollection<>(this.limitSize);
+    }
+
+    /**
+     * Puts the data into the collection selected by the latest {@link #writing()} call.
+     *
+     * @param data the value to add
+     */
+    public void add(STORAGE_DATA data) {
+        this.writingCollection.put(data);
+    }
+
+    /**
+     * Selects the collection returned by {@code getCurrentAndWriting()} as the target of {@link #add}.
+     */
+    @Override public void writing() {
+        this.writingCollection = getCurrentAndWriting();
+    }
+
+    /**
+     * Marks the selected collection as no longer being written and releases the reference to it.
+     */
+    @Override public void finishWriting() {
+        this.writingCollection.finishWriting();
+        this.writingCollection = null;
+    }
+}
+
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollection.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollection.java
new file mode 100644
index 0000000..12cd96c
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollection.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.data;
+
+import java.util.*;
+import org.apache.skywalking.oap.server.core.storage.ComparableStorageData;
+
+public class LimitedSizeDataCollection<STORAGE_DATA extends ComparableStorageData> implements SWCollection<STORAGE_DATA> {
+
+ private final HashMap<STORAGE_DATA, LinkedList<STORAGE_DATA>> data;
+ private final int limitedSize;
+ private volatile boolean writing;
+ private volatile boolean reading;
+
+ LimitedSizeDataCollection(int limitedSize) {
+ this.data = new HashMap<>();
+ this.writing = false;
+ this.reading = false;
+ this.limitedSize = limitedSize;
+ }
+
+ public void finishWriting() {
+ writing = false;
+ }
+
+ @Override public void writing() {
+ writing = true;
+ }
+
+ @Override public boolean isWriting() {
+ return writing;
+ }
+
+ @Override public void finishReading() {
+ reading = false;
+ }
+
+ @Override public void reading() {
+ reading = true;
+ }
+
+ @Override public boolean isReading() {
+ return reading;
+ }
+
+ @Override public int size() {
+ return data.size();
+ }
+
+ @Override public void clear() {
+ data.clear();
+ }
+
+ @Override public boolean containsKey(STORAGE_DATA key) {
+ throw new UnsupportedOperationException("Limited size data collection doesn't support containsKey operation.");
+ }
+
+ @Override public STORAGE_DATA get(STORAGE_DATA key) {
+ throw new UnsupportedOperationException("Limited size data collection doesn't support get operation.");
+ }
+
+ @Override public void put(STORAGE_DATA value) {
+ LinkedList<STORAGE_DATA> storageDataList = this.data.get(value);
+ if (storageDataList == null) {
+ storageDataList = new LinkedList<>();
+ data.put(value, storageDataList);
+ }
+
+ if (storageDataList.size() < limitedSize) {
+ storageDataList.add(value);
+ return;
+ }
+
+ for (int i = 0; i < storageDataList.size(); i++) {
+ STORAGE_DATA storageData = storageDataList.get(i);
+ if (value.compareTo(storageData) <= 0) {
+ if (i == 0) {
+ // input value is less than the smallest in top N list, ignore
+ } else {
+ // Remove the smallest in top N list
+ // add the current value into the right position
+ storageDataList.add(i, value);
+ storageDataList.removeFirst();
+ }
+ return;
+ }
+ }
+
+ // Add the value as biggest in top N list
+ storageDataList.addLast(value);
+ storageDataList.removeFirst();
+ }
+
+ @Override public Collection<STORAGE_DATA> collection() {
+ List<STORAGE_DATA> collection = new ArrayList<>();
+ data.values().forEach(e -> e.forEach(collection::add));
+ return collection;
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCollection.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCollection.java
index 1d7a53c..bba2fc0 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCollection.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCollection.java
@@ -69,11 +69,11 @@ public class NonMergeDataCollection<STORAGE_DATA extends StorageData> implements
}
@Override public boolean containsKey(STORAGE_DATA key) {
- throw new UnsupportedOperationException("Close merge data collection not support containsKey operation.");
+ throw new UnsupportedOperationException("Non merge data collection doesn't support containsKey operation.");
}
@Override public STORAGE_DATA get(STORAGE_DATA key) {
- throw new UnsupportedOperationException("Close merge data collection not support get operation.");
+ throw new UnsupportedOperationException("Non merge data collection doesn't support get operation.");
}
@Override public void put(STORAGE_DATA value) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Window.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Window.java
index 6f9486d..ff2ca6d 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Window.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Window.java
@@ -33,6 +33,16 @@ public abstract class Window<DATA> {
private SWCollection<DATA> windowDataB;
Window() {
+ this(true);
+ }
+
+ Window(boolean autoInit) {
+ if (autoInit) {
+ init();
+ }
+ }
+
+ protected void init() {
this.windowDataA = collectionInstance();
this.windowDataB = collectionInstance();
this.pointer = windowDataA;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/DatabaseStatementDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/DatabaseStatementDispatcher.java
new file mode 100644
index 0000000..f36cca5
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/DatabaseStatementDispatcher.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.manual.database;
+
+import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
+import org.apache.skywalking.oap.server.core.analysis.worker.TopNProcess;
+import org.apache.skywalking.oap.server.core.source.DatabaseSlowStatement;
+
+/**
+ * Dispatcher of {@link DatabaseSlowStatement} sources: copies each source into a new
+ * {@link TopNDatabaseStatement} and hands it over to {@link TopNProcess}.
+ *
+ * @author wusheng
+ */
+public class DatabaseStatementDispatcher implements SourceDispatcher<DatabaseSlowStatement> {
+    /**
+     * @param source the slow statement source to convert and forward
+     */
+    @Override public void dispatch(DatabaseSlowStatement source) {
+        final TopNDatabaseStatement topNStatement = new TopNDatabaseStatement();
+
+        // Identity of the record.
+        topNStatement.setId(source.getId());
+        topNStatement.setServiceId(source.getDatabaseServiceId());
+        topNStatement.setTraceId(source.getTraceId());
+
+        // Payload of the record.
+        topNStatement.setStatement(source.getStatement());
+        topNStatement.setLatency(source.getLatency());
+        topNStatement.setTimeBucket(source.getTimeBucket());
+
+        TopNProcess.INSTANCE.in(topNStatement);
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java
new file mode 100644
index 0000000..2456486
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.manual.database;
+
+import java.util.*;
+import lombok.*;
+import org.apache.skywalking.oap.server.core.*;
+import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
+import org.apache.skywalking.oap.server.core.analysis.topn.annotation.TopNType;
+import org.apache.skywalking.oap.server.core.source.Scope;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
+import org.apache.skywalking.oap.server.core.storage.annotation.*;
+
+/**
+ * Database TopN statement, including Database SQL statement, mongoDB and Redis commands.
+ * <p>
+ * Two statements are equal when they carry the same service id; every other field is ignored by
+ * {@link #equals(Object)} and {@link #hashCode()}.
+ *
+ * @author wusheng
+ */
+@TopNType
+@StorageEntity(name = TopNDatabaseStatement.INDEX_NAME, builder = TopNDatabaseStatement.Builder.class, source = Scope.DatabaseSlowStatement)
+public class TopNDatabaseStatement extends TopN {
+    public static final String INDEX_NAME = "top_n_database_statement";
+
+    @Setter private String id;
+
+    @Override public String id() {
+        return id;
+    }
+
+    @Override public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o == null || o.getClass() != getClass()) {
+            return false;
+        }
+        TopNDatabaseStatement other = (TopNDatabaseStatement)o;
+        return other.getServiceId() == getServiceId();
+    }
+
+    @Override public int hashCode() {
+        return Objects.hash(getServiceId());
+    }
+
+    /**
+     * Converts between {@link TopNDatabaseStatement} and its column map. The {@code id} field is not
+     * part of the map in either direction.
+     */
+    public static class Builder implements StorageBuilder<TopNDatabaseStatement> {
+
+        /**
+         * @param dbMap column name to value; numeric columns are read through {@link Number}
+         * @return a statement populated from the map
+         */
+        @Override public TopNDatabaseStatement map2Data(Map<String, Object> dbMap) {
+            final Number latency = (Number)dbMap.get(LATENCY);
+            final Number serviceId = (Number)dbMap.get(SERVICE_ID);
+            final Number timeBucket = (Number)dbMap.get(TIME_BUCKET);
+
+            TopNDatabaseStatement result = new TopNDatabaseStatement();
+            result.setStatement((String)dbMap.get(STATEMENT));
+            result.setTraceId((String)dbMap.get(TRACE_ID));
+            result.setLatency(latency.longValue());
+            result.setServiceId(serviceId.intValue());
+            result.setTimeBucket(timeBucket.longValue());
+            return result;
+        }
+
+        /**
+         * @param storageData the statement to convert
+         * @return a new map from column name to the statement's value for that column
+         */
+        @Override public Map<String, Object> data2Map(TopNDatabaseStatement storageData) {
+            Map<String, Object> columns = new HashMap<>();
+            columns.put(STATEMENT, storageData.getStatement());
+            columns.put(TRACE_ID, storageData.getTraceId());
+            columns.put(LATENCY, storageData.getLatency());
+            columns.put(SERVICE_ID, storageData.getServiceId());
+            columns.put(TIME_BUCKET, storageData.getTimeBucket());
+            return columns;
+        }
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/TopN.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/TopN.java
new file mode 100644
index 0000000..fae8298
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/TopN.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.topn;
+
+import lombok.*;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
+import org.apache.skywalking.oap.server.core.storage.ComparableStorageData;
+import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+
+/**
+ * TopN data: a record carrying a statement, its latency, the trace id and the service id,
+ * ordered by latency.
+ *
+ * @author wusheng
+ */
+public abstract class TopN extends Record implements ComparableStorageData {
+    public static final String STATEMENT = "statement";
+    public static final String LATENCY = "latency";
+    public static final String TRACE_ID = "trace_id";
+    public static final String SERVICE_ID = "service_id";
+
+    @Getter @Setter @Column(columnName = STATEMENT) private String statement;
+    @Getter @Setter @Column(columnName = LATENCY) private long latency;
+    @Getter @Setter @Column(columnName = TRACE_ID) private String traceId;
+    @Getter @Setter @Column(columnName = SERVICE_ID) private int serviceId;
+
+    /**
+     * Orders TopN records by latency, ascending.
+     *
+     * @param o the record to compare with; must be a {@link TopN}
+     * @return a negative, zero or positive value when this latency is less than, equal to or greater
+     * than the latency of the given record
+     * @throws ClassCastException if the argument is not a {@link TopN}
+     */
+    @Override public int compareTo(Object o) {
+        TopN target = (TopN)o;
+        // Do not narrow the long difference to int: the cast may truncate it to zero or flip its sign.
+        return Long.compare(latency, target.latency);
+    }
+}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/annotation/TopNType.java
similarity index 74%
copy from oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/annotation/TopNType.java
index fd8d09f..82731b6 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/annotation/TopNType.java
@@ -16,13 +16,11 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener;
+package org.apache.skywalking.oap.server.core.analysis.topn.annotation;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import java.lang.annotation.*;
-/**
- * @author peng-yongsheng
- */
-public interface SpanListenerFactory {
- SpanListener create(ModuleManager moduleManager);
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface TopNType {
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/annotation/TopNTypeListener.java
similarity index 55%
copy from oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/annotation/TopNTypeListener.java
index fd8d09f..d668cdd 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/annotation/TopNTypeListener.java
@@ -16,13 +16,29 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener;
+package org.apache.skywalking.oap.server.core.analysis.topn.annotation;
+import java.lang.annotation.Annotation;
+import org.apache.skywalking.oap.server.core.analysis.worker.TopNProcess;
+import org.apache.skywalking.oap.server.core.annotation.AnnotationListener;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
/**
- * @author peng-yongsheng
+ * @author wusheng
*/
-public interface SpanListenerFactory {
- SpanListener create(ModuleManager moduleManager);
+public class TopNTypeListener implements AnnotationListener {
+
+ private final ModuleManager moduleManager;
+
+ public TopNTypeListener(ModuleManager moduleManager) {
+ this.moduleManager = moduleManager;
+ }
+
+ @Override public Class<? extends Annotation> annotation() {
+ return TopNType.class;
+ }
+
+ @Override public void notify(Class aClass) {
+ TopNProcess.INSTANCE.create(moduleManager, aClass);
+ }
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNProcess.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNProcess.java
new file mode 100644
index 0000000..059c46e
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNProcess.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.worker;
+
+import java.util.*;
+import lombok.Getter;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.analysis.manual.database.TopNDatabaseStatement;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
+import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
+import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntityAnnotationUtils;
+import org.apache.skywalking.oap.server.core.worker.*;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+
+/**
+ * TopN is a special process, which holds windows of a certain size,
+ * caches all top N records, and saves them to the persistence at a low frequency.
+ *
+ * @author wusheng
+ */
+public enum TopNProcess {
+    INSTANCE;
+
+    /** The {@code topNSize} given to every {@link TopNWorker} created by this process. */
+    private static final int TOP_N_SIZE = 50;
+
+    @Getter private List<TopNWorker> persistentWorkers = new ArrayList<>();
+    private Map<Class<? extends Record>, TopNWorker> workers = new HashMap<>();
+
+    /**
+     * Creates the {@link TopNWorker} of the given TopN class, registers it in {@link WorkerInstances}
+     * and remembers it for {@link #in(TopNDatabaseStatement)}.
+     *
+     * @param moduleManager used to look up the {@link StorageDAO} service and passed to the worker
+     * @param topNClass the TopN class whose model name and storage builder are read from its annotation
+     * @throws UnexpectedException if the storage builder of the class can't be instantiated
+     */
+    public void create(ModuleManager moduleManager, Class<? extends TopN> topNClass) {
+        String modelName = StorageEntityAnnotationUtils.getModelName(topNClass);
+        Class<? extends StorageBuilder> builderClass = StorageEntityAnnotationUtils.getBuilder(topNClass);
+
+        StorageDAO storageDAO = moduleManager.find(StorageModule.NAME).provider().getService(StorageDAO.class);
+        IRecordDAO recordDAO;
+        try {
+            recordDAO = storageDAO.newRecordDao(builderClass.newInstance());
+        } catch (InstantiationException | IllegalAccessException e) {
+            // Only the message constructor of UnexpectedException is visible here,
+            // so the root failure is carried inside the message instead of being dropped.
+            throw new UnexpectedException("Can't create the storage builder " + builderClass.getName()
+                + " of " + modelName + ", caused by " + e);
+        }
+
+        TopNWorker persistentWorker = new TopNWorker(WorkerIdGenerator.INSTANCES.generate(), modelName, moduleManager,
+            TOP_N_SIZE, recordDAO);
+        WorkerInstances.INSTANCES.put(persistentWorker.getWorkerId(), persistentWorker);
+        persistentWorkers.add(persistentWorker);
+        workers.put(topNClass, persistentWorker);
+    }
+
+    /**
+     * Forwards the statement to the worker created for its class.
+     *
+     * @param statement the statement to process
+     * @throws UnexpectedException if no worker has been created for the class of the statement
+     */
+    public void in(TopNDatabaseStatement statement) {
+        TopNWorker worker = workers.get(statement.getClass());
+        if (worker == null) {
+            throw new UnexpectedException("No TopN worker has been created for " + statement.getClass().getName());
+        }
+        worker.in(statement);
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
new file mode 100644
index 0000000..5d6304e
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.worker;
+
+import java.util.*;
+import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
+import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
+import org.apache.skywalking.oap.server.core.analysis.data.LimitedSizeDataCache;
+import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
+import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.slf4j.*;
+
+/**
+ * Top N worker is a persistence worker, but it is not following the batch size trigger mode. Incoming data
+ * is added into a {@link LimitedSizeDataCache}, and the persistent attempt only happens once per report cycle.
+ *
+ * @author wusheng
+ */
+public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<TopN>> {
+    private static final Logger logger = LoggerFactory.getLogger(TopNWorker.class);
+    private final LimitedSizeDataCache<TopN> limitedSizeDataCache;
+    private final IRecordDAO recordDAO;
+    private final String modelName;
+    private final DataCarrier<TopN> dataCarrier;
+    private long reportCycle;
+    private volatile long lastReportTimestamp;
+
+    public TopNWorker(int workerId, String modelName, ModuleManager moduleManager,
+        int topNSize,
+        IRecordDAO recordDAO) {
+        super(moduleManager, workerId, -1);
+        this.modelName = modelName;
+        this.recordDAO = recordDAO;
+        this.limitedSizeDataCache = new LimitedSizeDataCache<>(topNSize);
+        this.dataCarrier = new DataCarrier<>(1, 10000);
+        this.dataCarrier.consume(new TopNConsumer(), 1);
+        this.lastReportTimestamp = System.currentTimeMillis();
+        // Top N persistent only works per 10 minutes.
+        this.reportCycle = 10 * 60 * 1000L;
+    }
+
+    /**
+     * Add the data into the cache, between the `writing` and `finishWriting` marks of the cache.
+     */
+    @Override void onWork(TopN data) {
+        limitedSizeDataCache.writing();
+        try {
+            limitedSizeDataCache.add(data);
+        } finally {
+            limitedSizeDataCache.finishWriting();
+        }
+    }
+
+    /**
+     * TopN is not following the batch size trigger mode. The memory cost of this worker is always limited.
+     *
+     * The `onWork` method has been overridden, so this method would never be executed. No need to implement it.
+     */
+    @Override public void cacheData(TopN data) {
+    }
+
+    @Override public LimitedSizeDataCache<TopN> getCache() {
+        return limitedSizeDataCache;
+    }
+
+    /**
+     * The top N worker persists much less frequently than the others, so `flushAndSwitch` is overridden to extend
+     * the execute time window.
+     *
+     * Switch and persistent attempt happen based on reportCycle.
+     *
+     * @return false if the report cycle has not elapsed yet, otherwise the result of the parent implementation.
+     */
+    @Override public boolean flushAndSwitch() {
+        final long currentTime = System.currentTimeMillis();
+        final boolean cycleElapsed = currentTime - lastReportTimestamp > reportCycle;
+        if (cycleElapsed) {
+            lastReportTimestamp = currentTime;
+            return super.flushAndSwitch();
+        }
+        return false;
+    }
+
+    /**
+     * Prepare one batch insert for each record in the last collection of the cache. A record which fails in
+     * preparing is logged and skipped.
+     */
+    @Override public List<Object> prepareBatch(LimitedSizeDataCache<TopN> cache) {
+        final List<Object> inserts = new LinkedList<>();
+        cache.getLast().collection().forEach(topN -> {
+            try {
+                inserts.add(recordDAO.prepareBatchInsert(modelName, topN));
+            } catch (Throwable t) {
+                logger.error(t.getMessage(), t);
+            }
+        });
+        return inserts;
+    }
+
+    @Override public void in(TopN n) {
+        dataCarrier.produce(n);
+    }
+
+    private class TopNConsumer implements IConsumer<TopN> {
+        @Override public void init() {
+        }
+
+        /**
+         * TopN is not following the batch size trigger mode, every row goes to `onWork` directly.
+         * The memory size is always limited.
+         */
+        @Override public void consume(List<TopN> data) {
+            for (TopN row : data) {
+                onWork(row);
+            }
+        }
+
+        @Override public void onError(List<TopN> data, Throwable t) {
+            logger.error(t.getMessage(), t);
+        }
+
+        @Override public void onExit() {
+        }
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopNRecordsQueryService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopNRecordsQueryService.java
new file mode 100644
index 0000000..3d07aa5
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopNRecordsQueryService.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.query;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.skywalking.oap.server.core.query.entity.*;
+import org.apache.skywalking.oap.server.core.storage.StorageModule;
+import org.apache.skywalking.oap.server.core.storage.query.*;
+import org.apache.skywalking.oap.server.library.module.*;
+import org.apache.skywalking.oap.server.library.module.Service;
+
+/**
+ * Query service of the top N records. It delegates the query to the {@link ITopNRecordsQueryDAO} service,
+ * which is found through the storage module at the first usage.
+ *
+ * @author wusheng
+ */
+public class TopNRecordsQueryService implements Service {
+    private final ModuleManager moduleManager;
+    private ITopNRecordsQueryDAO topNRecordsQueryDAO;
+
+    public TopNRecordsQueryService(ModuleManager manager) {
+        this.moduleManager = manager;
+    }
+
+    /**
+     * @return the query DAO, looked up from the storage module only when it has not been found before.
+     */
+    private ITopNRecordsQueryDAO getTopNRecordsQueryDAO() {
+        if (topNRecordsQueryDAO != null) {
+            return topNRecordsQueryDAO;
+        }
+        topNRecordsQueryDAO = moduleManager.find(StorageModule.NAME).provider().getService(ITopNRecordsQueryDAO.class);
+        return topNRecordsQueryDAO;
+    }
+
+    /**
+     * Pass all the query conditions to the DAO as they are, and return its result.
+     */
+    public List<TopNRecord> getTopNRecords(long startSecondTB, long endSecondTB, String metricName, int serviceId,
+        int topN, Order order) throws IOException {
+        final ITopNRecordsQueryDAO dao = getTopNRecordsQueryDAO();
+        return dao.getTopNRecords(startSecondTB, endSecondTB, metricName, serviceId, topN, order);
+    }
+}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/TopNRecord.java
similarity index 75%
copy from oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/TopNRecord.java
index fd8d09f..e8baa5d 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/TopNRecord.java
@@ -16,13 +16,17 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener;
+package org.apache.skywalking.oap.server.core.query.entity;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import lombok.*;
/**
- * @author peng-yongsheng
+ * @author wusheng
*/
-public interface SpanListenerFactory {
- SpanListener create(ModuleManager moduleManager);
+@Setter
+@Getter
+public class TopNRecord {
+ private String statement;
+ private long latency;
+ private String traceId;
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Scope.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DatabaseSlowStatement.java
similarity index 58%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Scope.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DatabaseSlowStatement.java
index d162f3e..254a5e7 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Scope.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DatabaseSlowStatement.java
@@ -18,18 +18,27 @@
package org.apache.skywalking.oap.server.core.source;
+import lombok.*;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.source.annotation.SourceType;
+
/**
- * @author peng-yongsheng, wusheng
+ * @author wusheng
*/
-public enum Scope {
- All, Service, ServiceInstance, Endpoint, ServiceRelation, ServiceInstanceRelation, EndpointRelation, NetworkAddress,
- ServiceInstanceJVMCPU, ServiceInstanceJVMMemory, ServiceInstanceJVMMemoryPool, ServiceInstanceJVMGC,
- Segment, Alarm, ServiceInventory, ServiceInstanceInventory, EndpointInventory, DatabaseAccess;
+@SourceType
+public class DatabaseSlowStatement extends Source {
+ @Getter @Setter private String id;
+ @Getter @Setter private int databaseServiceId;
+ @Getter @Setter private String statement;
+ @Getter @Setter private long latency;
+ @Getter @Setter private String traceId;
+
+ @Override public Scope scope() {
+ return Scope.DatabaseSlowStatement;
+ }
- public static Scope valueOf(int ordinal) {
- if (ordinal < 0 || ordinal >= values().length) {
- throw new IndexOutOfBoundsException("Invalid ordinal");
- }
- return values()[ordinal];
+ @Override public String getEntityId() {
+ return Const.EMPTY_STRING;
}
+
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Scope.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Scope.java
index d162f3e..fd66299 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Scope.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Scope.java
@@ -24,7 +24,7 @@ package org.apache.skywalking.oap.server.core.source;
public enum Scope {
All, Service, ServiceInstance, Endpoint, ServiceRelation, ServiceInstanceRelation, EndpointRelation, NetworkAddress,
ServiceInstanceJVMCPU, ServiceInstanceJVMMemory, ServiceInstanceJVMMemoryPool, ServiceInstanceJVMGC,
- Segment, Alarm, ServiceInventory, ServiceInstanceInventory, EndpointInventory, DatabaseAccess;
+ Segment, Alarm, ServiceInventory, ServiceInstanceInventory, EndpointInventory, DatabaseAccess, DatabaseSlowStatement;
public static Scope valueOf(int ordinal) {
if (ordinal < 0 || ordinal >= values().length) {
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ComparableStorageData.java
similarity index 75%
copy from oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ComparableStorageData.java
index fd8d09f..8e1a639 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ComparableStorageData.java
@@ -16,13 +16,12 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener;
-
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
+package org.apache.skywalking.oap.server.core.storage;
/**
- * @author peng-yongsheng
+ * Storage data with comparable capability.
+ *
+ * @author wusheng
*/
-public interface SpanListenerFactory {
- SpanListener create(ModuleManager moduleManager);
+public interface ComparableStorageData extends StorageData, Comparable {
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
index 367bd9e..9bdbfd1 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
@@ -84,6 +84,7 @@ public enum PersistenceTimer {
List<PersistenceWorker> persistenceWorkers = new ArrayList<>();
persistenceWorkers.addAll(IndicatorProcess.INSTANCE.getPersistentWorkers());
persistenceWorkers.addAll(RecordProcess.INSTANCE.getPersistentWorkers());
+ persistenceWorkers.addAll(TopNProcess.INSTANCE.getPersistentWorkers());
persistenceWorkers.forEach(worker -> {
if (logger.isDebugEnabled()) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
index 02a896e..6072912 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
@@ -39,6 +39,7 @@ public class StorageModule extends ModuleDefine {
IHistoryDeleteDAO.class,
IServiceInventoryCacheDAO.class, IServiceInstanceInventoryCacheDAO.class,
IEndpointInventoryCacheDAO.class, INetworkAddressInventoryCacheDAO.class,
- ITopologyQueryDAO.class, IMetricQueryDAO.class, ITraceQueryDAO.class, IMetadataQueryDAO.class, IAggregationQueryDAO.class, IAlarmQueryDAO.class};
+ ITopologyQueryDAO.class, IMetricQueryDAO.class, ITraceQueryDAO.class, IMetadataQueryDAO.class, IAggregationQueryDAO.class, IAlarmQueryDAO.class,
+ ITopNRecordsQueryDAO.class};
}
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITopNRecordsQueryDAO.java
similarity index 64%
copy from oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITopNRecordsQueryDAO.java
index fd8d09f..4b20e53 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITopNRecordsQueryDAO.java
@@ -16,13 +16,14 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener;
+package org.apache.skywalking.oap.server.core.storage.query;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import java.io.IOException;
+import java.util.List;
+import org.apache.skywalking.oap.server.core.query.entity.*;
+import org.apache.skywalking.oap.server.library.module.Service;
-/**
- * @author peng-yongsheng
- */
-public interface SpanListenerFactory {
- SpanListener create(ModuleManager moduleManager);
+public interface ITopNRecordsQueryDAO extends Service {
+ List<TopNRecord> getTopNRecords(long startSecondTB, long endSecondTB, String metricName, int serviceId,
+ int topN, Order order) throws IOException;
}
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollectionTest.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollectionTest.java
new file mode 100644
index 0000000..852c884
--- /dev/null
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollectionTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.data;
+
+import java.util.Objects;
+import org.apache.skywalking.oap.server.core.storage.ComparableStorageData;
+import org.junit.*;
+
+/**
+ * Verify that {@link LimitedSizeDataCollection}, once it is full, keeps the biggest values only
+ * and returns them in the ascending order.
+ *
+ * @author wusheng
+ */
+public class LimitedSizeDataCollectionTest {
+    @Test
+    public void testPut() {
+        LimitedSizeDataCollection<MockStorageData> collection = new LimitedSizeDataCollection<>(5);
+        collection.put(new MockStorageData(1));
+        collection.put(new MockStorageData(3));
+        collection.put(new MockStorageData(5));
+        collection.put(new MockStorageData(7));
+        collection.put(new MockStorageData(9));
+
+        // The collection has reached its limit(5), the income is expected to take the place of the smallest(1).
+        MockStorageData income = new MockStorageData(4);
+        collection.put(income);
+
+        int[] expected = new int[] {3, 4, 5, 7, 9};
+        int i = 0;
+        for (MockStorageData data : collection.collection()) {
+            Assert.assertEquals(expected[i++], data.latency);
+        }
+        // Without this check, the test passes even if the collection returns fewer elements than expected.
+        Assert.assertEquals(expected.length, i);
+    }
+
+    /**
+     * Mock data, compared by the latency only.
+     */
+    private static class MockStorageData implements ComparableStorageData {
+        private final long latency;
+
+        public MockStorageData(long latency) {
+            this.latency = latency;
+        }
+
+        @Override public int compareTo(Object o) {
+            MockStorageData target = (MockStorageData)o;
+            // Casting the subtraction of two long values into int could overflow and flip the sign.
+            return Long.compare(latency, target.latency);
+        }
+
+        @Override public String id() {
+            return null;
+        }
+
+        // NOTE(review): every mock data is equal to the others and shares one hash code, presumably to make
+        // all of them be treated as the same group by the collection. Confirm against LimitedSizeDataCollection.
+        @Override public boolean equals(Object o) {
+            return true;
+        }
+
+        @Override public int hashCode() {
+            return Objects.hash(1);
+        }
+    }
+}
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/GraphQLQueryProvider.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/GraphQLQueryProvider.java
index c834daf..d940141 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/GraphQLQueryProvider.java
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/GraphQLQueryProvider.java
@@ -66,6 +66,8 @@ public class GraphQLQueryProvider extends ModuleProvider {
.resolvers(new AggregationQuery(getManager()))
.file("query-protocol/alarm.graphqls")
.resolvers(new AlarmQuery(getManager()))
+ .file("query-protocol/top-n-records.graphqls")
+ .resolvers(new TopNRecordsQuery(getManager()))
.build()
.makeExecutableSchema();
this.graphQL = GraphQL.newGraphQL(schema).build();
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopNRecordsQuery.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopNRecordsQuery.java
new file mode 100644
index 0000000..6354521
--- /dev/null
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopNRecordsQuery.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.query.graphql.resolver;
+
+import com.coxautodev.graphql.tools.GraphQLQueryResolver;
+import java.io.IOException;
+import java.util.List;
+import org.apache.skywalking.oap.query.graphql.type.TopNRecordsCondition;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.query.*;
+import org.apache.skywalking.oap.server.core.query.entity.*;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+
+/**
+ * GraphQL resolver of the top N records query. It converts the duration of the condition into the second
+ * time buckets, and delegates the query to the {@link TopNRecordsQueryService}.
+ *
+ * @author wusheng
+ */
+public class TopNRecordsQuery implements GraphQLQueryResolver {
+    private final ModuleManager moduleManager;
+    private TopNRecordsQueryService topNRecordsQueryService;
+
+    public TopNRecordsQuery(ModuleManager moduleManager) {
+        this.moduleManager = moduleManager;
+    }
+
+    /**
+     * @return the query service, looked up from the core module only when it has not been found before.
+     */
+    private TopNRecordsQueryService getTopNRecordsQueryService() {
+        if (topNRecordsQueryService != null) {
+            return topNRecordsQueryService;
+        }
+        topNRecordsQueryService = moduleManager.find(CoreModule.NAME).provider().getService(TopNRecordsQueryService.class);
+        return topNRecordsQueryService;
+    }
+
+    /**
+     * Query the top N records matching the given condition.
+     *
+     * @param condition includes the service id, metric name, top N number, order and duration.
+     */
+    public List<TopNRecord> getTopNRecords(TopNRecordsCondition condition) throws IOException {
+        // The start and end of the duration are converted to second time buckets based on the duration step.
+        final long startSecondTB = DurationUtils.INSTANCE.startTimeDurationToSecondTimeBucket(condition.getDuration().getStep(), condition.getDuration().getStart());
+        final long endSecondTB = DurationUtils.INSTANCE.endTimeDurationToSecondTimeBucket(condition.getDuration().getStep(), condition.getDuration().getEnd());
+
+        return getTopNRecordsQueryService().getTopNRecords(startSecondTB, endSecondTB,
+            condition.getMetricName(), condition.getServiceId(), condition.getTopN(), condition.getOrder());
+    }
+}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/TopNRecordsCondition.java
similarity index 71%
copy from oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java
copy to oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/TopNRecordsCondition.java
index fd8d09f..d698f1a 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/TopNRecordsCondition.java
@@ -16,13 +16,17 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener;
+package org.apache.skywalking.oap.query.graphql.type;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import lombok.*;
+import org.apache.skywalking.oap.server.core.query.entity.Order;
-/**
- * @author peng-yongsheng
- */
-public interface SpanListenerFactory {
- SpanListener create(ModuleManager moduleManager);
+@Getter
+@Setter
+public class TopNRecordsCondition {
+ private int serviceId;
+ private String metricName;
+ private int topN;
+ private Order order;
+ private Duration duration;
}
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol b/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
index 85b81e2..6f11e3b 160000
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
@@ -1 +1 @@
-Subproject commit 85b81e2e34efb0b670d039154feca336c9203700
+Subproject commit 6f11e3b829bba4d3532477e968291cf657f0ac0b
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceServiceModuleConfig.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/DBLatencyThresholds.java
similarity index 50%
copy from oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceServiceModuleConfig.java
copy to oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/DBLatencyThresholds.java
index 5a36953..b66c890 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceServiceModuleConfig.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/DBLatencyThresholds.java
@@ -18,20 +18,34 @@
package org.apache.skywalking.oap.server.receiver.trace.provider;
-import lombok.*;
-import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+import java.util.*;
/**
- * @author peng-yongsheng
+ * @author wusheng
*/
-public class TraceServiceModuleConfig extends ModuleConfig {
- @Setter @Getter private String bufferPath;
- @Setter @Getter private int bufferOffsetMaxFileSize;
- @Setter @Getter private int bufferDataMaxFileSize;
- @Setter @Getter private boolean bufferFileCleanWhenRestart;
- /**
- * The sample rate precision is 1/10000.
- * 10000 means 100% sample in default.
- */
- @Setter @Getter private int sampleRate = 10000;
+public class DBLatencyThresholds {
+    private static final String DEFAULT_TYPE = "default";
+    /**
+     * The fallback threshold, used when the config doesn't include the `default` type.
+     */
+    private static final int DEFAULT_THRESHOLD = 10000;
+
+    private final Map<String, Integer> thresholds;
+
+    /**
+     * Parse the thresholds from the config. Type names are case-insensitive, whitespace around the type
+     * and the value is ignored, and a setting that is not in `type:value` format is skipped.
+     *
+     * @param config comma separated `type:value` settings, such as `default:200,mongodb:100`. Null means no setting.
+     * @throws NumberFormatException if the value of a setting is not an integer.
+     */
+    DBLatencyThresholds(String config) {
+        thresholds = new HashMap<>();
+        String[] settings = config == null ? new String[0] : config.split(",");
+        for (String setting : settings) {
+            String[] typeValue = setting.split(":");
+            if (typeValue.length == 2) {
+                // Trim both parts, otherwise `a:1, b:2` registers the key " b" or fails in parsing " 2".
+                thresholds.put(normalize(typeValue[0]), Integer.parseInt(typeValue[1].trim()));
+            }
+        }
+        thresholds.putIfAbsent(DEFAULT_TYPE, DEFAULT_THRESHOLD);
+    }
+
+    /**
+     * @param type the database type, case-insensitive.
+     * @return the threshold of the given type, or the `default` one if the type is null or has no setting.
+     */
+    public int getThreshold(String type) {
+        Integer threshold = type == null ? null : thresholds.get(normalize(type));
+        return threshold != null ? threshold : thresholds.get(DEFAULT_TYPE);
+    }
+
+    /**
+     * Locale.ROOT keeps the lower case conversion independent from the default locale of the JVM.
+     */
+    private static String normalize(String type) {
+        return type.trim().toLowerCase(Locale.ROOT);
+    }
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
index baa027f..367702d 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
@@ -20,22 +20,13 @@ package org.apache.skywalking.oap.server.receiver.trace.provider;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
-import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister;
-import org.apache.skywalking.oap.server.library.module.ModuleConfig;
-import org.apache.skywalking.oap.server.library.module.ModuleDefine;
-import org.apache.skywalking.oap.server.library.module.ModuleProvider;
-import org.apache.skywalking.oap.server.library.module.ModuleStartException;
-import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
+import org.apache.skywalking.oap.server.core.server.*;
+import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.receiver.trace.module.TraceModule;
import org.apache.skywalking.oap.server.receiver.trace.provider.handler.v5.grpc.TraceSegmentServiceHandler;
import org.apache.skywalking.oap.server.receiver.trace.provider.handler.v5.rest.TraceSegmentServletHandler;
import org.apache.skywalking.oap.server.receiver.trace.provider.handler.v6.grpc.TraceSegmentReportServiceHandler;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.ISegmentParserService;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParse;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParseV2;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParserListenerManager;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParserServiceImpl;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.*;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.endpoint.MultiScopesSpanListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.segment.SegmentSpanListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.service.ServiceMappingSpanListener;
@@ -68,19 +59,21 @@ public class TraceModuleProvider extends ModuleProvider {
}
@Override public void prepare() throws ServiceNotProvidedException {
+ moduleConfig.setDbLatencyThresholds(new DBLatencyThresholds(moduleConfig.getSlowDBAccessThreshold()));
+
SegmentParserListenerManager listenerManager = new SegmentParserListenerManager();
listenerManager.add(new MultiScopesSpanListener.Factory());
listenerManager.add(new ServiceMappingSpanListener.Factory());
listenerManager.add(new SegmentSpanListener.Factory(moduleConfig.getSampleRate()));
- segmentProducer = new SegmentParse.Producer(getManager(), listenerManager);
+ segmentProducer = new SegmentParse.Producer(getManager(), listenerManager, moduleConfig);
listenerManager = new SegmentParserListenerManager();
listenerManager.add(new MultiScopesSpanListener.Factory());
listenerManager.add(new ServiceMappingSpanListener.Factory());
listenerManager.add(new SegmentSpanListener.Factory(moduleConfig.getSampleRate()));
- segmentProducerV2 = new SegmentParseV2.Producer(getManager(), listenerManager);
+ segmentProducerV2 = new SegmentParseV2.Producer(getManager(), listenerManager, moduleConfig);
this.registerServiceImplementation(ISegmentParserService.class, new SegmentParserServiceImpl(segmentProducerV2));
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceServiceModuleConfig.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceServiceModuleConfig.java
index 5a36953..8ebeec6 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceServiceModuleConfig.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceServiceModuleConfig.java
@@ -30,8 +30,13 @@ public class TraceServiceModuleConfig extends ModuleConfig {
@Setter @Getter private int bufferDataMaxFileSize;
@Setter @Getter private boolean bufferFileCleanWhenRestart;
/**
- * The sample rate precision is 1/10000.
- * 10000 means 100% sample in default.
+ * The sample rate precision is 1/10000. 10000 means 100% sampling by default.
*/
@Setter @Getter private int sampleRate = 10000;
+
+ /**
+ * The thresholds used to detect slow database accesses, given as comma-separated type:threshold pairs (e.g. "default:200,redis:50"). Unit: millisecond.
+ */
+ @Setter @Getter private String slowDBAccessThreshold = "default:200";
+ @Setter @Getter private DBLatencyThresholds dbLatencyThresholds;
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java
index 0a4bd69..7abe9e5 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java
@@ -25,6 +25,7 @@ import org.apache.skywalking.apm.network.language.agent.*;
import org.apache.skywalking.oap.server.library.buffer.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.TimeBucketUtils;
+import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.*;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization.*;
@@ -43,12 +44,14 @@ public class SegmentParse {
private final List<SpanListener> spanListeners;
private final SegmentParserListenerManager listenerManager;
private final SegmentCoreInfo segmentCoreInfo;
+ private final TraceServiceModuleConfig config;
@Setter private SegmentStandardizationWorker standardizationWorker;
private volatile static CounterMetric TRACE_BUFFER_FILE_RETRY;
private volatile static CounterMetric TRACE_BUFFER_FILE_OUT;
private volatile static CounterMetric TRACE_PARSE_ERROR;
- private SegmentParse(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) {
+ private SegmentParse(ModuleManager moduleManager, SegmentParserListenerManager listenerManager,
+ TraceServiceModuleConfig config) {
this.moduleManager = moduleManager;
this.listenerManager = listenerManager;
this.spanListeners = new LinkedList<>();
@@ -56,6 +59,7 @@ public class SegmentParse {
this.segmentCoreInfo.setStartTime(Long.MAX_VALUE);
this.segmentCoreInfo.setEndTime(Long.MIN_VALUE);
this.segmentCoreInfo.setV2(false);
+ this.config = config;
MetricCreator metricCreator = moduleManager.find(TelemetryModule.NAME).provider().getService(MetricCreator.class);
TRACE_BUFFER_FILE_RETRY = metricCreator.createCounter("v5_trace_buffer_file_retry", "The number of retry trace segment from the buffer file, but haven't registered successfully.",
@@ -239,7 +243,7 @@ public class SegmentParse {
}
private void createSpanListeners() {
- listenerManager.getSpanListenerFactories().forEach(spanListenerFactory -> spanListeners.add(spanListenerFactory.create(moduleManager)));
+ listenerManager.getSpanListenerFactories().forEach(spanListenerFactory -> spanListeners.add(spanListenerFactory.create(moduleManager, config)));
}
public enum Source {
@@ -251,20 +255,23 @@ public class SegmentParse {
@Setter private SegmentStandardizationWorker standardizationWorker;
private final ModuleManager moduleManager;
private final SegmentParserListenerManager listenerManager;
+ private final TraceServiceModuleConfig config;
- public Producer(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) {
+ public Producer(ModuleManager moduleManager, SegmentParserListenerManager listenerManager,
+ TraceServiceModuleConfig config) {
this.moduleManager = moduleManager;
this.listenerManager = listenerManager;
+ this.config = config;
}
public void send(UpstreamSegment segment, Source source) {
- SegmentParse segmentParse = new SegmentParse(moduleManager, listenerManager);
+ SegmentParse segmentParse = new SegmentParse(moduleManager, listenerManager, config);
segmentParse.setStandardizationWorker(standardizationWorker);
segmentParse.parse(new BufferData<>(segment), source);
}
@Override public boolean call(BufferData<UpstreamSegment> bufferData) {
- SegmentParse segmentParse = new SegmentParse(moduleManager, listenerManager);
+ SegmentParse segmentParse = new SegmentParse(moduleManager, listenerManager, config);
segmentParse.setStandardizationWorker(standardizationWorker);
boolean parseResult = segmentParse.parse(bufferData, Source.Buffer);
if (parseResult) {
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParseV2.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParseV2.java
index dd28ba4..543b0eb 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParseV2.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParseV2.java
@@ -26,6 +26,7 @@ import org.apache.skywalking.apm.network.language.agent.v2.SegmentObject;
import org.apache.skywalking.oap.server.library.buffer.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.TimeBucketUtils;
+import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.*;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization.*;
@@ -46,12 +47,13 @@ public class SegmentParseV2 {
private final List<SpanListener> spanListeners;
private final SegmentParserListenerManager listenerManager;
private final SegmentCoreInfo segmentCoreInfo;
+ private final TraceServiceModuleConfig config;
@Setter private SegmentStandardizationWorker standardizationWorker;
private volatile static CounterMetric TRACE_BUFFER_FILE_RETRY;
private volatile static CounterMetric TRACE_BUFFER_FILE_OUT;
private volatile static CounterMetric TRACE_PARSE_ERROR;
- private SegmentParseV2(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) {
+ private SegmentParseV2(ModuleManager moduleManager, SegmentParserListenerManager listenerManager, TraceServiceModuleConfig config) {
this.moduleManager = moduleManager;
this.listenerManager = listenerManager;
this.spanListeners = new LinkedList<>();
@@ -59,6 +61,7 @@ public class SegmentParseV2 {
this.segmentCoreInfo.setStartTime(Long.MAX_VALUE);
this.segmentCoreInfo.setEndTime(Long.MIN_VALUE);
this.segmentCoreInfo.setV2(true);
+ this.config = config;
if (TRACE_BUFFER_FILE_RETRY == null) {
MetricCreator metricCreator = moduleManager.find(TelemetryModule.NAME).provider().getService(MetricCreator.class);
@@ -245,7 +248,7 @@ public class SegmentParseV2 {
}
private void createSpanListeners() {
- listenerManager.getSpanListenerFactories().forEach(spanListenerFactory -> spanListeners.add(spanListenerFactory.create(moduleManager)));
+ listenerManager.getSpanListenerFactories().forEach(spanListenerFactory -> spanListeners.add(spanListenerFactory.create(moduleManager, config)));
}
public static class Producer implements DataStreamReader.CallBack<UpstreamSegment> {
@@ -253,20 +256,22 @@ public class SegmentParseV2 {
@Setter private SegmentStandardizationWorker standardizationWorker;
private final ModuleManager moduleManager;
private final SegmentParserListenerManager listenerManager;
+ private final TraceServiceModuleConfig config;
- public Producer(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) {
+ public Producer(ModuleManager moduleManager, SegmentParserListenerManager listenerManager, TraceServiceModuleConfig config) {
this.moduleManager = moduleManager;
this.listenerManager = listenerManager;
+ this.config = config;
}
public void send(UpstreamSegment segment, SegmentSource source) {
- SegmentParseV2 segmentParse = new SegmentParseV2(moduleManager, listenerManager);
+ SegmentParseV2 segmentParse = new SegmentParseV2(moduleManager, listenerManager, config);
segmentParse.setStandardizationWorker(standardizationWorker);
segmentParse.parse(new BufferData<>(segment), source);
}
@Override public boolean call(BufferData<UpstreamSegment> bufferData) {
- SegmentParseV2 segmentParse = new SegmentParseV2(moduleManager, listenerManager);
+ SegmentParseV2 segmentParse = new SegmentParseV2(moduleManager, listenerManager, config);
segmentParse.setStandardizationWorker(standardizationWorker);
boolean parseResult = segmentParse.parse(bufferData, SegmentSource.Buffer);
if (parseResult) {
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SpanTags.java
similarity index 80%
copy from oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java
copy to oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SpanTags.java
index fd8d09f..0f9fef0 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SpanTags.java
@@ -16,13 +16,10 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener;
+package org.apache.skywalking.oap.server.receiver.trace.provider.parser;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
+public class SpanTags {
+ public static final String DB_STATEMENT = "db.statement";
-/**
- * @author peng-yongsheng
- */
-public interface SpanListenerFactory {
- SpanListener create(ModuleManager moduleManager);
+ public static final String DB_TYPE = "db.type";
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java
index fd8d09f..9b4afd6 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java
@@ -19,10 +19,11 @@
package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig;
/**
* @author peng-yongsheng
*/
public interface SpanListenerFactory {
- SpanListener create(ModuleManager moduleManager);
+ SpanListener create(ModuleManager moduleManager, TraceServiceModuleConfig config);
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java
index 590d71e..0330e77 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java
@@ -18,28 +18,19 @@
package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.endpoint;
-import java.util.LinkedList;
-import java.util.List;
+import java.util.*;
+import org.apache.skywalking.apm.network.common.KeyStringValuePair;
import org.apache.skywalking.apm.network.language.agent.SpanLayer;
-import org.apache.skywalking.oap.server.core.Const;
-import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache;
-import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
-import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
-import org.apache.skywalking.oap.server.core.source.DetectPoint;
-import org.apache.skywalking.oap.server.core.source.EndpointRelation;
-import org.apache.skywalking.oap.server.core.source.RequestType;
-import org.apache.skywalking.oap.server.core.source.SourceReceiver;
+import org.apache.skywalking.oap.server.core.*;
+import org.apache.skywalking.oap.server.core.cache.*;
+import org.apache.skywalking.oap.server.core.source.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.ReferenceDecorator;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SegmentCoreInfo;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SpanDecorator;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.EntrySpanListener;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.ExitSpanListener;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SpanListener;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SpanListenerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.skywalking.oap.server.library.util.TimeBucketUtils;
+import org.apache.skywalking.oap.server.receiver.trace.provider.*;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SpanTags;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.*;
+import org.slf4j.*;
import static java.util.Objects.nonNull;
@@ -63,16 +54,20 @@ public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListe
private final List<SourceBuilder> entrySourceBuilders;
private final List<SourceBuilder> exitSourceBuilders;
+ private final List<DatabaseSlowStatement> slowDatabaseAccesses;
+ private final TraceServiceModuleConfig config;
private SpanDecorator entrySpanDecorator;
private long minuteTimeBucket;
- private MultiScopesSpanListener(ModuleManager moduleManager) {
+ private MultiScopesSpanListener(ModuleManager moduleManager, TraceServiceModuleConfig config) {
this.sourceReceiver = moduleManager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
this.entrySourceBuilders = new LinkedList<>();
this.exitSourceBuilders = new LinkedList<>();
+ this.slowDatabaseAccesses = new ArrayList<>(10);
this.instanceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInstanceInventoryCache.class);
this.serviceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class);
this.endpointInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(EndpointInventoryCache.class);
+ this.config = config;
}
@Override public boolean containsPoint(Point point) {
@@ -152,6 +147,34 @@ public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListe
sourceBuilder.setComponentId(spanDecorator.getComponentId());
setPublicAttrs(sourceBuilder, spanDecorator);
exitSourceBuilders.add(sourceBuilder);
+
+ if (sourceBuilder.getType().equals(RequestType.DATABASE)) {
+ boolean isSlowDBAccess = false;
+
+ DatabaseSlowStatement statement = new DatabaseSlowStatement();
+ statement.setId(segmentCoreInfo.getSegmentId() + "-" + spanDecorator.getSpanId());
+ statement.setDatabaseServiceId(sourceBuilder.getDestServiceId());
+ statement.setLatency(sourceBuilder.getLatency());
+ statement.setTimeBucket(TimeBucketUtils.INSTANCE.getSecondTimeBucket(segmentCoreInfo.getStartTime()));
+ statement.setTraceId(segmentCoreInfo.getSegmentId());
+ for (KeyStringValuePair tag : spanDecorator.getAllTags()) {
+ if (SpanTags.DB_STATEMENT.equals(tag.getKey())) {
+ statement.setStatement(tag.getValue());
+
+ } else if (SpanTags.DB_TYPE.equals(tag.getKey())) {
+ String dbType = tag.getValue();
+ DBLatencyThresholds thresholds = config.getDbLatencyThresholds();
+ int threshold = thresholds.getThreshold(dbType);
+ if (sourceBuilder.getLatency() > threshold) {
+ isSlowDBAccess = true;
+ }
+ }
+ }
+
+ if (isSlowDBAccess) {
+ slowDatabaseAccesses.add(statement);
+ }
+ }
}
private void setPublicAttrs(SourceBuilder sourceBuilder, SpanDecorator spanDecorator) {
@@ -215,13 +238,15 @@ public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListe
sourceReceiver.receive(exitSourceBuilder.toDatabaseAccess());
}
});
+
+ slowDatabaseAccesses.forEach(sourceReceiver::receive);
}
public static class Factory implements SpanListenerFactory {
@Override
- public SpanListener create(ModuleManager moduleManager) {
- return new MultiScopesSpanListener(moduleManager);
+ public SpanListener create(ModuleManager moduleManager, TraceServiceModuleConfig config) {
+ return new MultiScopesSpanListener(moduleManager, config);
}
}
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java
index 9add272..644d2d2 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java
@@ -26,6 +26,7 @@ import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.apache.skywalking.oap.server.library.util.TimeBucketUtils;
+import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SegmentCoreInfo;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SpanDecorator;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.EntrySpanListener;
@@ -144,7 +145,7 @@ public class SegmentSpanListener implements FirstSpanListener, EntrySpanListener
this.sampler = new TraceSegmentSampler(segmentSamplingRate);
}
- @Override public SpanListener create(ModuleManager moduleManager) {
+ @Override public SpanListener create(ModuleManager moduleManager, TraceServiceModuleConfig config) {
return new SegmentSpanListener(moduleManager, sampler);
}
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/service/ServiceMappingSpanListener.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/service/ServiceMappingSpanListener.java
index 9e14954..338a1cd 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/service/ServiceMappingSpanListener.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/service/ServiceMappingSpanListener.java
@@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.*;
import org.slf4j.*;
@@ -81,7 +82,7 @@ public class ServiceMappingSpanListener implements EntrySpanListener {
public static class Factory implements SpanListenerFactory {
- @Override public SpanListener create(ModuleManager moduleManager) {
+ @Override public SpanListener create(ModuleManager moduleManager, TraceServiceModuleConfig config) {
return new ServiceMappingSpanListener(moduleManager);
}
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ServiceBMock.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ServiceBMock.java
index 344e049..9496815 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ServiceBMock.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ServiceBMock.java
@@ -113,9 +113,11 @@ class ServiceBMock {
span.setSpanLayer(SpanLayer.Database);
span.setParentSpanId(0);
span.setStartTime(startTimestamp + 550);
- span.setEndTime(startTimestamp + 1000);
+ span.setEndTime(startTimestamp + 1500);
span.setComponentId(ComponentsDefine.MONGO_DRIVER.getId());
span.setIsError(true);
+ span.addTags(KeyWithStringValue.newBuilder().setKey("db.statement").setValue("select * from database where complex = 1;").build());
+ span.addTags(KeyWithStringValue.newBuilder().setKey("db.type").setValue("mongodb").build());
if (isPrepare) {
span.setOperationName("mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]");
diff --git a/oap-server/server-starter/src/main/assembly/application.yml b/oap-server/server-starter/src/main/assembly/application.yml
index 746de62..840777d 100644
--- a/oap-server/server-starter/src/main/assembly/application.yml
+++ b/oap-server/server-starter/src/main/assembly/application.yml
@@ -75,6 +75,7 @@ receiver-trace:
bufferDataMaxFileSize: ${SW_RECEIVER_BUFFER_DATA_MAX_FILE_SIZE:500} # Unit is MB
bufferFileCleanWhenRestart: ${SW_RECEIVER_BUFFER_FILE_CLEAN_WHEN_RESTART:false}
sampleRate: ${SW_TRACE_SAMPLE_RATE:10000} # The sample rate precision is 1/10000. 10000 means 100% sample in default.
+ slowDBAccessThreshold: ${SW_SLOW_DB_THRESHOLD:default:200,redis:50} # The slow database access thresholds, as comma-separated type:threshold pairs. Unit: ms.
receiver-jvm:
default:
#service-mesh:
diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index 5b53d73..28c7b97 100644
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -75,6 +75,7 @@ receiver-trace:
bufferDataMaxFileSize: ${SW_RECEIVER_BUFFER_DATA_MAX_FILE_SIZE:500} # Unit is MB
bufferFileCleanWhenRestart: ${SW_RECEIVER_BUFFER_FILE_CLEAN_WHEN_RESTART:false}
sampleRate: ${SW_TRACE_SAMPLE_RATE:10000} # The sample rate precision is 1/10000. 10000 means 100% sample in default.
+ slowDBAccessThreshold: ${SW_SLOW_DB_THRESHOLD:default:200,redis:50} # The slow database access thresholds, as comma-separated type:threshold pairs. Unit: ms.
receiver-jvm:
default:
service-mesh:
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
index 8f31dba..cf10758 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
@@ -80,6 +80,7 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
this.registerServiceImplementation(IMetadataQueryDAO.class, new MetadataQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(IAggregationQueryDAO.class, new AggregationQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(IAlarmQueryDAO.class, new AlarmQueryEsDAO(elasticSearchClient));
+ this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new TopNRecordsQueryEsDAO(elasticSearchClient));
}
@Override
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopNRecordsQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopNRecordsQueryEsDAO.java
new file mode 100644
index 0000000..d5f23d5
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopNRecordsQueryEsDAO.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query;
+
+import java.io.IOException;
+import java.util.*;
+import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
+import org.apache.skywalking.oap.server.core.query.entity.*;
+import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO;
+import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.index.query.*;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.sort.SortOrder;
+
+/**
+ * @author wusheng
+ */
+public class TopNRecordsQueryEsDAO extends EsDAO implements ITopNRecordsQueryDAO {
+ public TopNRecordsQueryEsDAO(ElasticSearchClient client) {
+ super(client);
+ }
+
+ @Override
+ public List<TopNRecord> getTopNRecords(long startSecondTB, long endSecondTB, String metricName, int serviceId,
+ int topN, Order order) throws IOException {
+ SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
+ BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
+ boolQueryBuilder.must().add(QueryBuilders.rangeQuery(TopN.TIME_BUCKET).gte(startSecondTB).lte(endSecondTB));
+ boolQueryBuilder.must().add(QueryBuilders.termQuery(TopN.SERVICE_ID, serviceId));
+
+ sourceBuilder.query(boolQueryBuilder);
+ sourceBuilder.size(topN).sort(TopN.LATENCY, order.equals(Order.DES) ? SortOrder.DESC : SortOrder.ASC);
+ SearchResponse response = getClient().search(metricName, sourceBuilder);
+
+ List<TopNRecord> results = new ArrayList<>();
+
+ for (SearchHit searchHit : response.getHits().getHits()) {
+ TopNRecord record = new TopNRecord();
+ record.setStatement((String)searchHit.getSourceAsMap().get(TopN.STATEMENT));
+ record.setTraceId((String)searchHit.getSourceAsMap().get(TopN.TRACE_ID));
+ record.setLatency(((Number)searchHit.getSourceAsMap().get(TopN.LATENCY)).longValue());
+ results.add(record);
+ }
+
+ return results;
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
index a76fb49..addd6a0 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
@@ -86,6 +86,7 @@ public class H2StorageProvider extends ModuleProvider {
this.registerServiceImplementation(IAggregationQueryDAO.class, new H2AggregationQueryDAO(h2Client));
this.registerServiceImplementation(IAlarmQueryDAO.class, new H2AlarmQueryDAO(h2Client));
this.registerServiceImplementation(IHistoryDeleteDAO.class, new H2HistoryDeleteDAO(h2Client));
+ this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new H2TopNRecordsQueryDAO(h2Client));
}
@Override public void start() throws ServiceNotProvidedException, ModuleStartException {
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TopNRecordsQueryDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TopNRecordsQueryDAO.java
new file mode 100644
index 0000000..2ab2231
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TopNRecordsQueryDAO.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
+
+import java.io.IOException;
+import java.sql.*;
+import java.util.*;
+import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
+import org.apache.skywalking.oap.server.core.query.entity.*;
+import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO;
+import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
+
+/**
+ * @author wusheng
+ */
+public class H2TopNRecordsQueryDAO implements ITopNRecordsQueryDAO {
+ private JDBCHikariCPClient h2Client;
+
+ public H2TopNRecordsQueryDAO(JDBCHikariCPClient h2Client) {
+ this.h2Client = h2Client;
+ }
+
+ @Override
+ public List<TopNRecord> getTopNRecords(long startSecondTB, long endSecondTB, String metricName, int serviceId,
+ int topN, Order order) throws IOException {
+ StringBuilder sql = new StringBuilder("select * from " + metricName + " where ");
+ List<Object> parameters = new ArrayList<>(10);
+
+ sql.append(" service_id = ? ");
+ parameters.add(serviceId);
+
+ sql.append(" and ").append(TopN.TIME_BUCKET).append(" >= ?");
+ parameters.add(startSecondTB);
+ sql.append(" and ").append(TopN.TIME_BUCKET).append(" <= ?");
+ parameters.add(endSecondTB);
+
+ sql.append(" order by ").append(TopN.LATENCY);
+ if (order.equals(Order.DES)) {
+ sql.append(" desc ");
+ } else {
+ sql.append(" asc ");
+ }
+ sql.append(" limit ").append(topN);
+
+ List<TopNRecord> results = new ArrayList<>();
+ try (Connection connection = h2Client.getConnection()) {
+ try (ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), parameters.toArray(new Object[0]))) {
+ while (resultSet.next()) {
+ TopNRecord record = new TopNRecord();
+ record.setStatement(resultSet.getString(TopN.STATEMENT));
+ record.setTraceId(resultSet.getString(TopN.TRACE_ID));
+ record.setLatency(resultSet.getLong(TopN.LATENCY));
+ results.add(record);
+ }
+ }
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+
+ return results;
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java
index fda7d31..6f2ac94 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java
@@ -92,6 +92,7 @@ public class MySQLStorageProvider extends ModuleProvider {
this.registerServiceImplementation(IAggregationQueryDAO.class, new MySQLAggregationQueryDAO(mysqlClient));
this.registerServiceImplementation(IAlarmQueryDAO.class, new MySQLAlarmQueryDAO(mysqlClient));
this.registerServiceImplementation(IHistoryDeleteDAO.class, new H2HistoryDeleteDAO(mysqlClient));
+ this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new H2TopNRecordsQueryDAO(mysqlClient));
}
@Override public void start() throws ServiceNotProvidedException, ModuleStartException {
diff --git a/skywalking-ui b/skywalking-ui
index d5e46c0..ed64821 160000
--- a/skywalking-ui
+++ b/skywalking-ui
@@ -1 +1 @@
-Subproject commit d5e46c075cb30e3de42aeaaa9e839a758d3fc029
+Subproject commit ed64821de4fe1e524ec069660d18d7efb1b1061f