You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2021/06/30 03:56:43 UTC

[skywalking] branch l1-flush created (now 72bfa01)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a change to branch l1-flush
in repository https://gitbox.apache.org/repos/asf/skywalking.git.


      at 72bfa01  Add the period of L1 aggregation flush to L2 aggregation

This branch includes the following new commits:

     new 72bfa01  Add the period of L1 aggregation flush to L2 aggregation

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[skywalking] 01/01: Add the period of L1 aggregation flush to L2 aggregation

Posted by wu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch l1-flush
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 72bfa0132185d8b787f71541cf1ee4c8b53bae03
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Wed Jun 30 11:56:29 2021 +0800

    Add the period of L1 aggregation flush to L2 aggregation
---
 CHANGES.md                                         |  3 +-
 docs/en/setup/backend/configuration-vocabulary.md  |  1 +
 .../src/main/resources/application.yml             |  2 ++
 .../oap/server/core/CoreModuleConfig.java          |  9 +++++-
 .../oap/server/core/CoreModuleProvider.java        |  3 +-
 .../analysis/worker/MetricsAggregateWorker.java    | 32 +++++++++++++++-------
 .../analysis/worker/MetricsStreamProcessor.java    | 15 ++++++++--
 7 files changed, 49 insertions(+), 16 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 8ae7936..eb37dc3 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -56,7 +56,8 @@ Release Notes.
 * Performance: cache regex pattern and result, optimize string concatenation in Envy ALS analyzer.
 * Performance: cache metrics id and entity id in `Metrics` and `ISource`.
 * Performance: enhance persistent session mechanism, about differentiating cache timeout for different dimensionality
-  metrics. The timeout of the cache for minute and hour level metrics has been prolonged to ~5 min. 
+  metrics. The timeout of the cache for minute and hour level metrics has been prolonged to ~5 min.
+* Performance: Add L1 aggregation flush period, which reduce the CPU load and help young GC.
 
 #### UI
 
diff --git a/docs/en/setup/backend/configuration-vocabulary.md b/docs/en/setup/backend/configuration-vocabulary.md
index c435c8f..3fa9376 100644
--- a/docs/en/setup/backend/configuration-vocabulary.md
+++ b/docs/en/setup/backend/configuration-vocabulary.md
@@ -23,6 +23,7 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode
 | - | - | dataKeeperExecutePeriod|The execution period of TTL scheduler, unit is minute. Execution doesn't mean deleting data. The storage provider could override this, such as ElasticSearch storage.|SW_CORE_DATA_KEEPER_EXECUTE_PERIOD|5|
 | - | - | recordDataTTL|The lifecycle of record data. Record data includes traces, top n sampled records, and logs. Unit is day. Minimal value is 2.|SW_CORE_RECORD_DATA_TTL|3|
 | - | - | metricsDataTTL|The lifecycle of metrics data, including the metadata. Unit is day. Recommend metricsDataTTL >= recordDataTTL. Minimal value is 2.| SW_CORE_METRICS_DATA_TTL|7|
+| - | - | l1FlushPeriod| The period of L1 aggregation flush to L2 aggregation. Unit is ms. | SW_CORE_L1_AGGREGATION_FLUSH_PERIOD | 500 |
 | - | - | enableDatabaseSession|Cache metrics data for 1 minute to reduce database queries, and if the OAP cluster changes within that minute.|SW_CORE_ENABLE_DATABASE_SESSION|true|
 | - | - | topNReportPeriod|The execution period of top N sampler, which saves sampled data into the storage. Unit is minute|SW_CORE_TOPN_REPORT_PERIOD|10|
 | - | - | activeExtraModelColumns|Append the names of entity, such as service name, into the metrics storage entities.|SW_CORE_ACTIVE_EXTRA_MODEL_COLUMNS|false|
diff --git a/oap-server/server-bootstrap/src/main/resources/application.yml b/oap-server/server-bootstrap/src/main/resources/application.yml
index e6db202..2f888fd 100755
--- a/oap-server/server-bootstrap/src/main/resources/application.yml
+++ b/oap-server/server-bootstrap/src/main/resources/application.yml
@@ -85,6 +85,8 @@ core:
     dataKeeperExecutePeriod: ${SW_CORE_DATA_KEEPER_EXECUTE_PERIOD:5} # How often the data keeper executor runs periodically, unit is minute
     recordDataTTL: ${SW_CORE_RECORD_DATA_TTL:3} # Unit is day
     metricsDataTTL: ${SW_CORE_METRICS_DATA_TTL:7} # Unit is day
+    # The period of L1 aggregation flush to L2 aggregation. Unit is ms.
+    l1FlushPeriod: ${SW_CORE_L1_AGGREGATION_FLUSH_PERIOD:500}
     # Cache metrics data for 1 minute to reduce database queries, and if the OAP cluster changes within that minute,
     # the metrics may not be accurate within that minute.
     enableDatabaseSession: ${SW_CORE_ENABLE_DATABASE_SESSION:true}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
