You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2022/11/02 08:57:04 UTC
[skywalking] 01/01: Reduce the buffer size(queue) of MAL(only) metric streams
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch queue
in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit f83f05ec5707e922fce5fb56b18727391621ef50
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Wed Nov 2 16:56:45 2022 +0800
Reduce the buffer size(queue) of MAL(only) metric streams
---
docs/en/changes/changes.md | 1 +
.../oap/server/core/analysis/StreamProcessor.java | 5 ---
.../analysis/worker/ManagementStreamProcessor.java | 1 -
.../core/analysis/worker/MetricStreamKind.java | 41 ++++++++++++++++++++++
.../analysis/worker/MetricsAggregateWorker.java | 19 ++++++++--
.../analysis/worker/MetricsPersistentWorker.java | 16 ++++++---
.../analysis/worker/MetricsStreamProcessor.java | 39 ++++++++++++--------
.../core/analysis/worker/NoneStreamProcessor.java | 1 -
.../analysis/worker/RecordStreamProcessor.java | 2 --
.../core/analysis/worker/TopNStreamProcessor.java | 2 --
10 files changed, 95 insertions(+), 32 deletions(-)
diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index fa9fae0400..6c024b1da2 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -82,6 +82,7 @@
* Support Python runtime metrics analysis.
* Support `sampledTrace` in LAL.
* Support multiple rules with different names under the same layer of LAL script.
+* (Optimization) Reduce the buffer size(queue) of MAL(only) metric streams. Set L1 queue size as 1/20, L2 queue size as 1/2.
#### UI
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/StreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/StreamProcessor.java
index 5a57f4c3e5..c7b6aec98d 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/StreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/StreamProcessor.java
@@ -18,12 +18,7 @@
package org.apache.skywalking.oap.server.core.analysis;
-import org.apache.skywalking.oap.server.core.storage.StorageException;
-import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
-
public interface StreamProcessor<STREAM> {
void in(STREAM stream);
-
- void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends STREAM> streamClass) throws StorageException;
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ManagementStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ManagementStreamProcessor.java
index fb5ceec557..f48157a9c2 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ManagementStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ManagementStreamProcessor.java
@@ -60,7 +60,6 @@ public class ManagementStreamProcessor implements StreamProcessor<ManagementData
}
}
- @Override
public void create(final ModuleDefineHolder moduleDefineHolder, final Stream stream, final Class<? extends ManagementData> streamClass) throws StorageException {
final StorageBuilderFactory storageBuilderFactory = moduleDefineHolder.find(StorageModule.NAME)
.provider()
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricStreamKind.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricStreamKind.java
new file mode 100644
index 0000000000..b9697b4e9f
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricStreamKind.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.worker;
+
+/**
+ * MetricStreamKind represents the kind of metric character.
+ */
+public enum MetricStreamKind {
+ /**
+ * Metric built by {@link org.apache.skywalking.oap.server.core.oal.rt.OALEngine}
+ *
+ * OAL is SkyWalking native metrics from SkyWalking native analyzers, for traces and service mesh logs.
+ * The {@link org.apache.skywalking.oap.server.core.source.Source} implementations represent the raw traffic.
+ *
+ * The significant different between OAL and {@link #MAL} type is, the traffic load of OAL metrics is much more.
+ * So,in the stream process, kernel assigned larger buffer and more resources for this kind.
+ */
+ OAL,
+ /**
+ * Metric built by {@link org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem}
+ *
+ * MAL metrics are from existing metric system, such as SkyWalking meter, Prometheus, OpenTelemetry
+ */
+ MAL
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
index 74597a24ba..046029cff4 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
@@ -50,13 +50,26 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
private CounterMetrics aggregationCounter;
private long lastSendTime = 0;
- MetricsAggregateWorker(ModuleDefineHolder moduleDefineHolder, AbstractWorker<Metrics> nextWorker,
- String modelName, long l1FlushPeriod) {
+ MetricsAggregateWorker(ModuleDefineHolder moduleDefineHolder,
+ AbstractWorker<Metrics> nextWorker,
+ String modelName,
+ long l1FlushPeriod,
+ MetricStreamKind kind) {
super(moduleDefineHolder);
this.nextWorker = nextWorker;
this.mergeDataCache = new MergableBufferedData();
String name = "METRICS_L1_AGGREGATION";
- this.dataCarrier = new DataCarrier<>("MetricsAggregateWorker." + modelName, name, 2, 10000);
+ int queueChannelSize = 2;
+ int queueBufferSize = 10_000;
+ if (MetricStreamKind.MAL == kind) {
+ // In MAL meter streaming, the load of data flow is much less as they are statistics already,
+ // but in OAL sources, they are raw data.
+ // Set the buffer(size of queue) as 1/20 to reduce unnecessary resource costs.
+ queueChannelSize = 1;
+ queueBufferSize = 1_000;
+ }
+ this.dataCarrier = new DataCarrier<>(
+ "MetricsAggregateWorker." + modelName, name, queueChannelSize, queueBufferSize);
BulkConsumePool.Creator creator = new BulkConsumePool.Creator(
name, BulkConsumePool.Creator.recommendMaxSize() * 2, 20);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
index ccab0aba0f..4725f749e2 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
@@ -86,7 +86,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO metricsDAO,
AbstractWorker<Metrics> nextAlarmWorker, AbstractWorker<ExportEvent> nextExportWorker,
MetricsTransWorker transWorker, boolean enableDatabaseSession, boolean supportUpdate,
- long storageSessionTimeout, int metricsDataTTL) {
+ long storageSessionTimeout, int metricsDataTTL, MetricStreamKind kind) {
super(moduleDefineHolder, new ReadWriteSafeCache<>(new MergableBufferedData(), new MergableBufferedData()));
this.model = model;
this.context = new HashMap<>(100);
@@ -113,7 +113,14 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
throw new UnexpectedException(e.getMessage(), e);
}
- this.dataCarrier = new DataCarrier<>("MetricsPersistentWorker." + model.getName(), name, 1, 2000);
+ int bufferSize = 2000;
+ if (MetricStreamKind.MAL == kind) {
+ // In MAL meter streaming, the load of data flow is much less as they are statistics already,
+ // but in OAL sources, they are raw data.
+ // Set the buffer(size of queue) as 1/2 to reduce unnecessary resource costs.
+ bufferSize = 1000;
+ }
+ this.dataCarrier = new DataCarrier<>("MetricsPersistentWorker." + model.getName(), name, 1, bufferSize);
this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new PersistentConsumer());
MetricsCreator metricsCreator = moduleDefineHolder.find(TelemetryModule.NAME)
@@ -136,10 +143,11 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
boolean enableDatabaseSession,
boolean supportUpdate,
long storageSessionTimeout,
- int metricsDataTTL) {
+ int metricsDataTTL,
+ MetricStreamKind kind) {
this(moduleDefineHolder, model, metricsDAO,
null, null, null,
- enableDatabaseSession, supportUpdate, storageSessionTimeout, metricsDataTTL
+ enableDatabaseSession, supportUpdate, storageSessionTimeout, metricsDataTTL, kind
);
// For a down-sampling metrics, we prolong the session timeout for 4 times, nearly 5 minutes.
// And add offset according to worker creation sequence, to avoid context clear overlap,
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
index 7885c0738e..7b849f88b1 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
@@ -106,23 +106,31 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
}
/**
- * Create the workers and work flow for every metrics.
+ * Create the workers and work flow for OAL metrics.
*
* @param moduleDefineHolder pointer of the module define.
* @param stream definition of the metrics class.
* @param metricsClass data type of the streaming calculation.
*/
- @Override
public void create(ModuleDefineHolder moduleDefineHolder,
Stream stream,
Class<? extends Metrics> metricsClass) throws StorageException {
- this.create(moduleDefineHolder, StreamDefinition.from(stream), metricsClass);
+ this.create(moduleDefineHolder, StreamDefinition.from(stream), metricsClass, MetricStreamKind.OAL);
}
- @SuppressWarnings("unchecked")
+ /**
+ * Create the workers and work flow for MAL meter
+ */
public void create(ModuleDefineHolder moduleDefineHolder,
StreamDefinition stream,
- Class<? extends Metrics> metricsClass) throws StorageException {
+ Class<? extends Metrics> meterClass) throws StorageException {
+ this.create(moduleDefineHolder, stream, meterClass, MetricStreamKind.MAL);
+ }
+
+ private void create(ModuleDefineHolder moduleDefineHolder,
+ StreamDefinition stream,
+ Class<? extends Metrics> metricsClass,
+ MetricStreamKind kind) throws StorageException {
final StorageBuilderFactory storageBuilderFactory = moduleDefineHolder.find(StorageModule.NAME)
.provider()
.getService(StorageBuilderFactory.class);
@@ -133,7 +141,8 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
IMetricsDAO metricsDAO;
try {
metricsDAO = storageDAO.newMetricsDao(builder.getDeclaredConstructor().newInstance());
- } catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
+ } catch (InstantiationException | IllegalAccessException | NoSuchMethodException |
+ InvocationTargetException e) {
throw new UnexpectedException("Create " + stream.getBuilder().getSimpleName() + " metrics DAO failure.", e);
}
@@ -165,14 +174,14 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
metricsClass, stream.getScopeId(), new Storage(stream.getName(), timeRelativeID, DownSampling.Hour),
false
);
- hourPersistentWorker = downSamplingWorker(moduleDefineHolder, metricsDAO, model, supportUpdate);
+ hourPersistentWorker = downSamplingWorker(moduleDefineHolder, metricsDAO, model, supportUpdate, kind);
}
if (configService.shouldToDay()) {
Model model = modelSetter.add(
metricsClass, stream.getScopeId(), new Storage(stream.getName(), timeRelativeID, DownSampling.Day),
false
);
- dayPersistentWorker = downSamplingWorker(moduleDefineHolder, metricsDAO, model, supportUpdate);
+ dayPersistentWorker = downSamplingWorker(moduleDefineHolder, metricsDAO, model, supportUpdate, kind);
}
transWorker = new MetricsTransWorker(
@@ -184,7 +193,7 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
false
);
MetricsPersistentWorker minutePersistentWorker = minutePersistentWorker(
- moduleDefineHolder, metricsDAO, model, transWorker, supportUpdate);
+ moduleDefineHolder, metricsDAO, model, transWorker, supportUpdate, kind);
String remoteReceiverWorkerName = stream.getName() + "_rec";
IWorkerInstanceSetter workerInstanceSetter = moduleDefineHolder.find(CoreModule.NAME)
@@ -194,7 +203,7 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
MetricsRemoteWorker remoteWorker = new MetricsRemoteWorker(moduleDefineHolder, remoteReceiverWorkerName);
MetricsAggregateWorker aggregateWorker = new MetricsAggregateWorker(
- moduleDefineHolder, remoteWorker, stream.getName(), l1FlushPeriod);
+ moduleDefineHolder, remoteWorker, stream.getName(), l1FlushPeriod, kind);
entryWorkers.put(metricsClass, aggregateWorker);
}
@@ -203,13 +212,14 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
IMetricsDAO metricsDAO,
Model model,
MetricsTransWorker transWorker,
- boolean supportUpdate) {
+ boolean supportUpdate,
+ MetricStreamKind kind) {
AlarmNotifyWorker alarmNotifyWorker = new AlarmNotifyWorker(moduleDefineHolder);
ExportMetricsWorker exportWorker = new ExportMetricsWorker(moduleDefineHolder);
MetricsPersistentWorker minutePersistentWorker = new MetricsPersistentWorker(
moduleDefineHolder, model, metricsDAO, alarmNotifyWorker, exportWorker, transWorker,
- enableDatabaseSession, supportUpdate, storageSessionTimeout, metricsDataTTL
+ enableDatabaseSession, supportUpdate, storageSessionTimeout, metricsDataTTL, kind
);
persistentWorkers.add(minutePersistentWorker);
@@ -219,10 +229,11 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
private MetricsPersistentWorker downSamplingWorker(ModuleDefineHolder moduleDefineHolder,
IMetricsDAO metricsDAO,
Model model,
- boolean supportUpdate) {
+ boolean supportUpdate,
+ MetricStreamKind kind) {
MetricsPersistentWorker persistentWorker = new MetricsPersistentWorker(
moduleDefineHolder, model, metricsDAO,
- enableDatabaseSession, supportUpdate, storageSessionTimeout, metricsDataTTL
+ enableDatabaseSession, supportUpdate, storageSessionTimeout, metricsDataTTL, kind
);
persistentWorkers.add(persistentWorker);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/NoneStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/NoneStreamProcessor.java
index 7b53ad2ee9..ffb31ded42 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/NoneStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/NoneStreamProcessor.java
@@ -60,7 +60,6 @@ public class NoneStreamProcessor implements StreamProcessor<NoneStream> {
}
}
- @Override
public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends NoneStream> streamClass) throws StorageException {
final StorageBuilderFactory storageBuilderFactory = moduleDefineHolder.find(StorageModule.NAME)
.provider()
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java
index 893152794a..3d081a8bb5 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java
@@ -56,8 +56,6 @@ public class RecordStreamProcessor implements StreamProcessor<Record> {
}
}
- @Override
- @SuppressWarnings("unchecked")
public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends Record> recordClass) throws StorageException {
final StorageBuilderFactory storageBuilderFactory = moduleDefineHolder.find(StorageModule.NAME)
.provider()
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java
index 4ba5496114..dc6efcd5b2 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java
@@ -71,8 +71,6 @@ public class TopNStreamProcessor implements StreamProcessor<TopN> {
this.topNWorkerReportCycle = topNWorkerReportCycle;
}
- @Override
- @SuppressWarnings("unchecked")
public void create(ModuleDefineHolder moduleDefineHolder,
Stream stream,
Class<? extends TopN> topNClass) throws StorageException {