You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/07/24 05:43:28 UTC
[skywalking] branch master updated: Feature of database session in
OAP server. (#3147)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 8d234a9 Feature of database session in OAP server. (#3147)
8d234a9 is described below
commit 8d234a91e55f4ddaed46ed885a81443f9f2fd0ce
Author: 彭勇升 pengys <pe...@apache.org>
AuthorDate: Wed Jul 24 13:43:19 2019 +0800
Feature of database session in OAP server. (#3147)
* Feature of database session
* Make it configurable.
* Make the OAP server can't startup.
---
.../oap/server/core/CoreModuleConfig.java | 1 +
.../oap/server/core/CoreModuleProvider.java | 75 +++++---------------
.../oap/server/core/analysis/metrics/Metrics.java | 1 +
.../server/core/analysis/metrics/PxxMetrics.java | 2 +-
.../analysis/worker/MetricsPersistentWorker.java | 81 ++++++++++++++++------
.../analysis/worker/MetricsStreamProcessor.java | 10 +--
.../core/analysis/worker/PersistenceWorker.java | 15 ++--
.../server/core/analysis/worker/TopNWorker.java | 12 +++-
.../oap/server/core/storage/IMetricsDAO.java | 4 +-
.../oap/server/core/storage/PersistenceTimer.java | 11 ++-
.../src/main/assembly/application.yml | 9 ++-
.../src/main/resources/application.yml | 3 +
.../plugin/elasticsearch/base/MetricsEsDAO.java | 13 ++--
.../storage/plugin/jdbc/h2/dao/H2MetricsDAO.java | 15 ++--
14 files changed, 133 insertions(+), 119 deletions(-)
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
index 7d4c3ab..e47171a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
@@ -37,6 +37,7 @@ public class CoreModuleConfig extends ModuleConfig {
@Setter private int gRPCPort;
@Setter private int maxConcurrentCallsPerConnection;
@Setter private int maxMessageSize;
+ @Setter private boolean enableDatabaseSession;
private final List<String> downsampling;
/**
* The period of doing data persistence.
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index f05b776..fa6c48a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -20,65 +20,25 @@ package org.apache.skywalking.oap.server.core;
import java.io.IOException;
import org.apache.skywalking.oap.server.configuration.api.ConfigurationModule;
-import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
-import org.apache.skywalking.oap.server.core.analysis.StreamAnnotationListener;
+import org.apache.skywalking.oap.server.core.analysis.*;
+import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
-import org.apache.skywalking.oap.server.core.cache.CacheUpdateTimer;
-import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache;
-import org.apache.skywalking.oap.server.core.cache.NetworkAddressInventoryCache;
-import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
-import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
-import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
-import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
-import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
-import org.apache.skywalking.oap.server.core.config.ComponentLibraryCatalogService;
-import org.apache.skywalking.oap.server.core.config.ConfigService;
-import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
-import org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService;
-import org.apache.skywalking.oap.server.core.oal.rt.OALEngine;
-import org.apache.skywalking.oap.server.core.oal.rt.OALEngineLoader;
-import org.apache.skywalking.oap.server.core.query.AggregationQueryService;
-import org.apache.skywalking.oap.server.core.query.AlarmQueryService;
-import org.apache.skywalking.oap.server.core.query.LogQueryService;
-import org.apache.skywalking.oap.server.core.query.MetadataQueryService;
-import org.apache.skywalking.oap.server.core.query.MetricQueryService;
-import org.apache.skywalking.oap.server.core.query.TopNRecordsQueryService;
-import org.apache.skywalking.oap.server.core.query.TopologyQueryService;
-import org.apache.skywalking.oap.server.core.query.TraceQueryService;
-import org.apache.skywalking.oap.server.core.register.service.EndpointInventoryRegister;
-import org.apache.skywalking.oap.server.core.register.service.IEndpointInventoryRegister;
-import org.apache.skywalking.oap.server.core.register.service.INetworkAddressInventoryRegister;
-import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister;
-import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
-import org.apache.skywalking.oap.server.core.register.service.NetworkAddressInventoryRegister;
-import org.apache.skywalking.oap.server.core.register.service.ServiceInstanceInventoryRegister;
-import org.apache.skywalking.oap.server.core.register.service.ServiceInventoryRegister;
-import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
-import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler;
-import org.apache.skywalking.oap.server.core.remote.client.Address;
-import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
+import org.apache.skywalking.oap.server.core.cache.*;
+import org.apache.skywalking.oap.server.core.cluster.*;
+import org.apache.skywalking.oap.server.core.config.*;
+import org.apache.skywalking.oap.server.core.oal.rt.*;
+import org.apache.skywalking.oap.server.core.query.*;
+import org.apache.skywalking.oap.server.core.register.service.*;
+import org.apache.skywalking.oap.server.core.remote.*;
+import org.apache.skywalking.oap.server.core.remote.client.*;
import org.apache.skywalking.oap.server.core.remote.health.HealthCheckServiceHandler;
-import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
-import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegisterImpl;
-import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister;
-import org.apache.skywalking.oap.server.core.server.JettyHandlerRegisterImpl;
-import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
-import org.apache.skywalking.oap.server.core.source.SourceReceiver;
-import org.apache.skywalking.oap.server.core.source.SourceReceiverImpl;
+import org.apache.skywalking.oap.server.core.server.*;
+import org.apache.skywalking.oap.server.core.source.*;
import org.apache.skywalking.oap.server.core.storage.PersistenceTimer;
-import org.apache.skywalking.oap.server.core.storage.model.IModelGetter;
-import org.apache.skywalking.oap.server.core.storage.model.IModelOverride;
-import org.apache.skywalking.oap.server.core.storage.model.IModelSetter;
-import org.apache.skywalking.oap.server.core.storage.model.StorageModels;
+import org.apache.skywalking.oap.server.core.storage.model.*;
import org.apache.skywalking.oap.server.core.storage.ttl.DataTTLKeeperTimer;
-import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
-import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
-import org.apache.skywalking.oap.server.core.worker.WorkerInstancesService;
-import org.apache.skywalking.oap.server.library.module.ModuleConfig;
-import org.apache.skywalking.oap.server.library.module.ModuleDefine;
-import org.apache.skywalking.oap.server.library.module.ModuleProvider;
-import org.apache.skywalking.oap.server.library.module.ModuleStartException;
-import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
+import org.apache.skywalking.oap.server.core.worker.*;
+import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.library.server.ServerException;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
import org.apache.skywalking.oap.server.library.server.jetty.JettyServer;
@@ -96,7 +56,6 @@ public class CoreModuleProvider extends ModuleProvider {
private final AnnotationScan annotationScan;
private final StorageModels storageModels;
private final SourceReceiverImpl receiver;
- private StreamAnnotationListener streamAnnotationListener;
private OALEngine oalEngine;
public CoreModuleProvider() {
@@ -120,7 +79,7 @@ public class CoreModuleProvider extends ModuleProvider {
}
@Override public void prepare() throws ServiceNotProvidedException, ModuleStartException {
- streamAnnotationListener = new StreamAnnotationListener(getManager());
+ StreamAnnotationListener streamAnnotationListener = new StreamAnnotationListener(getManager());
AnnotationScan scopeScan = new AnnotationScan();
scopeScan.registerListener(new DefaultScopeDefine.Listener());
@@ -200,6 +159,8 @@ public class CoreModuleProvider extends ModuleProvider {
this.remoteClientManager = new RemoteClientManager(getManager());
this.registerServiceImplementation(RemoteClientManager.class, remoteClientManager);
+
+ MetricsStreamProcessor.getInstance().setEnableDatabaseSession(moduleConfig.isEnableDatabaseSession());
}
@Override public void start() throws ModuleStartException {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/Metrics.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/Metrics.java
index 96b59da..25c3125 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/Metrics.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/Metrics.java
@@ -35,6 +35,7 @@ public abstract class Metrics extends StreamData implements StorageData {
public static final String ENTITY_ID = "entity_id";
@Getter @Setter @Column(columnName = TIME_BUCKET) private long timeBucket;
+ @Getter @Setter private long survivalTime = 0L;
public abstract String id();
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/PxxMetrics.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/PxxMetrics.java
index 1cc88c2..3754c62 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/PxxMetrics.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/PxxMetrics.java
@@ -43,7 +43,7 @@ public abstract class PxxMetrics extends Metrics implements IntValueHolder {
@Getter @Setter @Column(columnName = DETAIL_GROUP) private IntKeyLongValueArray detailGroup;
private final int percentileRank;
- private Map<Integer, IntKeyLongValue> detailIndex;
+ @Getter private Map<Integer, IntKeyLongValue> detailIndex;
public PxxMetrics(int percentileRank) {
this.percentileRank = percentileRank;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
index 34e6436..8c2865a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
+import java.io.IOException;
import java.util.*;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
@@ -40,16 +41,20 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
private static final Logger logger = LoggerFactory.getLogger(MetricsPersistentWorker.class);
private final Model model;
+ private final Map<Metrics, Metrics> databaseSession;
private final MergeDataCache<Metrics> mergeDataCache;
private final IMetricsDAO metricsDAO;
private final AbstractWorker<Metrics> nextAlarmWorker;
private final AbstractWorker<ExportEvent> nextExportWorker;
private final DataCarrier<Metrics> dataCarrier;
+ private final boolean enableDatabaseSession;
MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO metricsDAO, AbstractWorker<Metrics> nextAlarmWorker,
- AbstractWorker<ExportEvent> nextExportWorker) {
+ AbstractWorker<ExportEvent> nextExportWorker, boolean enableDatabaseSession) {
super(moduleDefineHolder);
this.model = model;
+ this.databaseSession = new HashMap<>(100);
+ this.enableDatabaseSession = enableDatabaseSession;
this.mergeDataCache = new MergeDataCache<>();
this.metricsDAO = metricsDAO;
this.nextAlarmWorker = nextAlarmWorker;
@@ -83,23 +88,21 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
return mergeDataCache;
}
- @Override public void prepareBatch(MergeDataCache<Metrics> cache, List<PrepareRequest> prepareRequests) {
+ @Override public void prepareBatch(Collection<Metrics> lastCollection, List<PrepareRequest> prepareRequests) {
long start = System.currentTimeMillis();
- Collection<Metrics> collection = cache.getLast().collection();
-
int i = 0;
+ int batchGetSize = 2000;
Metrics[] metrics = null;
- for (Metrics data : collection) {
+ for (Metrics data : lastCollection) {
if (Objects.nonNull(nextExportWorker)) {
ExportEvent event = new ExportEvent(data, ExportEvent.EventType.INCREMENT);
nextExportWorker.in(event);
}
- int batchGetSize = 2000;
int mod = i % batchGetSize;
if (mod == 0) {
- int residual = collection.size() - i;
+ int residual = lastCollection.size() - i;
if (residual >= batchGetSize) {
metrics = new Metrics[batchGetSize];
} else {
@@ -110,23 +113,18 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
if (mod == metrics.length - 1) {
try {
- Map<String, Metrics> dbMetricsMap = metricsDAO.get(model, metrics);
+ syncStorageToCache(metrics);
for (Metrics metric : metrics) {
- if (dbMetricsMap.containsKey(metric.id())) {
- metric.combine(dbMetricsMap.get(metric.id()));
- metric.calculate();
- prepareRequests.add(metricsDAO.prepareBatchUpdate(model, metric));
+ Metrics cacheMetric = databaseSession.get(metric);
+ if (cacheMetric != null) {
+ cacheMetric.combine(metric);
+ cacheMetric.calculate();
+ prepareRequests.add(metricsDAO.prepareBatchUpdate(model, cacheMetric));
+ nextWorker(cacheMetric);
} else {
prepareRequests.add(metricsDAO.prepareBatchInsert(model, metric));
- }
-
- if (Objects.nonNull(nextAlarmWorker)) {
- nextAlarmWorker.in(metric);
- }
- if (Objects.nonNull(nextExportWorker)) {
- ExportEvent event = new ExportEvent(metric, ExportEvent.EventType.TOTAL);
- nextExportWorker.in(event);
+ nextWorker(metric);
}
}
} catch (Throwable t) {
@@ -142,6 +140,16 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
}
}
+ private void nextWorker(Metrics metric) {
+ if (Objects.nonNull(nextAlarmWorker)) {
+ nextAlarmWorker.in(metric);
+ }
+ if (Objects.nonNull(nextExportWorker)) {
+ ExportEvent event = new ExportEvent(metric, ExportEvent.EventType.TOTAL);
+ nextExportWorker.in(event);
+ }
+ }
+
@Override public void cacheData(Metrics input) {
mergeDataCache.writing();
if (mergeDataCache.containsKey(input)) {
@@ -156,6 +164,39 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
mergeDataCache.finishWriting();
}
+ private void syncStorageToCache(Metrics[] metrics) throws IOException {
+ if (!enableDatabaseSession) {
+ databaseSession.clear();
+ }
+
+ List<String> notInCacheIds = new ArrayList<>();
+ for (Metrics metric : metrics) {
+ if (!databaseSession.containsKey(metric)) {
+ notInCacheIds.add(metric.id());
+ }
+ }
+
+ if (notInCacheIds.size() > 0) {
+ List<Metrics> metricsList = metricsDAO.multiGet(model, notInCacheIds);
+ for (Metrics metric : metricsList) {
+ databaseSession.put(metric, metric);
+ }
+ }
+ }
+
+ @Override public void endOfRound(long tookTime) {
+ if (enableDatabaseSession) {
+ Iterator<Metrics> iterator = databaseSession.values().iterator();
+ while (iterator.hasNext()) {
+ Metrics metrics = iterator.next();
+ metrics.setSurvivalTime(tookTime + metrics.getSurvivalTime());
+ if (metrics.getSurvivalTime() > 70000) {
+ iterator.remove();
+ }
+ }
+ }
+ }
+
private class PersistentConsumer implements IConsumer<Metrics> {
private final MetricsPersistentWorker persistent;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
index e81906e..4ee60ab 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
@@ -19,7 +19,7 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.*;
-import lombok.Getter;
+import lombok.*;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.analysis.*;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
@@ -39,6 +39,7 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
private Map<Class<? extends Metrics>, MetricsAggregateWorker> entryWorkers = new HashMap<>();
@Getter private List<MetricsPersistentWorker> persistentWorkers = new ArrayList<>();
+ @Setter @Getter private boolean enableDatabaseSession;
public static MetricsStreamProcessor getInstance() {
return PROCESSOR;
@@ -100,19 +101,18 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
entryWorkers.put(metricsClass, aggregateWorker);
}
- private MetricsPersistentWorker minutePersistentWorker(ModuleDefineHolder moduleDefineHolder,
- IMetricsDAO metricsDAO, Model model) {
+ private MetricsPersistentWorker minutePersistentWorker(ModuleDefineHolder moduleDefineHolder, IMetricsDAO metricsDAO, Model model) {
AlarmNotifyWorker alarmNotifyWorker = new AlarmNotifyWorker(moduleDefineHolder);
ExportWorker exportWorker = new ExportWorker(moduleDefineHolder);
- MetricsPersistentWorker minutePersistentWorker = new MetricsPersistentWorker(moduleDefineHolder, model, metricsDAO, alarmNotifyWorker, exportWorker);
+ MetricsPersistentWorker minutePersistentWorker = new MetricsPersistentWorker(moduleDefineHolder, model, metricsDAO, alarmNotifyWorker, exportWorker, enableDatabaseSession);
persistentWorkers.add(minutePersistentWorker);
return minutePersistentWorker;
}
private MetricsPersistentWorker worker(ModuleDefineHolder moduleDefineHolder, IMetricsDAO metricsDAO, Model model) {
- MetricsPersistentWorker persistentWorker = new MetricsPersistentWorker(moduleDefineHolder, model, metricsDAO, null, null);
+ MetricsPersistentWorker persistentWorker = new MetricsPersistentWorker(moduleDefineHolder, model, metricsDAO, null, null, enableDatabaseSession);
persistentWorkers.add(persistentWorker);
return persistentWorker;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java
index 166eb9c..88c9a5b 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java
@@ -18,8 +18,8 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
-import java.util.List;
-import org.apache.skywalking.oap.server.core.analysis.data.Window;
+import java.util.*;
+import org.apache.skywalking.oap.server.core.analysis.data.*;
import org.apache.skywalking.oap.server.core.storage.StorageData;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
@@ -45,6 +45,8 @@ public abstract class PersistenceWorker<INPUT extends StorageData, CACHE extends
public abstract CACHE getCache();
+ public abstract void endOfRound(long tookTime);
+
public boolean flushAndSwitch() {
boolean isSwitch;
try {
@@ -57,11 +59,12 @@ public abstract class PersistenceWorker<INPUT extends StorageData, CACHE extends
return isSwitch;
}
- public abstract void prepareBatch(CACHE cache, List<PrepareRequest> prepareRequests);
+ public abstract void prepareBatch(Collection<INPUT> lastCollection, List<PrepareRequest> prepareRequests);
public final void buildBatchRequests(List<PrepareRequest> prepareRequests) {
try {
- while (getCache().getLast().isWriting()) {
+ SWCollection<INPUT> last = getCache().getLast();
+ while (last.isWriting()) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
@@ -69,8 +72,8 @@ public abstract class PersistenceWorker<INPUT extends StorageData, CACHE extends
}
}
- if (getCache().getLast().collection() != null) {
- prepareBatch(getCache(), prepareRequests);
+ if (last.collection() != null) {
+ prepareBatch(last.collection(), prepareRequests);
}
} finally {
getCache().finishReadingLast();
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
index 712e992..8ccde84 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
@@ -18,7 +18,7 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
-import java.util.List;
+import java.util.*;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.analysis.data.LimitedSizeDataCache;
@@ -86,8 +86,8 @@ public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<Top
return super.flushAndSwitch();
}
- @Override public void prepareBatch(LimitedSizeDataCache<TopN> cache, List<PrepareRequest> prepareRequests) {
- cache.getLast().collection().forEach(record -> {
+ @Override public void prepareBatch(Collection<TopN> lastCollection, List<PrepareRequest> prepareRequests) {
+ lastCollection.forEach(record -> {
try {
prepareRequests.add(recordDAO.prepareBatchInsert(model, record));
} catch (Throwable t) {
@@ -96,6 +96,12 @@ public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<Top
});
}
+ /**
+ * This method used to clear the expired cache, but TopN is not following it.
+ */
+ @Override public void endOfRound(long tookTime) {
+ }
+
@Override public void in(TopN n) {
dataCarrier.produce(n);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java
index e67dfa8..f0ff7bd 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java
@@ -19,7 +19,7 @@
package org.apache.skywalking.oap.server.core.storage;
import java.io.IOException;
-import java.util.Map;
+import java.util.List;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.client.request.*;
@@ -29,7 +29,7 @@ import org.apache.skywalking.oap.server.library.client.request.*;
*/
public interface IMetricsDAO extends DAO {
- Map<String, Metrics> get(Model model, Metrics[] metrics) throws IOException;
+ List<Metrics> multiGet(Model model, List<String> ids) throws IOException;
InsertRequest prepareBatchInsert(Model model, Metrics metrics) throws IOException;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
index cedd074..5a18716 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
@@ -43,6 +43,8 @@ public enum PersistenceTimer {
private CounterMetrics errorCounter;
private HistogramMetrics prepareLatency;
private HistogramMetrics executeLatency;
+ private long lastTime = System.currentTimeMillis();
+ private final List<PrepareRequest> prepareRequests = new ArrayList<>(50000);
PersistenceTimer() {
this.debug = System.getProperty("debug") != null;
@@ -61,7 +63,7 @@ public enum PersistenceTimer {
MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
if (!isStarted) {
- Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
+ Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(
new RunnableWithExceptionProtection(() -> extractDataAndSave(batchDAO),
t -> logger.error("Extract data and save failure.", t)), 5, moduleConfig.getPersistentPeriod(), TimeUnit.SECONDS);
@@ -75,10 +77,10 @@ public enum PersistenceTimer {
}
long startTime = System.currentTimeMillis();
+
try {
HistogramMetrics.Timer timer = prepareLatency.createTimer();
- List<PrepareRequest> prepareRequests = new LinkedList<>();
try {
List<PersistenceWorker> persistenceWorkers = new ArrayList<>();
persistenceWorkers.addAll(TopNStreamProcessor.getInstance().getPersistentWorkers());
@@ -92,6 +94,8 @@ public enum PersistenceTimer {
if (worker.flushAndSwitch()) {
worker.buildBatchRequests(prepareRequests);
}
+
+ worker.endOfRound(System.currentTimeMillis() - lastTime);
});
if (debug) {
@@ -116,6 +120,9 @@ public enum PersistenceTimer {
if (logger.isDebugEnabled()) {
logger.debug("Persistence data save finish");
}
+
+ prepareRequests.clear();
+ lastTime = System.currentTimeMillis();
}
if (debug) {
diff --git a/oap-server/server-starter/src/main/assembly/application.yml b/oap-server/server-starter/src/main/assembly/application.yml
index bc4b745..e45666c 100644
--- a/oap-server/server-starter/src/main/assembly/application.yml
+++ b/oap-server/server-starter/src/main/assembly/application.yml
@@ -52,9 +52,9 @@ core:
gRPCHost: ${SW_CORE_GRPC_HOST:0.0.0.0}
gRPCPort: ${SW_CORE_GRPC_PORT:11800}
downsampling:
- - Hour
- - Day
- - Month
+ - Hour
+ - Day
+ - Month
# Set a timeout on metrics data. After the timeout has expired, the metrics data will automatically be deleted.
enableDataKeeperExecutor: ${SW_CORE_ENABLE_DATA_KEEPER_EXECUTOR:true} # Turn it off then automatically metrics data delete will be close.
recordDataTTL: ${SW_CORE_RECORD_DATA_TTL:90} # Unit is minute
@@ -62,6 +62,9 @@ core:
hourMetricsDataTTL: ${SW_CORE_HOUR_METRIC_DATA_TTL:36} # Unit is hour
dayMetricsDataTTL: ${SW_CORE_DAY_METRIC_DATA_TTL:45} # Unit is day
monthMetricsDataTTL: ${SW_CORE_MONTH_METRIC_DATA_TTL:18} # Unit is month
+ # Cache metric data for 1 minute to reduce database queries, and if the OAP cluster changes within that minute,
+ # the metrics may not be accurate within that minute.
+ enableDatabaseSession: ${SW_CORE_ENABLE_DATABASE_SESSION:true}
storage:
# elasticsearch:
# nameSpace: ${SW_NAMESPACE:""}
diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index ec79843..30d2134 100644
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -62,6 +62,9 @@ core:
hourMetricsDataTTL: ${SW_CORE_HOUR_METRIC_DATA_TTL:36} # Unit is hour
dayMetricsDataTTL: ${SW_CORE_DAY_METRIC_DATA_TTL:45} # Unit is day
monthMetricsDataTTL: ${SW_CORE_MONTH_METRIC_DATA_TTL:18} # Unit is month
+ # Cache metric data for 1 minute to reduce database queries, and if the OAP cluster changes within that minute,
+ # the metrics may not be accurate within that minute.
+ enableDatabaseSession: ${SW_CORE_ENABLE_DATABASE_SESSION:true}
storage:
elasticsearch:
nameSpace: ${SW_NAMESPACE:""}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
index e0403c7..7061531 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
@@ -39,18 +39,13 @@ public class MetricsEsDAO extends EsDAO implements IMetricsDAO {
this.storageBuilder = storageBuilder;
}
- @Override public Map<String, Metrics> get(Model model, Metrics[] metrics) throws IOException {
- Map<String, Metrics> result = new HashMap<>();
+ @Override public List<Metrics> multiGet(Model model, List<String> ids) throws IOException {
+ SearchResponse response = getClient().ids(model.getName(), ids.toArray(new String[0]));
- String[] ids = new String[metrics.length];
- for (int i = 0; i < metrics.length; i++) {
- ids[i] = metrics[i].id();
- }
-
- SearchResponse response = getClient().ids(model.getName(), ids);
+ List<Metrics> result = new ArrayList<>((int)response.getHits().totalHits);
for (int i = 0; i < response.getHits().totalHits; i++) {
Metrics source = storageBuilder.map2Data(response.getHits().getAt(i).getSourceAsMap());
- result.put(source.id(), source);
+ result.add(source);
}
return result;
}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetricsDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetricsDAO.java
index 7f18b4c..e071c51 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetricsDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetricsDAO.java
@@ -39,18 +39,11 @@ public class H2MetricsDAO extends H2SQLExecutor implements IMetricsDAO {
this.storageBuilder = storageBuilder;
}
- @Override public Map<String, Metrics> get(Model model, Metrics[] metrics) throws IOException {
- Map<String, Metrics> result = new HashMap<>();
-
- String[] ids = new String[metrics.length];
- for (int i = 0; i < metrics.length; i++) {
- ids[i] = metrics[i].id();
- }
-
- List<StorageData> storageDataList = getByIDs(h2Client, model.getName(), ids, storageBuilder);
-
+ @Override public List<Metrics> multiGet(Model model, List<String> ids) throws IOException {
+ List<StorageData> storageDataList = getByIDs(h2Client, model.getName(), ids.toArray(new String[0]), storageBuilder);
+ List<Metrics> result = new ArrayList<>(storageDataList.size());
for (StorageData storageData : storageDataList) {
- result.put(storageData.id(), (Metrics)storageData);
+ result.add((Metrics)storageData);
}
return result;
}