You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2022/04/21 15:41:24 UTC

[skywalking] branch queue created (now 59de662554)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a change to branch queue
in repository https://gitbox.apache.org/repos/asf/skywalking.git


      at 59de662554 Revert "Enhance DataCarrier#MultipleChannelsConsumer to add priority" #8664

This branch includes the following new commits:

     new 59de662554 Revert "Enhance DataCarrier#MultipleChannelsConsumer to add priority" #8664

The 1 revision listed above as "new" is entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[skywalking] 01/01: Revert "Enhance DataCarrier#MultipleChannelsConsumer to add priority" #8664

Posted by wu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch queue
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 59de662554b30a9bcd22dd1967d2fc35b8c1e333
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Thu Apr 21 23:41:17 2022 +0800

    Revert "Enhance DataCarrier#MultipleChannelsConsumer to add priority" #8664
---
 docs/en/changes/changes.md                         |   8 +-
 .../datacarrier/consumer/BulkConsumePool.java      |   4 +-
 .../consumer/MultipleChannelsConsumer.java         | 112 ++++++---------------
 .../src/main/resources/application.yml             |   6 +-
 4 files changed, 40 insertions(+), 90 deletions(-)

diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 9b8ff5f79e..72b39f6515 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -13,8 +13,8 @@
 * Support BanyanDB global index for entities. Log and Segment record entities declare this new feature.
 * Remove unnecessary analyzer settings in columns of templates. Many were added due to analyzer's default value.
 * Simplify the Kafka Fetch configuration in cluster mode.
-* [Breaking Change] Update the eBPF Profiling task to the service level,
-  please delete index/table: `ebpf_profiling_task`, `process_traffic`.
+* [Breaking Change] Update the eBPF Profiling task to the service level, please delete
+  index/table: `ebpf_profiling_task`, `process_traffic`.
 * Fix event can't split service ID into 2 parts.
 * Fix OAP Self-Observability metric `GC Time` calculation.
 * Set `SW_QUERY_MAX_QUERY_COMPLEXITY` default value to `1000`
@@ -22,7 +22,9 @@
 * [Breaking Change] Add layer field to event, report an event without layer is not allowed.
 * Fix ES flush thread stops when flush schedule task throws exception, such as ElasticSearch flush failed.
 * Fix ES BulkProcessor in BatchProcessEsDAO was initialized multiple times and created multiple ES flush schedule tasks.