index 8ffcc56..af5f735 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
@@ -46,8 +46,15 @@ public class CoreModuleConfig extends ModuleConfig {
     private String gRPCSslTrustedCAPath;
     private int maxConcurrentCallsPerConnection;
     private int maxMessageSize;
-    private boolean enableDatabaseSession;
     private int topNReportPeriod;
+    /**
+     * The period of L1 aggregation flush. Unit is ms.
+     */
+    private long l1FlushPeriod = 500;
+    /**
+     * Enable database flush session.
+     */
+    private boolean enableDatabaseSession;
     private final List<String> downsampling;
     /**
      * The period of doing data persistence. Unit is second.
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index 9e99bd1..a8bbcbe 100755
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -46,9 +46,9 @@ import org.apache.skywalking.oap.server.core.config.ConfigService;
 import org.apache.skywalking.oap.server.core.config.DownSamplingConfigService;
 import org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService;
 import org.apache.skywalking.oap.server.core.config.NamingControl;
-import org.apache.skywalking.oap.server.core.config.group.openapi.EndpointGroupingRuleReader4Openapi;
 import org.apache.skywalking.oap.server.core.config.group.EndpointNameGrouping;
 import org.apache.skywalking.oap.server.core.config.group.EndpointNameGroupingRuleWatcher;
+import org.apache.skywalking.oap.server.core.config.group.openapi.EndpointGroupingRuleReader4Openapi;
 import org.apache.skywalking.oap.server.core.management.ui.template.UITemplateInitializer;
 import org.apache.skywalking.oap.server.core.management.ui.template.UITemplateManagementService;
 import org.apache.skywalking.oap.server.core.oal.rt.DisableOALDefine;
@@ -289,6 +289,7 @@ public class CoreModuleProvider extends ModuleProvider {
             UITemplateManagementService.class, new UITemplateManagementService(getManager()));
 
         MetricsStreamProcessor.getInstance().setEnableDatabaseSession(moduleConfig.isEnableDatabaseSession());
+        MetricsStreamProcessor.getInstance().setL1FlushPeriod(moduleConfig.getL1FlushPeriod());
         TopNStreamProcessor.getInstance().setTopNWorkerReportCycle(moduleConfig.getTopNReportPeriod());
         apdexThresholdConfig = new ApdexThresholdConfig(this);
         ApdexMetrics.setDICT(apdexThresholdConfig);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
index b6fd9b3..684c5fc 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
@@ -42,13 +42,15 @@ import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
  */
 @Slf4j
 public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
+    public final long l1FlushPeriod;
     private AbstractWorker<Metrics> nextWorker;
     private final DataCarrier<Metrics> dataCarrier;
     private final MergableBufferedData<Metrics> mergeDataCache;
     private CounterMetrics aggregationCounter;
+    private long lastSendTime = 0;
 
     MetricsAggregateWorker(ModuleDefineHolder moduleDefineHolder, AbstractWorker<Metrics> nextWorker,
-                           String modelName) {
+                           String modelName, long l1FlushPeriod) {
         super(moduleDefineHolder);
         this.nextWorker = nextWorker;
         this.mergeDataCache = new MergableBufferedData();
@@ -69,8 +71,10 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
                                                           .getService(MetricsCreator.class);
         aggregationCounter = metricsCreator.createCounter(
             "metrics_aggregation", "The number of rows in aggregation",
-            new MetricsTag.Keys("metricName", "level", "dimensionality"), new MetricsTag.Values(modelName, "1", "minute")
+            new MetricsTag.Keys("metricName", "level", "dimensionality"),
+            new MetricsTag.Values(modelName, "1", "minute")
         );
+        this.l1FlushPeriod = l1FlushPeriod;
     }
 
     /**
@@ -93,14 +97,22 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
             mergeDataCache.accept(metrics);
         });
 
-        mergeDataCache.read().forEach(
-            data -> {
-                if (log.isDebugEnabled()) {
-                    log.debug(data.toString());
+        flush();
+    }
+
+    private void flush() {
+        long currentTime = System.currentTimeMillis();
+        if (currentTime - lastSendTime > l1FlushPeriod) {
+            mergeDataCache.read().forEach(
+                data -> {
+                    if (log.isDebugEnabled()) {
+                        log.debug(data.toString());
+                    }
+                    nextWorker.in(data);
                 }
-                nextWorker.in(data);
-            }
-        );
+            );
+            lastSendTime = currentTime;
+        }
     }
 
     private class AggregatorConsumer implements IConsumer<Metrics> {
@@ -122,4 +134,4 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
         public void onExit() {
         }
     }
-}
+}
\ No newline at end of file
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
index 9d8faf7..3a40777 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
@@ -71,6 +71,12 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
     private List<MetricsPersistentWorker> persistentWorkers = new ArrayList<>();
 
     /**
+     * The period of L1 aggregation flush. Unit is ms.
+     */
+    @Setter
+    @Getter
+    private long l1FlushPeriod = 500;
+    /**
      * Hold and forward CoreModuleConfig#enableDatabaseSession to the persistent worker.
      */
     @Setter
@@ -97,7 +103,9 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
      * @param metricsClass       data type of the streaming calculation.
      */
     @Override
-    public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends Metrics> metricsClass) throws StorageException {
+    public void create(ModuleDefineHolder moduleDefineHolder,
+                       Stream stream,
+                       Class<? extends Metrics> metricsClass) throws StorageException {
         this.create(moduleDefineHolder, StreamDefinition.from(stream), metricsClass);
     }
 
@@ -108,7 +116,8 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
         final StorageBuilderFactory storageBuilderFactory = moduleDefineHolder.find(StorageModule.NAME)
                                                                               .provider()
                                                                               .getService(StorageBuilderFactory.class);
-        final Class<? extends StorageBuilder> builder = storageBuilderFactory.builderOf(metricsClass, stream.getBuilder());
+        final Class<? extends StorageBuilder> builder = storageBuilderFactory.builderOf(
+            metricsClass, stream.getBuilder());
 
         StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
         IMetricsDAO metricsDAO;
@@ -167,7 +176,7 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
 
         MetricsRemoteWorker remoteWorker = new MetricsRemoteWorker(moduleDefineHolder, remoteReceiverWorkerName);
         MetricsAggregateWorker aggregateWorker = new MetricsAggregateWorker(
-            moduleDefineHolder, remoteWorker, stream.getName());
+            moduleDefineHolder, remoteWorker, stream.getName(), l1FlushPeriod);
 
         entryWorkers.put(metricsClass, aggregateWorker);
     }