You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2022/11/02 08:57:04 UTC

[skywalking] 01/01: Reduce the buffer size(queue) of MAL(only) metric streams

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch queue
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit f83f05ec5707e922fce5fb56b18727391621ef50
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Wed Nov 2 16:56:45 2022 +0800

    Reduce the buffer size(queue) of MAL(only) metric streams
---
 docs/en/changes/changes.md                         |  1 +
 .../oap/server/core/analysis/StreamProcessor.java  |  5 ---
 .../analysis/worker/ManagementStreamProcessor.java |  1 -
 .../core/analysis/worker/MetricStreamKind.java     | 41 ++++++++++++++++++++++
 .../analysis/worker/MetricsAggregateWorker.java    | 19 ++++++++--
 .../analysis/worker/MetricsPersistentWorker.java   | 16 ++++++---
 .../analysis/worker/MetricsStreamProcessor.java    | 39 ++++++++++++--------
 .../core/analysis/worker/NoneStreamProcessor.java  |  1 -
 .../analysis/worker/RecordStreamProcessor.java     |  2 --
 .../core/analysis/worker/TopNStreamProcessor.java  |  2 --
 10 files changed, 95 insertions(+), 32 deletions(-)

diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index fa9fae0400..6c024b1da2 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -82,6 +82,7 @@
 * Support Python runtime metrics analysis.
 * Support `sampledTrace` in LAL.
 * Support multiple rules with different names under the same layer of LAL script.
+* (Optimization) Reduce the buffer size(queue) of MAL(only) metric streams. Set the L1 queue size to 1/20 and the L2 queue size to 1/2 of their previous sizes.
 
 #### UI
 
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/StreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/StreamProcessor.java
index 5a57f4c3e5..c7b6aec98d 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/StreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/StreamProcessor.java
@@ -18,12 +18,7 @@
 
 package org.apache.skywalking.oap.server.core.analysis;
 
-import org.apache.skywalking.oap.server.core.storage.StorageException;
-import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
-
 public interface StreamProcessor<STREAM> {
 
     void in(STREAM stream);
-
-    void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends STREAM> streamClass) throws StorageException;
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ManagementStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ManagementStreamProcessor.java
index fb5ceec557..f48157a9c2 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ManagementStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ManagementStreamProcessor.java
@@ -60,7 +60,6 @@ public class ManagementStreamProcessor implements StreamProcessor<ManagementData
         }
     }
 