-* HTTPServer support the handler register with allowed HTTP methods. 
+* HTTPServer support the handler register with allowed HTTP methods.
+* [Critical] Revert [**Enhance DataCarrier#MultipleChannelsConsumer to add
+  priority**](https://github.com/apache/skywalking/pull/8664) to avoid consuming issues.
 
 #### UI
 
diff --git a/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/BulkConsumePool.java b/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/BulkConsumePool.java
index 4072437782..b881d50531 100644
--- a/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/BulkConsumePool.java
+++ b/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/BulkConsumePool.java
@@ -34,9 +34,9 @@ public class BulkConsumePool implements ConsumerPool {
     private List<MultipleChannelsConsumer> allConsumers;
     private volatile boolean isStarted = false;
 
-    private BulkConsumePool(String name, int size, long consumeCycle) {
+    public BulkConsumePool(String name, int size, long consumeCycle) {
         size = EnvUtil.getInt(name + "_THREAD", size);
-        allConsumers = new ArrayList<>(size);
+        allConsumers = new ArrayList<MultipleChannelsConsumer>(size);
         for (int i = 0; i < size; i++) {
             MultipleChannelsConsumer multipleChannelsConsumer = new MultipleChannelsConsumer("DataCarrier." + name + ".BulkConsumePool." + i + ".Thread", consumeCycle);
             multipleChannelsConsumer.setDaemon(true);
diff --git a/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/MultipleChannelsConsumer.java b/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/MultipleChannelsConsumer.java
index 282f6db0ac..db60f0d098 100644
--- a/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/MultipleChannelsConsumer.java
+++ b/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/MultipleChannelsConsumer.java
@@ -48,8 +48,8 @@ public class MultipleChannelsConsumer extends Thread {
         while (running) {
             boolean hasData = false;
             for (Group target : consumeTargets) {
-                boolean consume = target.consume(consumeList);
-                hasData = hasData || consume;
+                boolean consumed = consume(target, consumeList);
+                hasData = hasData || consumed;
             }
 
             if (!hasData) {
@@ -63,8 +63,30 @@ public class MultipleChannelsConsumer extends Thread {
         // consumer thread is going to stop
         // consume the last time
         for (Group target : consumeTargets) {
-            target.consume(consumeList);
+            consume(target, consumeList);
+
+            target.consumer.onExit();
+        }
+    }
+
+    private boolean consume(Group target, List consumeList) {
+        for (int i = 0; i < target.channels.getChannelSize(); i++) {
+            QueueBuffer buffer = target.channels.getBuffer(i);
+            buffer.obtain(consumeList);
+        }
+
+        if (!consumeList.isEmpty()) {
+            try {
+                target.consumer.consume(consumeList);
+            } catch (Throwable t) {
+                target.consumer.onError(consumeList, t);
+            } finally {
+                consumeList.clear();
+            }
+            return true;
         }
+        target.consumer.nothingToConsume();
+        return false;
     }
 
     /**
@@ -74,7 +96,9 @@ public class MultipleChannelsConsumer extends Thread {
         Group group = new Group(channels, consumer);
         // Recreate the new list to avoid change list while the list is used in consuming.
         ArrayList<Group> newList = new ArrayList<Group>();
-        newList.addAll(consumeTargets);
+        for (Group target : consumeTargets) {
+            newList.add(target);
+        }
         newList.add(group);
         consumeTargets = newList;
         size += channels.size();
@@ -91,86 +115,10 @@ public class MultipleChannelsConsumer extends Thread {
     private static class Group {
         private Channels channels;
         private IConsumer consumer;
-        /**
-         * Priority determines the consuming strategy. On default every period consumer thread loops all groups trying
-         * to fetch the data from queue, if the queue only contains few elements, it is too expensive to consume every
-         * time.
-         *
-         * if 'size of last fetched data' > 0
-         *
-         * priority = 'size of last fetched data' * 100 / {@link Channels#size()} * {@link Channels#getChannelSize()}
-         *
-         * else
-         *
-         * priority = priority / 2
-         *
-         * Meaning, priority is the load factor of {@link #channels}
-         *
-         * After consuming loop, priority = (priority of current loop + priority of last loop) / 2.
-         *
-         * If priority > 50, consuming happens in next loop, otherwise, priority += 10, and wait until priority > 50. In
-         * worth case, for a low traffic group, consuming happens in 1/10.
-         *
-         * Priority only exists in {@link MultipleChannelsConsumer}, because it has limited threads but has to consume
-         * from a large set of queues.
-         *
-         * @since 9.0.0
-         */
-        private int priority;
-        private short continuousNoDataCount;
-
-        private Group(Channels channels, IConsumer consumer) {
+
+        public Group(Channels channels, IConsumer consumer) {
             this.channels = channels;
             this.consumer = consumer;
-            this.priority = 0;
-            this.continuousNoDataCount = 0;
-        }
-
-        /**
-         * @return false if there is no data to consume, or priority is too low. Read {@link #priority} for more
-         * details.
-         * @since 9.0.0
-         */
-        private boolean consume(List consumeList) {
-            try {
-                if (priority < 50) {
-                    priority += 10;
-                    return false;
-                }
-
-                for (int i = 0; i < channels.getChannelSize(); i++) {
-                    QueueBuffer buffer = channels.getBuffer(i);
-                    buffer.obtain(consumeList);
-                }
-
-                if (!consumeList.isEmpty()) {
-                    priority = (priority + (int) (consumeList.size() * 100 / channels.getChannelSize() * channels.size())) / 2;
-                    try {
-                        consumer.consume(consumeList);
-                    } catch (Throwable t) {
-                        consumer.onError(consumeList, t);
-                    } finally {
-                        consumeList.clear();
-                    }
-                    continuousNoDataCount = 0;
-                    return true;
-                } else {
-                    if (continuousNoDataCount < 5) {
-                        continuousNoDataCount++;
-                        // For low traffic queue (low traffic means occasionally no data
-                        // cut priority to half to reduce consuming period.
-                        priority /= 2;
-                    } else {
-                        // For cold queue, the consuming happens in 1/10;
-                        priority = -50;
-                    }
-                }
-
-                consumer.nothingToConsume();
-                return false;
-            } finally {
-                consumer.onExit();
-            }
         }
     }
 }
diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index 1e61ea3c94..1a8229f49c 100755
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -273,7 +273,7 @@ agent-analyzer:
     # Nginx and Envoy agents can't get the real remote address.
     # Exit spans with the component in the list would not generate the client-side instance relation metrics.
     noUpstreamRealAddressAgents: ${SW_NO_UPSTREAM_REAL_ADDRESS:6000,9000}
-    meterAnalyzerActiveFiles: ${SW_METER_ANALYZER_ACTIVE_FILES:datasource,threadpool,satellite,spring-sleuth} # Which files could be meter analyzed, files split by ","
+    meterAnalyzerActiveFiles: ${SW_METER_ANALYZER_ACTIVE_FILES:datasource,threadpool} # Which files could be meter analyzed, files split by ","
 
 log-analyzer:
   selector: ${SW_LOG_ANALYZER:default}
@@ -330,7 +330,7 @@ receiver-profile:
   default:
 
 receiver-zabbix:
-  selector: ${SW_RECEIVER_ZABBIX:default}
+  selector: ${SW_RECEIVER_ZABBIX:-}
   default:
     port: ${SW_RECEIVER_ZABBIX_PORT:10051}
     host: ${SW_RECEIVER_ZABBIX_HOST:0.0.0.0}
@@ -375,7 +375,7 @@ receiver-meter:
   default:
 
 receiver-otel:
-  selector: ${SW_OTEL_RECEIVER:default}
+  selector: ${SW_OTEL_RECEIVER:-}
   default:
     enabledHandlers: ${SW_OTEL_RECEIVER_ENABLED_HANDLERS:"oc"}
     enabledOcRules: ${SW_OTEL_RECEIVER_ENABLED_OC_RULES:"istio-controlplane,k8s-node,oap,vm"}