You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/02/10 14:27:52 UTC
[incubator-skywalking] branch top-sql updated: Finish the top n
database statement persistent.
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch top-sql
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/top-sql by this push:
new 4f2a82a Finish the top n database statement persistent.
4f2a82a is described below
commit 4f2a82acfafe85908c3144c80d7736c973e435cf
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Sun Feb 10 22:27:39 2019 +0800
Finish the top n database statement persistent.
---
.../core/analysis/data/LimitedSizeDataCache.java | 12 ++++++-----
.../analysis/data/LimitedSizeDataCollection.java | 1 +
.../oap/server/core/analysis/data/Window.java | 10 +++++++++
.../database/DatabaseStatementDispatcher.java | 1 +
.../manual/database/TopNDatabaseStatement.java | 5 +++--
.../server/core/analysis/worker/TopNWorker.java | 24 ++++++++++++++++++----
.../server/core/source/DatabaseSlowStatement.java | 1 +
.../listener/endpoint/MultiScopesSpanListener.java | 5 ++++-
8 files changed, 47 insertions(+), 12 deletions(-)
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCache.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCache.java
index d64d3e7..33cfe1d 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCache.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCache.java
@@ -22,11 +22,13 @@ import org.apache.skywalking.oap.server.core.storage.*;
public class LimitedSizeDataCache<STORAGE_DATA extends ComparableStorageData> extends Window<STORAGE_DATA> implements DataCache {
- private SWCollection<STORAGE_DATA> lockedMergeDataCollection;
+ private SWCollection<STORAGE_DATA> limitedSizeDataCollection;
private final int limitSize;
public LimitedSizeDataCache(int limitSize) {
+ super(false);
this.limitSize = limitSize;
+ init();
}
@Override public SWCollection<STORAGE_DATA> collectionInstance() {
@@ -34,16 +36,16 @@ public class LimitedSizeDataCache<STORAGE_DATA extends ComparableStorageData> ex
}
public void add(STORAGE_DATA data) {
- lockedMergeDataCollection.put(data);
+ limitedSizeDataCollection.put(data);
}
@Override public void writing() {
- lockedMergeDataCollection = getCurrentAndWriting();
+ limitedSizeDataCollection = getCurrentAndWriting();
}
@Override public void finishWriting() {
- lockedMergeDataCollection.finishWriting();
- lockedMergeDataCollection = null;
+ limitedSizeDataCollection.finishWriting();
+ limitedSizeDataCollection = null;
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollection.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollection.java
index b330a4e..70b7966 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollection.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollection.java
@@ -104,6 +104,7 @@ public class LimitedSizeDataCollection<STORAGE_DATA extends ComparableStorageDat
// Add the value as biggest in top N list
storageDataList.addLast(value);
+ storageDataList.removeFirst();
}
@Override public Collection<STORAGE_DATA> collection() {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Window.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Window.java
index 6f9486d..ff2ca6d 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Window.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Window.java
@@ -33,6 +33,16 @@ public abstract class Window<DATA> {
private SWCollection<DATA> windowDataB;
Window() {
+ this(true);
+ }
+
+ Window(boolean autoInit) {
+ if (autoInit) {
+ init();
+ }
+ }
+
+ protected void init() {
this.windowDataA = collectionInstance();
this.windowDataB = collectionInstance();
this.pointer = windowDataA;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/DatabaseStatementDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/DatabaseStatementDispatcher.java
index a94d370..628751a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/DatabaseStatementDispatcher.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/DatabaseStatementDispatcher.java
@@ -28,6 +28,7 @@ import org.apache.skywalking.oap.server.core.source.DatabaseSlowStatement;
public class DatabaseStatementDispatcher implements SourceDispatcher<DatabaseSlowStatement> {
@Override public void dispatch(DatabaseSlowStatement source) {
TopNDatabaseStatement statement = new TopNDatabaseStatement();
+ statement.setId(source.getId());
statement.setDatabaseServiceId(source.getDatabaseServiceId());
statement.setLatency(source.getLatency());
statement.setStatement(source.getStatement());
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java
index 3639ae7..c40a6bb 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java
@@ -35,13 +35,14 @@ import org.apache.skywalking.oap.server.core.storage.annotation.*;
@TopNType
@StorageEntity(name = TopNDatabaseStatement.INDEX_NAME, builder = TopNDatabaseStatement.Builder.class, source = Scope.DatabaseSlowStatement)
public class TopNDatabaseStatement extends TopN {
- public static final String INDEX_NAME = "TOP_N_DATABASE_STATEMENT";
+ public static final String INDEX_NAME = "top_n_database_statement";
public static final String DATABASE_SERVICE_ID = "db_service_id";
+ @Setter private String id;
@Getter @Setter @Column(columnName = DATABASE_SERVICE_ID) private int databaseServiceId;
@Override public String id() {
- throw new UnexpectedException("id() should not be called.");
+ return id;
}
@Override public boolean equals(Object o) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
index 6ccccd8..5d6304e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
@@ -56,7 +56,12 @@ public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<Top
}
@Override void onWork(TopN data) {
- limitedSizeDataCache.add(data);
+ limitedSizeDataCache.writing();
+ try {
+ limitedSizeDataCache.add(data);
+ } finally {
+ limitedSizeDataCache.finishWriting();
+ }
}
/**
@@ -72,13 +77,24 @@ public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<Top
return limitedSizeDataCache;
}
- @Override public List<Object> prepareBatch(LimitedSizeDataCache<TopN> cache) {
+ /**
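+ * The top N worker persists data much less frequently than the other workers, so `flushAndSwitch` is overridden
+ * to lengthen the interval between executions.
+ *
+ * The switch and the persistence attempt happen at most once per reportCycle.
+ *
+ * @return false if reportCycle has not elapsed since the last report, otherwise the result of the parent's `flushAndSwitch`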
+ */
+ @Override public boolean flushAndSwitch() {
long now = System.currentTimeMillis();
if (now - lastReportTimestamp <= reportCycle) {
- return new ArrayList<>(0);
+ return false;
}
lastReportTimestamp = now;
+ return super.flushAndSwitch();
+ }
+ @Override public List<Object> prepareBatch(LimitedSizeDataCache<TopN> cache) {
List<Object> batchCollection = new LinkedList<>();
cache.getLast().collection().forEach(record -> {
try {
@@ -91,7 +107,7 @@ public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<Top
}
@Override public void in(TopN n) {
- limitedSizeDataCache.add(n);
+ dataCarrier.produce(n);
}
private class TopNConsumer implements IConsumer<TopN> {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DatabaseSlowStatement.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DatabaseSlowStatement.java
index c3a7a68..254a5e7 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DatabaseSlowStatement.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DatabaseSlowStatement.java
@@ -27,6 +27,7 @@ import org.apache.skywalking.oap.server.core.source.annotation.SourceType;
*/
@SourceType
public class DatabaseSlowStatement extends Source {
+ @Getter @Setter private String id;
@Getter @Setter private int databaseServiceId;
@Getter @Setter private String statement;
@Getter @Setter private long latency;
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java
index ebb3468..4b970fe 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java
@@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.source.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.util.TimeBucketUtils;
import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SpanTags;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*;
@@ -147,11 +148,13 @@ public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListe
setPublicAttrs(sourceBuilder, spanDecorator);
exitSourceBuilders.add(sourceBuilder);
- if (spanDecorator.getSpanLayer().equals(RequestType.DATABASE)
+ if (sourceBuilder.getType().equals(RequestType.DATABASE)
&& sourceBuilder.getLatency() > config.getSlowDBAccessThreshold()) {
DatabaseSlowStatement statement = new DatabaseSlowStatement();
+ statement.setId(segmentCoreInfo.getSegmentId() + "-" + spanDecorator.getSpanId());
statement.setDatabaseServiceId(sourceBuilder.getDestServiceId());
statement.setLatency(sourceBuilder.getLatency());
+ statement.setTimeBucket(TimeBucketUtils.INSTANCE.getSecondTimeBucket(segmentCoreInfo.getStartTime()));
statement.setTraceId(segmentCoreInfo.getSegmentId());
for (KeyStringValuePair tag : spanDecorator.getAllTags()) {
if (SpanTags.DB_STATEMENT.equals(tag.getKey())) {