-    @Override
     public void create(final ModuleDefineHolder moduleDefineHolder, final Stream stream, final Class<? extends ManagementData> streamClass) throws StorageException {
         final StorageBuilderFactory storageBuilderFactory = moduleDefineHolder.find(StorageModule.NAME)
                                                                               .provider()
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricStreamKind.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricStreamKind.java
new file mode 100644
index 0000000000..b9697b4e9f
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricStreamKind.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.worker;
+
+/**
+ * MetricStreamKind represents the kind of a metric stream, i.e. which engine builds the metrics.
+ */
+public enum MetricStreamKind {
+    /**
+     * Metric built by {@link org.apache.skywalking.oap.server.core.oal.rt.OALEngine}
+     *
+     * OAL is SkyWalking native metrics from SkyWalking native analyzers, for traces and service mesh logs.
+     * The {@link org.apache.skywalking.oap.server.core.source.Source} implementations represent the raw traffic.
+     *
+     * The significant difference between OAL and the {@link #MAL} kind is that the traffic load of OAL metrics is much heavier.
+     * So, in the stream processing, the kernel assigns a larger buffer and more resources to this kind.
+     */
+    OAL,
+    /**
+     * Metric built by {@link org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem}
+     *
+     * MAL metrics come from existing metric systems, such as SkyWalking meter, Prometheus, and OpenTelemetry.
+     */
+    MAL
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
index 74597a24ba..046029cff4 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
@@ -50,13 +50,26 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
     private CounterMetrics aggregationCounter;
     private long lastSendTime = 0;
 
-    MetricsAggregateWorker(ModuleDefineHolder moduleDefineHolder, AbstractWorker<Metrics> nextWorker,
-                           String modelName, long l1FlushPeriod) {
+    MetricsAggregateWorker(ModuleDefineHolder moduleDefineHolder,
+                           AbstractWorker<Metrics> nextWorker,
+                           String modelName,
+                           long l1FlushPeriod,
+                           MetricStreamKind kind) {
         super(moduleDefineHolder);
         this.nextWorker = nextWorker;
         this.mergeDataCache = new MergableBufferedData();
         String name = "METRICS_L1_AGGREGATION";
-        this.dataCarrier = new DataCarrier<>("MetricsAggregateWorker." + modelName, name, 2, 10000);
+        int queueChannelSize = 2;
+        int queueBufferSize = 10_000;
+        if (MetricStreamKind.MAL == kind) {
+            // In MAL meter streaming, the data flow load is much lighter because the data are already statistics,
+            // whereas OAL sources are raw data.
+            // Set the total queue size to 1/20 (1 x 1_000 vs. 2 x 10_000) to reduce unnecessary resource costs.
+            queueChannelSize = 1;
+            queueBufferSize = 1_000;
+        }
+        this.dataCarrier = new DataCarrier<>(
+            "MetricsAggregateWorker." + modelName, name, queueChannelSize, queueBufferSize);
 
         BulkConsumePool.Creator creator = new BulkConsumePool.Creator(
             name, BulkConsumePool.Creator.recommendMaxSize() * 2, 20);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
index ccab0aba0f..4725f749e2 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
@@ -86,7 +86,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
     MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO metricsDAO,
                             AbstractWorker<Metrics> nextAlarmWorker, AbstractWorker<ExportEvent> nextExportWorker,
                             MetricsTransWorker transWorker, boolean enableDatabaseSession, boolean supportUpdate,
-                            long storageSessionTimeout, int metricsDataTTL) {
+                            long storageSessionTimeout, int metricsDataTTL, MetricStreamKind kind) {
         super(moduleDefineHolder, new ReadWriteSafeCache<>(new MergableBufferedData(), new MergableBufferedData()));
         this.model = model;
         this.context = new HashMap<>(100);
@@ -113,7 +113,14 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
             throw new UnexpectedException(e.getMessage(), e);
         }
 
-        this.dataCarrier = new DataCarrier<>("MetricsPersistentWorker." + model.getName(), name, 1, 2000);
+        int bufferSize = 2000;
+        if (MetricStreamKind.MAL == kind) {
+            // In MAL meter streaming, the data flow load is much lighter because the data are already statistics,
+            // whereas OAL sources are raw data.
+            // Set the queue buffer size to 1/2 (1000 vs. 2000) to reduce unnecessary resource costs.
+            bufferSize = 1000;
+        }
+        this.dataCarrier = new DataCarrier<>("MetricsPersistentWorker." + model.getName(), name, 1, bufferSize);
         this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new PersistentConsumer());
 
         MetricsCreator metricsCreator = moduleDefineHolder.find(TelemetryModule.NAME)
@@ -136,10 +143,11 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
                             boolean enableDatabaseSession,
                             boolean supportUpdate,
                             long storageSessionTimeout,
-                            int metricsDataTTL) {
+                            int metricsDataTTL,
+                            MetricStreamKind kind) {
         this(moduleDefineHolder, model, metricsDAO,
              null, null, null,
-             enableDatabaseSession, supportUpdate, storageSessionTimeout, metricsDataTTL
+             enableDatabaseSession, supportUpdate, storageSessionTimeout, metricsDataTTL, kind
         );
         // For a down-sampling metrics, we prolong the session timeout for 4 times, nearly 5 minutes.
         // And add offset according to worker creation sequence, to avoid context clear overlap,
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
index 7885c0738e..7b849f88b1 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
@@ -106,23 +106,31 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
     }
 
     /**
-     * Create the workers and work flow for every metrics.
+     * Create the workers and work flow for OAL metrics.
      *
      * @param moduleDefineHolder pointer of the module define.
      * @param stream             definition of the metrics class.
      * @param metricsClass       data type of the streaming calculation.
      */
-    @Override
     public void create(ModuleDefineHolder moduleDefineHolder,
                        Stream stream,
                        Class<? extends Metrics> metricsClass) throws StorageException {
-        this.create(moduleDefineHolder, StreamDefinition.from(stream), metricsClass);
+        this.create(moduleDefineHolder, StreamDefinition.from(stream), metricsClass, MetricStreamKind.OAL);
     }
 
-    @SuppressWarnings("unchecked")
+    /**
+     * Create the workers and work flow for MAL meters.
+     */
     public void create(ModuleDefineHolder moduleDefineHolder,
                        StreamDefinition stream,
-                       Class<? extends Metrics> metricsClass) throws StorageException {
+                       Class<? extends Metrics> meterClass) throws StorageException {
+        this.create(moduleDefineHolder, stream, meterClass, MetricStreamKind.MAL);
+    }
+
+    private void create(ModuleDefineHolder moduleDefineHolder,
+                        StreamDefinition stream,
+                        Class<? extends Metrics> metricsClass,
+                        MetricStreamKind kind) throws StorageException {
         final StorageBuilderFactory storageBuilderFactory = moduleDefineHolder.find(StorageModule.NAME)
                                                                               .provider()
                                                                               .getService(StorageBuilderFactory.class);
@@ -133,7 +141,8 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
         IMetricsDAO metricsDAO;
         try {
             metricsDAO = storageDAO.newMetricsDao(builder.getDeclaredConstructor().newInstance());
-        } catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
+        } catch (InstantiationException | IllegalAccessException | NoSuchMethodException |
+                 InvocationTargetException e) {
             throw new UnexpectedException("Create " + stream.getBuilder().getSimpleName() + " metrics DAO failure.", e);
         }
 
@@ -165,14 +174,14 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
                     metricsClass, stream.getScopeId(), new Storage(stream.getName(), timeRelativeID, DownSampling.Hour),
                     false
                 );
