You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/02/09 16:28:44 UTC

[incubator-skywalking] branch top-sql updated: no message

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch top-sql
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/top-sql by this push:
     new 07ac45d  no message
07ac45d is described below

commit 07ac45dcbe41bedeb9eb659b2e0da3a3cf5b912f
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Sun Feb 10 00:28:35 2019 +0800

    no message
---
 .../analysis/data/LimitedSizeDataCollection.java   | 30 ++++++++++++--------
 .../manual/database/TopNDatabaseStatement.java     | 18 ++++++++++--
 .../server/core/analysis/worker/TopNProcess.java   | 32 ++++++++++++++++++----
 .../server/core/analysis/worker/TopNWorker.java    |  4 +--
 .../oap/server/core/storage/PersistenceTimer.java  |  1 +
 5 files changed, 64 insertions(+), 21 deletions(-)

diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollection.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollection.java
index 2e03125..b330a4e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollection.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollection.java
@@ -19,17 +19,17 @@
 package org.apache.skywalking.oap.server.core.analysis.data;
 
 import java.util.*;
-import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.ComparableStorageData;
 
 public class LimitedSizeDataCollection<STORAGE_DATA extends ComparableStorageData> implements SWCollection<STORAGE_DATA> {
 
-    private final LinkedList<STORAGE_DATA> data;
+    private final HashMap<STORAGE_DATA, LinkedList<STORAGE_DATA>> data;
     private final int limitedSize;
     private volatile boolean writing;
     private volatile boolean reading;
 
     LimitedSizeDataCollection(int limitedSize) {
-        this.data = new LinkedList<>();
+        this.data = new HashMap<>();
         this.writing = false;
         this.reading = false;
         this.limitedSize = limitedSize;
@@ -76,31 +76,39 @@ public class LimitedSizeDataCollection<STORAGE_DATA extends ComparableStorageDat
     }
 
     @Override public void put(STORAGE_DATA value) {
-        if (data.size() < limitedSize) {
-            data.add(value);
+        LinkedList<STORAGE_DATA> storageDataList = this.data.get(value);
+        if (storageDataList == null) {
+            storageDataList = new LinkedList<>();
+            data.put(value, storageDataList);
+        }
+
+        if (storageDataList.size() < limitedSize) {
+            storageDataList.add(value);
             return;
         }
 
-        for (int i = 0; i < data.size(); i++) {
-            STORAGE_DATA storageData = this.data.get(i);
+        for (int i = 0; i < storageDataList.size(); i++) {
+            STORAGE_DATA storageData = storageDataList.get(i);
             if (value.compareTo(storageData) <= 0) {
                 if (i == 0) {
                     // input value is less than the smallest in top N list, ignore
                 } else {
                     // Remove the smallest in top N list
                     // add the current value into the right position
-                    data.removeFirst();
-                    data.add(i, value);
+                    storageDataList.removeFirst();
+                    storageDataList.add(i, value);
                 }
                 return;
             }
         }
 
         // Add the value as biggest in top N list
-        data.addLast(value);
+        storageDataList.addLast(value);
     }
 
     @Override public Collection<STORAGE_DATA> collection() {
-        return data;
+        List<STORAGE_DATA> collection = new ArrayList<>();
+        data.values().forEach(e -> e.forEach(collection::add));
+        return collection;
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java
index c528d02..50d82bc 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java
@@ -20,7 +20,7 @@ package org.apache.skywalking.oap.server.core.analysis.manual.database;
 
 import java.util.*;
 import lombok.*;
-import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.*;
 import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
 import org.apache.skywalking.oap.server.core.analysis.topn.annotation.TopNType;
 import org.apache.skywalking.oap.server.core.source.Scope;
@@ -39,10 +39,22 @@ public class TopNDatabaseStatement extends TopN {
     public static final String DATABASE_SERVICE_ID = "db_service_id";
 
     @Getter @Setter @Column(columnName = DATABASE_SERVICE_ID) private int databaseServiceId;
-    private final long recordNanoSec = System.nanoTime();
 
     @Override public String id() {
-        return TIME_BUCKET + Const.ID_SPLIT + recordNanoSec;
+        throw new UnexpectedException("id() should not be called.");
+    }
+
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+        TopNDatabaseStatement statement = (TopNDatabaseStatement)o;
+        return databaseServiceId == statement.databaseServiceId;
+    }
+
+    @Override public int hashCode() {
+        return Objects.hash(databaseServiceId);
     }
 
     public static class Builder implements StorageBuilder<TopNDatabaseStatement> {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNProcess.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNProcess.java
index 98d8326..059c46e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNProcess.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNProcess.java
@@ -18,10 +18,15 @@
 
 package org.apache.skywalking.oap.server.core.analysis.worker;
 
+import java.util.*;
+import lombok.Getter;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.analysis.manual.database.TopNDatabaseStatement;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
 import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
-import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
+import org.apache.skywalking.oap.server.core.storage.*;
 import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntityAnnotationUtils;
+import org.apache.skywalking.oap.server.core.worker.*;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
 
 /**
@@ -33,12 +38,29 @@ import org.apache.skywalking.oap.server.library.module.ModuleManager;
 public enum TopNProcess {
     INSTANCE;
 
-    public void create(ModuleManager moduleManager, Class<? extends TopN> topN) {
-        String modelName = StorageEntityAnnotationUtils.getModelName(topN);
-        Class<? extends StorageBuilder> builderClass = StorageEntityAnnotationUtils.getBuilder(topN);
+    @Getter private List<TopNWorker> persistentWorkers = new ArrayList<>();
+    private Map<Class<? extends Record>, TopNWorker> workers = new HashMap<>();
+
+    public void create(ModuleManager moduleManager, Class<? extends TopN> topNClass) {
+        String modelName = StorageEntityAnnotationUtils.getModelName(topNClass);
+        Class<? extends StorageBuilder> builderClass = StorageEntityAnnotationUtils.getBuilder(topNClass);
+
+        StorageDAO storageDAO = moduleManager.find(StorageModule.NAME).provider().getService(StorageDAO.class);
+        IRecordDAO recordDAO;
+        try {
+            recordDAO = storageDAO.newRecordDao(builderClass.newInstance());
+        } catch (InstantiationException | IllegalAccessException e) {
+            throw new UnexpectedException("");
+        }
+
+        TopNWorker persistentWorker = new TopNWorker(WorkerIdGenerator.INSTANCES.generate(), modelName, moduleManager,
+            50, recordDAO);
+        WorkerInstances.INSTANCES.put(persistentWorker.getWorkerId(), persistentWorker);
+        persistentWorkers.add(persistentWorker);
+        workers.put(topNClass, persistentWorker);
     }
 
     public void in(TopNDatabaseStatement statement) {
-
+        workers.get(statement.getClass()).in(statement);
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
index a59080c..0dfbc0f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
@@ -39,10 +39,10 @@ public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<Top
     private final String modelName;
     private final DataCarrier<TopN> dataCarrier;
 
-    public TopNWorker(int workerId, String modelName, int batchSize, ModuleManager moduleManager,
+    public TopNWorker(int workerId, String modelName, ModuleManager moduleManager,
         int topNSize,
         IRecordDAO recordDAO) {
-        super(moduleManager, workerId, batchSize);
+        super(moduleManager, workerId, -1);
         this.limitedSizeDataCache = new LimitedSizeDataCache<>(topNSize);
         this.recordDAO = recordDAO;
         this.modelName = modelName;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
index 367bd9e..9bdbfd1 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
@@ -84,6 +84,7 @@ public enum PersistenceTimer {
                 List<PersistenceWorker> persistenceWorkers = new ArrayList<>();
                 persistenceWorkers.addAll(IndicatorProcess.INSTANCE.getPersistentWorkers());
                 persistenceWorkers.addAll(RecordProcess.INSTANCE.getPersistentWorkers());
+                persistenceWorkers.addAll(TopNProcess.INSTANCE.getPersistentWorkers());
 
                 persistenceWorkers.forEach(worker -> {
                     if (logger.isDebugEnabled()) {