You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/02/09 16:28:44 UTC
[incubator-skywalking] branch top-sql updated: no message
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch top-sql
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/top-sql by this push:
new 07ac45d no message
07ac45d is described below
commit 07ac45dcbe41bedeb9eb659b2e0da3a3cf5b912f
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Sun Feb 10 00:28:35 2019 +0800
no message
---
.../analysis/data/LimitedSizeDataCollection.java | 30 ++++++++++++--------
.../manual/database/TopNDatabaseStatement.java | 18 ++++++++++--
.../server/core/analysis/worker/TopNProcess.java | 32 ++++++++++++++++++----
.../server/core/analysis/worker/TopNWorker.java | 4 +--
.../oap/server/core/storage/PersistenceTimer.java | 1 +
5 files changed, 64 insertions(+), 21 deletions(-)
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollection.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollection.java
index 2e03125..b330a4e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollection.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollection.java
@@ -19,17 +19,17 @@
package org.apache.skywalking.oap.server.core.analysis.data;
import java.util.*;
-import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.ComparableStorageData;
public class LimitedSizeDataCollection<STORAGE_DATA extends ComparableStorageData> implements SWCollection<STORAGE_DATA> {
- private final LinkedList<STORAGE_DATA> data;
+ private final HashMap<STORAGE_DATA, LinkedList<STORAGE_DATA>> data;
private final int limitedSize;
private volatile boolean writing;
private volatile boolean reading;
LimitedSizeDataCollection(int limitedSize) {
- this.data = new LinkedList<>();
+ this.data = new HashMap<>();
this.writing = false;
this.reading = false;
this.limitedSize = limitedSize;
@@ -76,31 +76,39 @@ public class LimitedSizeDataCollection<STORAGE_DATA extends ComparableStorageDat
}
@Override public void put(STORAGE_DATA value) {
- if (data.size() < limitedSize) {
- data.add(value);
+ LinkedList<STORAGE_DATA> storageDataList = this.data.get(value);
+ if (storageDataList == null) {
+ storageDataList = new LinkedList<>();
+ data.put(value, storageDataList);
+ }
+
+ if (storageDataList.size() < limitedSize) {
+ storageDataList.add(value);
return;
}
- for (int i = 0; i < data.size(); i++) {
- STORAGE_DATA storageData = this.data.get(i);
+ for (int i = 0; i < storageDataList.size(); i++) {
+ STORAGE_DATA storageData = storageDataList.get(i);
if (value.compareTo(storageData) <= 0) {
if (i == 0) {
// input value is less than the smallest in top N list, ignore
} else {
// Remove the smallest in top N list
// add the current value into the right position
- data.removeFirst();
- data.add(i, value);
+ storageDataList.removeFirst();
+ storageDataList.add(i, value);
}
return;
}
}
// Add the value as biggest in top N list
- data.addLast(value);
+ storageDataList.addLast(value);
}
@Override public Collection<STORAGE_DATA> collection() {
- return data;
+ List<STORAGE_DATA> collection = new ArrayList<>();
+ data.values().forEach(e -> e.forEach(collection::add));
+ return collection;
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java
index c528d02..50d82bc 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java
@@ -20,7 +20,7 @@ package org.apache.skywalking.oap.server.core.analysis.manual.database;
import java.util.*;
import lombok.*;
-import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
import org.apache.skywalking.oap.server.core.analysis.topn.annotation.TopNType;
import org.apache.skywalking.oap.server.core.source.Scope;
@@ -39,10 +39,22 @@ public class TopNDatabaseStatement extends TopN {
public static final String DATABASE_SERVICE_ID = "db_service_id";
@Getter @Setter @Column(columnName = DATABASE_SERVICE_ID) private int databaseServiceId;
- private final long recordNanoSec = System.nanoTime();
@Override public String id() {
- return TIME_BUCKET + Const.ID_SPLIT + recordNanoSec;
+ throw new UnexpectedException("id() should not be called.");
+ }
+
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+ TopNDatabaseStatement statement = (TopNDatabaseStatement)o;
+ return databaseServiceId == statement.databaseServiceId;
+ }
+
+ @Override public int hashCode() {
+ return Objects.hash(databaseServiceId);
}
public static class Builder implements StorageBuilder<TopNDatabaseStatement> {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNProcess.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNProcess.java
index 98d8326..059c46e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNProcess.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNProcess.java
@@ -18,10 +18,15 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
+import java.util.*;
+import lombok.Getter;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.manual.database.TopNDatabaseStatement;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
-import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
+import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntityAnnotationUtils;
+import org.apache.skywalking.oap.server.core.worker.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
/**
@@ -33,12 +38,29 @@ import org.apache.skywalking.oap.server.library.module.ModuleManager;
public enum TopNProcess {
INSTANCE;
- public void create(ModuleManager moduleManager, Class<? extends TopN> topN) {
- String modelName = StorageEntityAnnotationUtils.getModelName(topN);
- Class<? extends StorageBuilder> builderClass = StorageEntityAnnotationUtils.getBuilder(topN);
+ @Getter private List<TopNWorker> persistentWorkers = new ArrayList<>();
+ private Map<Class<? extends Record>, TopNWorker> workers = new HashMap<>();
+
+ public void create(ModuleManager moduleManager, Class<? extends TopN> topNClass) {
+ String modelName = StorageEntityAnnotationUtils.getModelName(topNClass);
+ Class<? extends StorageBuilder> builderClass = StorageEntityAnnotationUtils.getBuilder(topNClass);
+
+ StorageDAO storageDAO = moduleManager.find(StorageModule.NAME).provider().getService(StorageDAO.class);
+ IRecordDAO recordDAO;
+ try {
+ recordDAO = storageDAO.newRecordDao(builderClass.newInstance());
+ } catch (InstantiationException | IllegalAccessException e) {
+ throw new UnexpectedException("");
+ }
+
+ TopNWorker persistentWorker = new TopNWorker(WorkerIdGenerator.INSTANCES.generate(), modelName, moduleManager,
+ 50, recordDAO);
+ WorkerInstances.INSTANCES.put(persistentWorker.getWorkerId(), persistentWorker);
+ persistentWorkers.add(persistentWorker);
+ workers.put(topNClass, persistentWorker);
}
public void in(TopNDatabaseStatement statement) {
-
+ workers.get(statement.getClass()).in(statement);
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
index a59080c..0dfbc0f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
@@ -39,10 +39,10 @@ public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<Top
private final String modelName;
private final DataCarrier<TopN> dataCarrier;
- public TopNWorker(int workerId, String modelName, int batchSize, ModuleManager moduleManager,
+ public TopNWorker(int workerId, String modelName, ModuleManager moduleManager,
int topNSize,
IRecordDAO recordDAO) {
- super(moduleManager, workerId, batchSize);
+ super(moduleManager, workerId, -1);
this.limitedSizeDataCache = new LimitedSizeDataCache<>(topNSize);
this.recordDAO = recordDAO;
this.modelName = modelName;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
index 367bd9e..9bdbfd1 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
@@ -84,6 +84,7 @@ public enum PersistenceTimer {
List<PersistenceWorker> persistenceWorkers = new ArrayList<>();
persistenceWorkers.addAll(IndicatorProcess.INSTANCE.getPersistentWorkers());
persistenceWorkers.addAll(RecordProcess.INSTANCE.getPersistentWorkers());
+ persistenceWorkers.addAll(TopNProcess.INSTANCE.getPersistentWorkers());
persistenceWorkers.forEach(worker -> {
if (logger.isDebugEnabled()) {