You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2021/07/14 14:44:49 UTC

[skywalking] 01/01: Reduce the flush period of hour and day level metrics

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch reduce-persistence-period
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 719a359229df6664ad978818d2f4b28c1c0c39e4
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Wed Jul 14 22:44:31 2021 +0800

    Reduce the flush period of hour and day level metrics
---
 CHANGES.md                                             |  6 ++++++
 .../core/analysis/worker/MetricsPersistentWorker.java  | 18 ++++++++++++++++++
 .../oap/server/core/query/MetadataQueryService.java    | 16 ++++++++++------
 3 files changed, 34 insertions(+), 6 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index fc932f6..e399545 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -88,8 +88,13 @@ Release Notes.
 * Remove `syncBulkActions` in ElasticSearch storage option.
 * Increase the default bulkActions(env, SW_STORAGE_ES_BULK_ACTIONS) to 5000(from 1000).
 * Increase the flush interval of ElasticSearch indices to 15s(from 10s)
+* Deduplicate the elements of metadata lists. Due to the more aggressive asynchronous flush, metadata lists are more
+  likely to include duplicate elements. Duplicates are no longer needed as an indicator.
+* Reduce the flush frequency of hour and day level metrics: they are flushed only once every 4 regular persistence
+  periods. This means the default flush period of hour and day level metrics is 25s * 4.
 
 #### UI
+
 * Fix the date component for log conditions.
 * Fix selector keys for duplicate options.
 * Add Python celery plugin.
@@ -99,6 +104,7 @@ Release Notes.
 * Fix chart types for setting metrics configure.
 
 #### Documentation
+
 * Add FAQ about `Elasticsearch exception type=version_conflict_engine_exception since 8.7.0`
 
 All issues and pull requests are [here](https://github.com/apache/skywalking/milestone/90?closed=1)
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
index 5dd29db..444ec21 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
@@ -67,6 +67,16 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
     private final boolean supportUpdate;
     private long sessionTimeout;
     private CounterMetrics aggregationCounter;
+    /**
+     * The counter for the round of persistent.
+     */
+    private int persistentCounter;
+    /**
+     * The mod value to control persistent. The MetricsPersistentWorker is driven by the {@link
+     * org.apache.skywalking.oap.server.core.storage.PersistenceTimer}. The down sampling level workers only execute in
+     * every {@link #persistentMod} periods. And minute level workers execute every time.
+     */
+    private int persistentMod;
 
     MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO metricsDAO,
                             AbstractWorker<Metrics> nextAlarmWorker, AbstractWorker<ExportEvent> nextExportWorker,
@@ -82,6 +92,8 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
         this.transWorker = Optional.ofNullable(transWorker);
         this.supportUpdate = supportUpdate;
         this.sessionTimeout = storageSessionTimeout;
+        this.persistentCounter = 0;
+        this.persistentMod = 1;
 
         String name = "METRICS_L2_AGGREGATION";
         int size = BulkConsumePool.Creator.recommendMaxSize() / 8;
@@ -122,6 +134,8 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
         // And add offset according to worker creation sequence, to avoid context clear overlap,
         // eventually optimize load of IDs reading.
         this.sessionTimeout = this.sessionTimeout * 4 + SESSION_TIMEOUT_OFFSITE_COUNTER * 200;
+        // The down sampling level worker executes every 4 periods.
+        this.persistentMod = 4;
     }
 
     /**
@@ -135,6 +149,10 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
 
     @Override
     public void prepareBatch(Collection<Metrics> lastCollection, List<PrepareRequest> prepareRequests) {
+        if (persistentCounter++ % persistentMod != 0) {
+            return;
+        }
+
         long start = System.currentTimeMillis();
         if (lastCollection.size() == 0) {
             return;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/MetadataQueryService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/MetadataQueryService.java
index 7f6d674..6459208 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/MetadataQueryService.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/MetadataQueryService.java
@@ -54,30 +54,34 @@ public class MetadataQueryService implements org.apache.skywalking.oap.server.li
                                         if (service.getGroup() == null) {
                                             service.setGroup(Const.EMPTY_STRING);
                                         }
-                                    }).collect(Collectors.toList());
+                                    })
+                                    .distinct()
+                                    .collect(Collectors.toList());
     }
 
     public List<Service> getAllBrowserServices() throws IOException {
-        return getMetadataQueryDAO().getAllBrowserServices();
+        return getMetadataQueryDAO().getAllBrowserServices().stream().distinct().collect(Collectors.toList());
     }
 
     public List<Database> getAllDatabases() throws IOException {
-        return getMetadataQueryDAO().getAllDatabases();
+        return getMetadataQueryDAO().getAllDatabases().stream().distinct().collect(Collectors.toList());
     }
 
     public List<Service> searchServices(final long startTimestamp, final long endTimestamp,
                                         final String keyword) throws IOException {
-        return getMetadataQueryDAO().searchServices(keyword);
+        return getMetadataQueryDAO().searchServices(keyword).stream().distinct().collect(Collectors.toList());
     }
 
     public List<ServiceInstance> getServiceInstances(final long startTimestamp, final long endTimestamp,
                                                      final String serviceId) throws IOException {
-        return getMetadataQueryDAO().getServiceInstances(startTimestamp, endTimestamp, serviceId);
+        return getMetadataQueryDAO().getServiceInstances(startTimestamp, endTimestamp, serviceId)
+                                    .stream().distinct().collect(Collectors.toList());
     }
 
     public List<Endpoint> searchEndpoint(final String keyword, final String serviceId,
                                          final int limit) throws IOException {
-        return getMetadataQueryDAO().searchEndpoint(keyword, serviceId, limit);
+        return getMetadataQueryDAO().searchEndpoint(keyword, serviceId, limit)
+                                    .stream().distinct().collect(Collectors.toList());
     }
 
     public Service searchService(final String serviceCode) throws IOException {