Posted to notifications@skywalking.apache.org by wu...@apache.org on 2022/04/22 02:11:55 UTC

[skywalking] branch master updated: Revert "Enhance DataCarrier#MultipleChannelsConsumer to add priority" (#8924)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 023a2d3165 Revert "Enhance DataCarrier#MultipleChannelsConsumer to add priority" (#8924)
023a2d3165 is described below

commit 023a2d3165e2cf2217572caf8f8017b8a202e135
Author: 吴晟 Wu Sheng <wu...@foxmail.com>
AuthorDate: Fri Apr 22 10:11:38 2022 +0800

    Revert "Enhance DataCarrier#MultipleChannelsConsumer to add priority" (#8924)
---
 docs/en/changes/changes.md                         |   8 +-
 .../datacarrier/consumer/BulkConsumePool.java      |   4 +-
 .../consumer/MultipleChannelsConsumer.java         | 112 ++++++---------------
 .../src/main/resources/application.yml             |   4 +-
 4 files changed, 39 insertions(+), 89 deletions(-)

diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 9b8ff5f79e..72b39f6515 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -13,8 +13,8 @@
 * Support BanyanDB global index for entities. Log and Segment record entities declare this new feature.
 * Remove unnecessary analyzer settings in columns of templates. Many were added due to analyzer's default value.
 * Simplify the Kafka Fetch configuration in cluster mode.
-* [Breaking Change] Update the eBPF Profiling task to the service level,
-  please delete index/table: `ebpf_profiling_task`, `process_traffic`.
+* [Breaking Change] Update the eBPF Profiling task to the service level, please delete
+  index/table: `ebpf_profiling_task`, `process_traffic`.
 * Fix event can't split service ID into 2 parts.
 * Fix OAP Self-Observability metric `GC Time` calculation.
 * Set `SW_QUERY_MAX_QUERY_COMPLEXITY` default value to `1000`
@@ -22,7 +22,9 @@
 * [Breaking Change] Add layer field to event, report an event without layer is not allowed.
 * Fix ES flush thread stops when flush schedule task throws exception, such as ElasticSearch flush failed.
 * Fix ES BulkProcessor in BatchProcessEsDAO was initialized multiple times and created multiple ES flush schedule tasks.
-* HTTPServer support the handler register with allowed HTTP methods. 
+* HTTPServer support the handler register with allowed HTTP methods.
+* [Critical] Revert [**Enhance DataCarrier#MultipleChannelsConsumer to add
+  priority**](https://github.com/apache/skywalking/pull/8664) to avoid consuming issues.
 
 #### UI
 
diff --git a/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/BulkConsumePool.java b/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/BulkConsumePool.java
index 4072437782..b881d50531 100644
--- a/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/BulkConsumePool.java
+++ b/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/BulkConsumePool.java
@@ -34,9 +34,9 @@ public class BulkConsumePool implements ConsumerPool {
     private List<MultipleChannelsConsumer> allConsumers;
     private volatile boolean isStarted = false;
 
-    private BulkConsumePool(String name, int size, long consumeCycle) {
+    public BulkConsumePool(String name, int size, long consumeCycle) {
         size = EnvUtil.getInt(name + "_THREAD", size);
-        allConsumers = new ArrayList<>(size);
+        allConsumers = new ArrayList<MultipleChannelsConsumer>(size);
         for (int i = 0; i < size; i++) {
             MultipleChannelsConsumer multipleChannelsConsumer = new MultipleChannelsConsumer("DataCarrier." + name + ".BulkConsumePool." + i + ".Thread", consumeCycle);
             multipleChannelsConsumer.setDaemon(true);
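
Note on the constructor body kept above: EnvUtil.getInt(name + "_THREAD", size) lets the consumer
thread count of each pool be overridden by an environment variable named "<poolName>_THREAD".
A minimal sketch of that lookup pattern, assuming the usual read-environment-then-fall-back
semantics (this is not SkyWalking's actual EnvUtil, and the pool name below is illustrative):

    final class EnvOverrideSketch {
        // Reads an integer from the environment, falling back to the given default
        // when the variable is absent or malformed.
        static int getInt(String envName, int defaultValue) {
            String value = System.getenv(envName);
            if (value == null || value.trim().isEmpty()) {
                return defaultValue;
            }
            try {
                return Integer.parseInt(value.trim());
            } catch (NumberFormatException e) {
                return defaultValue;
            }
        }

        public static void main(String[] args) {
            // e.g. exporting EXAMPLE_POOL_THREAD=8 would raise the pool size from its default of 4
            System.out.println(getInt("EXAMPLE_POOL_THREAD", 4));
        }
    }
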
diff --git a/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/MultipleChannelsConsumer.java b/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/MultipleChannelsConsumer.java
index 282f6db0ac..db60f0d098 100644
--- a/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/MultipleChannelsConsumer.java
+++ b/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/MultipleChannelsConsumer.java
@@ -48,8 +48,8 @@ public class MultipleChannelsConsumer extends Thread {
         while (running) {
             boolean hasData = false;
             for (Group target : consumeTargets) {
-                boolean consume = target.consume(consumeList);
-                hasData = hasData || consume;
+                boolean consumed = consume(target, consumeList);
+                hasData = hasData || consumed;
             }
 
             if (!hasData) {
@@ -63,8 +63,30 @@ public class MultipleChannelsConsumer extends Thread {
         // consumer thread is going to stop
         // consume the last time
         for (Group target : consumeTargets) {
-            target.consume(consumeList);
+            consume(target, consumeList);
+
+            target.consumer.onExit();
+        }
+    }
+
+    private boolean consume(Group target, List consumeList) {
+        for (int i = 0; i < target.channels.getChannelSize(); i++) {
+            QueueBuffer buffer = target.channels.getBuffer(i);
+            buffer.obtain(consumeList);
+        }
+
+        if (!consumeList.isEmpty()) {
+            try {
+                target.consumer.consume(consumeList);
+            } catch (Throwable t) {
+                target.consumer.onError(consumeList, t);
+            } finally {
+                consumeList.clear();
+            }
+            return true;
         }
+        target.consumer.nothingToConsume();
+        return false;
     }
 
     /**
@@ -74,7 +96,9 @@ public class MultipleChannelsConsumer extends Thread {
         Group group = new Group(channels, consumer);
         // Recreate the new list to avoid change list while the list is used in consuming.
         ArrayList<Group> newList = new ArrayList<Group>();
-        newList.addAll(consumeTargets);
+        for (Group target : consumeTargets) {
+            newList.add(target);
+        }
         newList.add(group);
         consumeTargets = newList;
         size += channels.size();
@@ -91,86 +115,10 @@ public class MultipleChannelsConsumer extends Thread {
     private static class Group {
         private Channels channels;
         private IConsumer consumer;
-        /**
-         * Priority determines the consuming strategy. On default every period consumer thread loops all groups trying
-         * to fetch the data from queue, if the queue only contains few elements, it is too expensive to consume every
-         * time.
-         *
-         * if 'size of last fetched data' > 0
-         *
-         * priority = 'size of last fetched data' * 100 / {@link Channels#size()} * {@link Channels#getChannelSize()}
-         *
-         * else
-         *
-         * priority = priority / 2
-         *
-         * Meaning, priority is the load factor of {@link #channels}
-         *
-         * After consuming loop, priority = (priority of current loop + priority of last loop) / 2.
-         *
-         * If priority > 50, consuming happens in next loop, otherwise, priority += 10, and wait until priority > 50. In
-         * worth case, for a low traffic group, consuming happens in 1/10.
-         *
-         * Priority only exists in {@link MultipleChannelsConsumer}, because it has limited threads but has to consume
-         * from a large set of queues.
-         *
-         * @since 9.0.0
-         */
-        private int priority;
-        private short continuousNoDataCount;
-
-        private Group(Channels channels, IConsumer consumer) {
+
+        public Group(Channels channels, IConsumer consumer) {
             this.channels = channels;
             this.consumer = consumer;
-            this.priority = 0;
-            this.continuousNoDataCount = 0;
-        }
-
-        /**
-         * @return false if there is no data to consume, or priority is too low. Read {@link #priority} for more
-         * details.
-         * @since 9.0.0
-         */
-        private boolean consume(List consumeList) {
-            try {
-                if (priority < 50) {
-                    priority += 10;
-                    return false;
-                }
-
-                for (int i = 0; i < channels.getChannelSize(); i++) {
-                    QueueBuffer buffer = channels.getBuffer(i);
-                    buffer.obtain(consumeList);
-                }
-
-                if (!consumeList.isEmpty()) {
-                    priority = (priority + (int) (consumeList.size() * 100 / channels.getChannelSize() * channels.size())) / 2;
-                    try {
-                        consumer.consume(consumeList);
-                    } catch (Throwable t) {
-                        consumer.onError(consumeList, t);
-                    } finally {
-                        consumeList.clear();
-                    }
-                    continuousNoDataCount = 0;
-                    return true;
-                } else {
-                    if (continuousNoDataCount < 5) {
-                        continuousNoDataCount++;
-                        // For low traffic queue (low traffic means occasionally no data
-                        // cut priority to half to reduce consuming period.
-                        priority /= 2;
-                    } else {
-                        // For cold queue, the consuming happens in 1/10;
-                        priority = -50;
-                    }
-                }
-
-                consumer.nothingToConsume();
-                return false;
-            } finally {
-                consumer.onExit();
-            }
         }
     }
 }
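
For context on what the revert above removes: the deleted Javadoc describes a per-group
"priority" that throttled how often a low-traffic group was consumed, treating priority as the
load factor of the group's channels. A standalone sketch of that heuristic, with illustrative
names (queueCapacity and channelCount are stand-ins for the channel dimensions, and the
load-factor arithmetic is simplified relative to the reverted code):

    final class GroupPrioritySketch {
        private int priority = 0;
        private short continuousNoDataCount = 0;

        // Returns true when the group should be consumed in the current loop.
        boolean shouldConsume() {
            if (priority < 50) {
                priority += 10; // low-priority group: skip this round and retry a few loops later
                return false;
            }
            return true;
        }

        // Call after each consume attempt with the number of elements fetched.
        void update(int fetched, int queueCapacity, int channelCount) {
            if (fetched > 0) {
                // priority becomes the channels' load factor, averaged with the previous loop's value
                int loadFactor = fetched * 100 / (queueCapacity * channelCount);
                priority = (priority + loadFactor) / 2;
                continuousNoDataCount = 0;
            } else if (continuousNoDataCount < 5) {
                continuousNoDataCount++;
                priority /= 2; // occasionally-empty queue: consume somewhat less often
            } else {
                priority = -50; // cold queue: consuming then happens roughly once every 10 loops
            }
        }

        public static void main(String[] args) {
            GroupPrioritySketch g = new GroupPrioritySketch();
            // A busy group: the first loops are skipped while priority warms up from 0,
            // then consuming happens on every loop.
            for (int loop = 0; loop < 8; loop++) {
                boolean consumed = g.shouldConsume();
                if (consumed) {
                    g.update(800, 10, 100); // pretend 800 of 1000 slots were fetched this loop
                }
                System.out.println("loop " + loop + " consumed=" + consumed);
            }
        }
    }

The revert restores the simpler behavior shown in the new MultipleChannelsConsumer#consume above:
every registered group is polled on every loop, and the thread only backs off when no group
returned any data in that loop.
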
diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index adaf0fba07..6f9589148e 100755
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -273,7 +273,7 @@ agent-analyzer:
     # Nginx and Envoy agents can't get the real remote address.
     # Exit spans with the component in the list would not generate the client-side instance relation metrics.
     noUpstreamRealAddressAgents: ${SW_NO_UPSTREAM_REAL_ADDRESS:6000,9000}
-    meterAnalyzerActiveFiles: ${SW_METER_ANALYZER_ACTIVE_FILES:datasource,threadpool,satellite,spring-sleuth} # Which files could be meter analyzed, files split by ","
+    meterAnalyzerActiveFiles: ${SW_METER_ANALYZER_ACTIVE_FILES:datasource,threadpool,satellite} # Which files could be meter analyzed, files split by ","
 
 log-analyzer:
   selector: ${SW_LOG_ANALYZER:default}
@@ -330,7 +330,7 @@ receiver-profile:
   default:
 
 receiver-zabbix:
-  selector: ${SW_RECEIVER_ZABBIX:default}
+  selector: ${SW_RECEIVER_ZABBIX:-}
   default:
     port: ${SW_RECEIVER_ZABBIX_PORT:10051}
     host: ${SW_RECEIVER_ZABBIX_HOST:0.0.0.0}