Posted to notifications@skywalking.apache.org by GitBox <gi...@apache.org> on 2021/03/29 08:55:51 UTC

[GitHub] [skywalking] kezhenxu94 commented on a change in pull request #6642: make sync metrics concurrency

kezhenxu94 commented on a change in pull request #6642:
URL: https://github.com/apache/skywalking/pull/6642#discussion_r603113999



##########
File path: docs/en/setup/backend/configuration-vocabulary.md
##########
@@ -39,6 +39,8 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode
 | - | - | maxSizeOfNetworkAddressAlias|Max size of network address detected in the be monitored system.| - | 1_000_000|
 | - | - | maxPageSizeOfQueryProfileSnapshot|The max size in every OAP query for snapshot analysis| - | 500 |
 | - | - | maxSizeOfAnalyzeProfileSnapshot|The max number of snapshots analyzed by OAP| - | 12000 |
+| - | - | syncThreads|The number of threads used to synchronously refresh the metrics data to the storage. When the value is negative, the number of processors will used as the default value.| SW_CORE_SYNC_THREADS | -1 |

Review comment:
       ```suggestion
   | - | - | syncThreads|The number of threads used to synchronously refresh the metrics data to the storage. When the value is zero or negative, the number of processors will be used as the default value.| SW_CORE_SYNC_THREADS | -1 |
   ```

##########
File path: oap-server/server-bootstrap/src/main/resources/application.yml
##########
@@ -102,6 +102,10 @@ core:
     searchableTracesTags: ${SW_SEARCHABLE_TAG_KEYS:http.method,status_code,db.type,db.instance,mq.queue,mq.topic,mq.broker}
     # Define the set of log tag keys, which should be searchable through the GraphQL.
     searchableLogsTags: ${SW_SEARCHABLE_LOGS_TAG_KEYS:level}
+    # The number of threads used to synchronously refresh the metrics data to the storage. When the value is negative, the number of processors will used as the default value.

Review comment:
       ```suggestion
       # The number of threads used to synchronously refresh the metrics data to the storage. When the value is zero or negative, the number of processors will be used as the default value.
   ```

##########
File path: oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
##########
@@ -134,6 +134,24 @@
     @Getter
     private String searchableLogsTags = "";
 
+    /**
+     * The number of threads used to synchronously refresh the metrics data to the storage. When the value is negative,
+     * the number of processors will used as the default value.
+     *
+     * @since 8.5.0
+     */
+    @Getter
+    @Setter
+    private int syncThreads = 10;

Review comment:
       If the user doesn't specify this (for example, when using an old `application.yml` file without this key), the default thread count is 10. Should it be the processor count here too?

##########
File path: oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
##########
@@ -71,7 +77,11 @@ public void start(ModuleManager moduleManager, CoreModuleConfig moduleConfig) {
             "persistence_timer_bulk_execute_latency", "Latency of the execute stage in persistence timer",
             MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE
         );
-
+        syncOperationThreadsNum = moduleConfig.getSyncThreads() > 0
+            ? moduleConfig.getSyncThreads()
+            : Runtime.getRuntime().availableProcessors();

Review comment:
       I'd say this fallback logic would be better located in `moduleConfig.getSyncThreads()`.
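
       A minimal sketch of that suggestion, under stated assumptions: `SyncThreadsConfig` is a hypothetical stand-in rather than the real `CoreModuleConfig` (which uses Lombok's `@Getter`/`@Setter`), and the field default of `-1` is assumed here to match the documented default. The getter resolves the fallback, so callers such as `PersistenceTimer` would not need their own ternary.

```java
// Hypothetical stand-in for the config class; not the actual CoreModuleConfig.
final class SyncThreadsConfig {
    // Zero or negative means "fall back to the number of available processors".
    // Defaulting to -1 also covers configuration files that lack the key.
    private int syncThreads = -1;

    void setSyncThreads(int syncThreads) {
        this.syncThreads = syncThreads;
    }

    // Hand-written getter (instead of a generated one) that applies the fallback.
    int getSyncThreads() {
        return syncThreads > 0
            ? syncThreads
            : Runtime.getRuntime().availableProcessors();
    }
}
```

       With this shape, the caller could simply assign `syncOperationThreadsNum = moduleConfig.getSyncThreads();`.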
   
   
   

##########
File path: CHANGES.md
##########
@@ -48,20 +48,21 @@ Release Notes.
 * Upgrade the Zipkin Elasticsearch storage from 6 to 7.
 * Require Zipkin receiver must work with `zipkin-elasticsearch7` storage option.
 * Fix `DatabaseSlowStatementBuilder` statement maybe null.
-* Remove fields of parent entity in the relation sources. 
+* Remove fields of parent entity in the relation sources.
 * Save Envoy http access logs when error occurs.
 * Fix wrong `service_instance_sla` setting in the `topology-instance.yml`.
 * Fix wrong metrics name setting in the `self-observability.yml`.
 * Add telemetry data about metrics in, metrics scraping, mesh error and trace in metrics to zipkin receiver.
 * Fix tags store of log and trace on h2/mysql/pg storage.
-* Merge indices by Metrics Function and Meter Function in Elasticsearch Storage. 
+* Merge indices by Metrics Function and Meter Function in Elasticsearch Storage.
 * Fix receiver don't need to get itself when healthCheck
 * Remove group concept from AvgHistogramFunction. Heatmap(function result) doesn't support labels.
 * Support metrics grouped by scope labelValue in MAL, no need global same labelValue as before.
 * Add functions in MAL to filter metrics according to the metric value.
 * Optimize the self monitoring grafana dashboard.
 * Enhance the export service.
 * Add function `retagByK8sMeta` and opt type `K8sRetagType.Pod2Service` in MAL for k8s to relate pods and services.
+* Make the flushing metrics operation concurrency.

Review comment:
       ```suggestion
   * Make the flushing metrics operation concurrent.
   ```

##########
File path: oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
##########
@@ -134,6 +134,24 @@
     @Getter
     private String searchableLogsTags = "";
 
+    /**
+     * The number of threads used to synchronously refresh the metrics data to the storage. When the value is negative,

Review comment:
       ```suggestion
        * The number of threads used to synchronously refresh the metrics data to the storage. When the value is zero or negative,
   ```

##########
File path: oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
##########
@@ -118,9 +128,17 @@ private void extractDataAndSave(IBatchDAO batchDAO) {
 
             HistogramMetrics.Timer executeLatencyTimer = executeLatency.createTimer();
             try {
-                if (CollectionUtils.isNotEmpty(prepareRequests)) {
-                    batchDAO.synchronous(prepareRequests);
+                List<List<PrepareRequest>> partitions = Lists.partition(prepareRequests, maxSyncoperationNum);
+                CountDownLatch countDownLatch = new CountDownLatch(partitions.size());
+                for (final List<PrepareRequest> partition : partitions) {
+                    executorService.submit(() -> {
+                        if (CollectionUtils.isNotEmpty(partition)) {
+                            batchDAO.synchronous(partition);
+                        }

Review comment:
       Let's enclose this in a `try-catch` block; otherwise the `latch` is never counted down when an exception occurs.
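
       One way this could look, as a sketch rather than the PR's final code: `PartitionedFlush`, `flushAll`, and the `action` parameter are hypothetical names standing in for the loop in `extractDataAndSave` and for `batchDAO.synchronous(partition)`. The `finally` clause is what guarantees the count-down; the `catch` only records the failure (real code would presumably log it).

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical helper illustrating the latch handling; not SkyWalking code.
final class PartitionedFlush {
    /**
     * Runs the action on every partition in the pool and blocks until all
     * tasks have finished, whether they succeeded or threw.
     *
     * @return the number of partitions whose action threw
     */
    static <T> int flushAll(List<List<T>> partitions,
                            ExecutorService pool,
                            Consumer<List<T>> action) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(partitions.size());
        AtomicInteger failures = new AtomicInteger();
        for (final List<T> partition : partitions) {
            pool.submit(() -> {
                try {
                    if (!partition.isEmpty()) {
                        action.accept(partition);
                    }
                } catch (Throwable t) {
                    // Record the failure so it is not silently swallowed by the pool.
                    failures.incrementAndGet();
                } finally {
                    // Always count down, so await() below cannot hang on a failed task.
                    latch.countDown();
                }
            });
        }
        latch.await();
        return failures.get();
    }
}
```

       Without the `finally`, a single throwing partition would leave the latch above zero and `await()` would block indefinitely.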