-                hourPersistentWorker = downSamplingWorker(moduleDefineHolder, metricsDAO, model, supportUpdate);
+                hourPersistentWorker = downSamplingWorker(moduleDefineHolder, metricsDAO, model, supportUpdate, kind);
             }
             if (configService.shouldToDay()) {
                 Model model = modelSetter.add(
                     metricsClass, stream.getScopeId(), new Storage(stream.getName(), timeRelativeID, DownSampling.Day),
                     false
                 );
-                dayPersistentWorker = downSamplingWorker(moduleDefineHolder, metricsDAO, model, supportUpdate);
+                dayPersistentWorker = downSamplingWorker(moduleDefineHolder, metricsDAO, model, supportUpdate, kind);
             }
 
             transWorker = new MetricsTransWorker(
@@ -184,7 +193,7 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
             false
         );
         MetricsPersistentWorker minutePersistentWorker = minutePersistentWorker(
-            moduleDefineHolder, metricsDAO, model, transWorker, supportUpdate);
+            moduleDefineHolder, metricsDAO, model, transWorker, supportUpdate, kind);
 
         String remoteReceiverWorkerName = stream.getName() + "_rec";
         IWorkerInstanceSetter workerInstanceSetter = moduleDefineHolder.find(CoreModule.NAME)
@@ -194,7 +203,7 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
 
         MetricsRemoteWorker remoteWorker = new MetricsRemoteWorker(moduleDefineHolder, remoteReceiverWorkerName);
         MetricsAggregateWorker aggregateWorker = new MetricsAggregateWorker(
-            moduleDefineHolder, remoteWorker, stream.getName(), l1FlushPeriod);
+            moduleDefineHolder, remoteWorker, stream.getName(), l1FlushPeriod, kind);
 
         entryWorkers.put(metricsClass, aggregateWorker);
     }
@@ -203,13 +212,14 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
                                                            IMetricsDAO metricsDAO,
                                                            Model model,
                                                            MetricsTransWorker transWorker,
-                                                           boolean supportUpdate) {
+                                                           boolean supportUpdate,
+                                                           MetricStreamKind kind) {
         AlarmNotifyWorker alarmNotifyWorker = new AlarmNotifyWorker(moduleDefineHolder);
         ExportMetricsWorker exportWorker = new ExportMetricsWorker(moduleDefineHolder);
 
         MetricsPersistentWorker minutePersistentWorker = new MetricsPersistentWorker(
             moduleDefineHolder, model, metricsDAO, alarmNotifyWorker, exportWorker, transWorker,
-            enableDatabaseSession, supportUpdate, storageSessionTimeout, metricsDataTTL
+            enableDatabaseSession, supportUpdate, storageSessionTimeout, metricsDataTTL, kind
         );
         persistentWorkers.add(minutePersistentWorker);
 
@@ -219,10 +229,11 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
     private MetricsPersistentWorker downSamplingWorker(ModuleDefineHolder moduleDefineHolder,
                                                        IMetricsDAO metricsDAO,
                                                        Model model,
-                                                       boolean supportUpdate) {
+                                                       boolean supportUpdate,
+                                                       MetricStreamKind kind) {
         MetricsPersistentWorker persistentWorker = new MetricsPersistentWorker(
             moduleDefineHolder, model, metricsDAO,
-            enableDatabaseSession, supportUpdate, storageSessionTimeout, metricsDataTTL
+            enableDatabaseSession, supportUpdate, storageSessionTimeout, metricsDataTTL, kind
         );
         persistentWorkers.add(persistentWorker);
 
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/NoneStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/NoneStreamProcessor.java
index 7b53ad2ee9..ffb31ded42 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/NoneStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/NoneStreamProcessor.java
@@ -60,7 +60,6 @@ public class NoneStreamProcessor implements StreamProcessor<NoneStream> {
         }
     }
 
-    @Override
     public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends NoneStream> streamClass) throws StorageException {
         final StorageBuilderFactory storageBuilderFactory = moduleDefineHolder.find(StorageModule.NAME)
                                                                               .provider()
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java
index 893152794a..3d081a8bb5 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java
@@ -56,8 +56,6 @@ public class RecordStreamProcessor implements StreamProcessor<Record> {
         }
     }
 
-    @Override
-    @SuppressWarnings("unchecked")
     public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends Record> recordClass) throws StorageException {
         final StorageBuilderFactory storageBuilderFactory = moduleDefineHolder.find(StorageModule.NAME)
                                                                               .provider()
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java
index 4ba5496114..dc6efcd5b2 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java
@@ -71,8 +71,6 @@ public class TopNStreamProcessor implements StreamProcessor<TopN> {
         this.topNWorkerReportCycle = topNWorkerReportCycle;
     }
 
-    @Override
-    @SuppressWarnings("unchecked")
     public void create(ModuleDefineHolder moduleDefineHolder,
                        Stream stream,
                        Class<? extends TopN> topNClass) throws StorageException {