You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/02/10 14:27:52 UTC

[incubator-skywalking] branch top-sql updated: Finish the top n database statement persistent.

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch top-sql
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/top-sql by this push:
     new 4f2a82a  Finish the top n database statement persistent.
4f2a82a is described below

commit 4f2a82acfafe85908c3144c80d7736c973e435cf
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Sun Feb 10 22:27:39 2019 +0800

    Finish the top n database statement persistent.
---
 .../core/analysis/data/LimitedSizeDataCache.java   | 12 ++++++-----
 .../analysis/data/LimitedSizeDataCollection.java   |  1 +
 .../oap/server/core/analysis/data/Window.java      | 10 +++++++++
 .../database/DatabaseStatementDispatcher.java      |  1 +
 .../manual/database/TopNDatabaseStatement.java     |  5 +++--
 .../server/core/analysis/worker/TopNWorker.java    | 24 ++++++++++++++++++----
 .../server/core/source/DatabaseSlowStatement.java  |  1 +
 .../listener/endpoint/MultiScopesSpanListener.java |  5 ++++-
 8 files changed, 47 insertions(+), 12 deletions(-)

diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCache.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCache.java
index d64d3e7..33cfe1d 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCache.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCache.java
@@ -22,11 +22,13 @@ import org.apache.skywalking.oap.server.core.storage.*;
 
 public class LimitedSizeDataCache<STORAGE_DATA extends ComparableStorageData> extends Window<STORAGE_DATA> implements DataCache {
 
-    private SWCollection<STORAGE_DATA> lockedMergeDataCollection;
+    private SWCollection<STORAGE_DATA> limitedSizeDataCollection;
     private final int limitSize;
 
     public LimitedSizeDataCache(int limitSize) {
+        super(false);
         this.limitSize = limitSize;
+        init();
     }
 
     @Override public SWCollection<STORAGE_DATA> collectionInstance() {
@@ -34,16 +36,16 @@ public class LimitedSizeDataCache<STORAGE_DATA extends ComparableStorageData> ex
     }
 
     public void add(STORAGE_DATA data) {
-        lockedMergeDataCollection.put(data);
+        limitedSizeDataCollection.put(data);
     }
 
     @Override public void writing() {
-        lockedMergeDataCollection = getCurrentAndWriting();
+        limitedSizeDataCollection = getCurrentAndWriting();
     }
 
     @Override public void finishWriting() {
-        lockedMergeDataCollection.finishWriting();
-        lockedMergeDataCollection = null;
+        limitedSizeDataCollection.finishWriting();
+        limitedSizeDataCollection = null;
     }
 }
 
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollection.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollection.java
index b330a4e..70b7966 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollection.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollection.java
@@ -104,6 +104,7 @@ public class LimitedSizeDataCollection<STORAGE_DATA extends ComparableStorageDat
 
         // Add the value as biggest in top N list
         storageDataList.addLast(value);
+        storageDataList.removeFirst();
     }
 
     @Override public Collection<STORAGE_DATA> collection() {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Window.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Window.java
index 6f9486d..ff2ca6d 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Window.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Window.java
@@ -33,6 +33,16 @@ public abstract class Window<DATA> {
     private SWCollection<DATA> windowDataB;
 
     Window() {
+        this(true);
+    }
+
+    Window(boolean autoInit) {
+        if (autoInit) {
+            init();
+        }
+    }
+
+    protected void init() {
         this.windowDataA = collectionInstance();
         this.windowDataB = collectionInstance();
         this.pointer = windowDataA;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/DatabaseStatementDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/DatabaseStatementDispatcher.java
index a94d370..628751a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/DatabaseStatementDispatcher.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/DatabaseStatementDispatcher.java
@@ -28,6 +28,7 @@ import org.apache.skywalking.oap.server.core.source.DatabaseSlowStatement;
 public class DatabaseStatementDispatcher implements SourceDispatcher<DatabaseSlowStatement> {
     @Override public void dispatch(DatabaseSlowStatement source) {
         TopNDatabaseStatement statement = new TopNDatabaseStatement();
+        statement.setId(source.getId());
         statement.setDatabaseServiceId(source.getDatabaseServiceId());
         statement.setLatency(source.getLatency());
         statement.setStatement(source.getStatement());
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java
index 3639ae7..c40a6bb 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java
@@ -35,13 +35,14 @@ import org.apache.skywalking.oap.server.core.storage.annotation.*;
 @TopNType
 @StorageEntity(name = TopNDatabaseStatement.INDEX_NAME, builder = TopNDatabaseStatement.Builder.class, source = Scope.DatabaseSlowStatement)
 public class TopNDatabaseStatement extends TopN {
-    public static final String INDEX_NAME = "TOP_N_DATABASE_STATEMENT";
+    public static final String INDEX_NAME = "top_n_database_statement";
     public static final String DATABASE_SERVICE_ID = "db_service_id";
 
+    @Setter private String id;
     @Getter @Setter @Column(columnName = DATABASE_SERVICE_ID) private int databaseServiceId;
 
     @Override public String id() {
-        throw new UnexpectedException("id() should not be called.");
+        return id;
     }
 
     @Override public boolean equals(Object o) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
index 6ccccd8..5d6304e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
@@ -56,7 +56,12 @@ public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<Top
     }
 
     @Override void onWork(TopN data) {
-        limitedSizeDataCache.add(data);
+        limitedSizeDataCache.writing();
+        try {
+            limitedSizeDataCache.add(data);
+        } finally {
+            limitedSizeDataCache.finishWriting();
+        }
     }
 
     /**
@@ -72,13 +77,24 @@ public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<Top
         return limitedSizeDataCache;
     }
 
-    @Override public List<Object> prepareBatch(LimitedSizeDataCache<TopN> cache) {
+    /**
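+     * The top N worker persists much less frequently than the other workers, so `flushAndSwitch` is overridden to
+     * extend the execution time window.
+     *
+     * The switch and the persistence attempt happen at most once per reportCycle.
+     *
+     * @return false while still inside the current reportCycle; otherwise the result of super.flushAndSwitch()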
+     */
+    @Override public boolean flushAndSwitch() {
         long now = System.currentTimeMillis();
         if (now - lastReportTimestamp <= reportCycle) {
-            return new ArrayList<>(0);
+            return false;
         }
         lastReportTimestamp = now;
+        return super.flushAndSwitch();
+    }
 
+    @Override public List<Object> prepareBatch(LimitedSizeDataCache<TopN> cache) {
         List<Object> batchCollection = new LinkedList<>();
         cache.getLast().collection().forEach(record -> {
             try {
@@ -91,7 +107,7 @@ public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<Top
     }
 
     @Override public void in(TopN n) {
-        limitedSizeDataCache.add(n);
+        dataCarrier.produce(n);
     }
 
     private class TopNConsumer implements IConsumer<TopN> {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DatabaseSlowStatement.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DatabaseSlowStatement.java
index c3a7a68..254a5e7 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DatabaseSlowStatement.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DatabaseSlowStatement.java
@@ -27,6 +27,7 @@ import org.apache.skywalking.oap.server.core.source.annotation.SourceType;
  */
 @SourceType
 public class DatabaseSlowStatement extends Source {
+    @Getter @Setter private String id;
     @Getter @Setter private int databaseServiceId;
     @Getter @Setter private String statement;
     @Getter @Setter private long latency;
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java
index ebb3468..4b970fe 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java
@@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.*;
 import org.apache.skywalking.oap.server.core.cache.*;
 import org.apache.skywalking.oap.server.core.source.*;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.util.TimeBucketUtils;
 import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig;
 import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SpanTags;
 import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*;
@@ -147,11 +148,13 @@ public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListe
         setPublicAttrs(sourceBuilder, spanDecorator);
         exitSourceBuilders.add(sourceBuilder);
 
-        if (spanDecorator.getSpanLayer().equals(RequestType.DATABASE)
+        if (sourceBuilder.getType().equals(RequestType.DATABASE)
             && sourceBuilder.getLatency() > config.getSlowDBAccessThreshold()) {
             DatabaseSlowStatement statement = new DatabaseSlowStatement();
+            statement.setId(segmentCoreInfo.getSegmentId() + "-" + spanDecorator.getSpanId());
             statement.setDatabaseServiceId(sourceBuilder.getDestServiceId());
             statement.setLatency(sourceBuilder.getLatency());
+            statement.setTimeBucket(TimeBucketUtils.INSTANCE.getSecondTimeBucket(segmentCoreInfo.getStartTime()));
             statement.setTraceId(segmentCoreInfo.getSegmentId());
             for (KeyStringValuePair tag : spanDecorator.getAllTags()) {
                 if (SpanTags.DB_STATEMENT.equals(tag.getKey())) {