You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2022/04/22 02:11:55 UTC
[skywalking] branch master updated: Revert "Enhance DataCarrier#MultipleChannelsConsumer to add priority" (#8924)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 023a2d3165 Revert "Enhance DataCarrier#MultipleChannelsConsumer to add priority" (#8924)
023a2d3165 is described below
commit 023a2d3165e2cf2217572caf8f8017b8a202e135
Author: 吴晟 Wu Sheng <wu...@foxmail.com>
AuthorDate: Fri Apr 22 10:11:38 2022 +0800
Revert "Enhance DataCarrier#MultipleChannelsConsumer to add priority" (#8924)
---
docs/en/changes/changes.md | 8 +-
.../datacarrier/consumer/BulkConsumePool.java | 4 +-
.../consumer/MultipleChannelsConsumer.java | 112 ++++++---------------
.../src/main/resources/application.yml | 4 +-
4 files changed, 39 insertions(+), 89 deletions(-)
diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 9b8ff5f79e..72b39f6515 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -13,8 +13,8 @@
* Support BanyanDB global index for entities. Log and Segment record entities declare this new feature.
* Remove unnecessary analyzer settings in columns of templates. Many were added due to analyzer's default value.
* Simplify the Kafka Fetch configuration in cluster mode.
-* [Breaking Change] Update the eBPF Profiling task to the service level,
- please delete index/table: `ebpf_profiling_task`, `process_traffic`.
+* [Breaking Change] Update the eBPF Profiling task to the service level, please delete
+ index/table: `ebpf_profiling_task`, `process_traffic`.
* Fix event can't split service ID into 2 parts.
* Fix OAP Self-Observability metric `GC Time` calculation.
* Set `SW_QUERY_MAX_QUERY_COMPLEXITY` default value to `1000`
@@ -22,7 +22,9 @@
* [Breaking Change] Add layer field to event, report an event without layer is not allowed.
* Fix ES flush thread stops when flush schedule task throws exception, such as ElasticSearch flush failed.
* Fix ES BulkProcessor in BatchProcessEsDAO was initialized multiple times and created multiple ES flush schedule tasks.
-* HTTPServer support the handler register with allowed HTTP methods.
+* HTTPServer support the handler register with allowed HTTP methods.
+* [Critical] Revert [**Enhance DataCarrier#MultipleChannelsConsumer to add
+ priority**](https://github.com/apache/skywalking/pull/8664) to avoid consuming issues.
#### UI
diff --git a/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/BulkConsumePool.java b/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/BulkConsumePool.java
index 4072437782..b881d50531 100644
--- a/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/BulkConsumePool.java
+++ b/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/BulkConsumePool.java
@@ -34,9 +34,9 @@ public class BulkConsumePool implements ConsumerPool {
private List<MultipleChannelsConsumer> allConsumers;
private volatile boolean isStarted = false;
- private BulkConsumePool(String name, int size, long consumeCycle) {
+ public BulkConsumePool(String name, int size, long consumeCycle) {
size = EnvUtil.getInt(name + "_THREAD", size);
- allConsumers = new ArrayList<>(size);
+ allConsumers = new ArrayList<MultipleChannelsConsumer>(size);
for (int i = 0; i < size; i++) {
MultipleChannelsConsumer multipleChannelsConsumer = new MultipleChannelsConsumer("DataCarrier." + name + ".BulkConsumePool." + i + ".Thread", consumeCycle);
multipleChannelsConsumer.setDaemon(true);
diff --git a/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/MultipleChannelsConsumer.java b/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/MultipleChannelsConsumer.java
index 282f6db0ac..db60f0d098 100644
--- a/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/MultipleChannelsConsumer.java
+++ b/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/MultipleChannelsConsumer.java
@@ -48,8 +48,8 @@ public class MultipleChannelsConsumer extends Thread {
while (running) {
boolean hasData = false;
for (Group target : consumeTargets) {
- boolean consume = target.consume(consumeList);
- hasData = hasData || consume;
+ boolean consumed = consume(target, consumeList);
+ hasData = hasData || consumed;
}
if (!hasData) {
@@ -63,8 +63,30 @@ public class MultipleChannelsConsumer extends Thread {
// consumer thread is going to stop
// consume the last time
for (Group target : consumeTargets) {
- target.consume(consumeList);
+ consume(target, consumeList);
+
+ target.consumer.onExit();
+ }
+ }
+
+ private boolean consume(Group target, List consumeList) {
+ for (int i = 0; i < target.channels.getChannelSize(); i++) {
+ QueueBuffer buffer = target.channels.getBuffer(i);
+ buffer.obtain(consumeList);
+ }
+
+ if (!consumeList.isEmpty()) {
+ try {
+ target.consumer.consume(consumeList);
+ } catch (Throwable t) {
+ target.consumer.onError(consumeList, t);
+ } finally {
+ consumeList.clear();
+ }
+ return true;
}
+ target.consumer.nothingToConsume();
+ return false;
}
/**
@@ -74,7 +96,9 @@ public class MultipleChannelsConsumer extends Thread {
Group group = new Group(channels, consumer);
// Recreate the new list to avoid change list while the list is used in consuming.
ArrayList<Group> newList = new ArrayList<Group>();
- newList.addAll(consumeTargets);
+ for (Group target : consumeTargets) {
+ newList.add(target);
+ }
newList.add(group);
consumeTargets = newList;
size += channels.size();
@@ -91,86 +115,10 @@ public class MultipleChannelsConsumer extends Thread {
private static class Group {
private Channels channels;
private IConsumer consumer;
- /**
- * Priority determines the consuming strategy. On default every period consumer thread loops all groups trying
- * to fetch the data from queue, if the queue only contains few elements, it is too expensive to consume every
- * time.
- *
- * if 'size of last fetched data' > 0
- *
- * priority = 'size of last fetched data' * 100 / {@link Channels#size()} * {@link Channels#getChannelSize()}
- *
- * else
- *
- * priority = priority / 2
- *
- * Meaning, priority is the load factor of {@link #channels}
- *
- * After consuming loop, priority = (priority of current loop + priority of last loop) / 2.
- *
- * If priority > 50, consuming happens in next loop, otherwise, priority += 10, and wait until priority > 50. In
- * worst case, for a low traffic group, consuming happens in 1/10.
- *
- * Priority only exists in {@link MultipleChannelsConsumer}, because it has limited threads but has to consume
- * from a large set of queues.
- *
- * @since 9.0.0
- */
- private int priority;
- private short continuousNoDataCount;
-
- private Group(Channels channels, IConsumer consumer) {
+
+ public Group(Channels channels, IConsumer consumer) {
this.channels = channels;
this.consumer = consumer;
- this.priority = 0;
- this.continuousNoDataCount = 0;
- }
-
- /**
- * @return false if there is no data to consume, or priority is too low. Read {@link #priority} for more
- * details.
- * @since 9.0.0
- */
- private boolean consume(List consumeList) {
- try {
- if (priority < 50) {
- priority += 10;
- return false;
- }
-
- for (int i = 0; i < channels.getChannelSize(); i++) {
- QueueBuffer buffer = channels.getBuffer(i);
- buffer.obtain(consumeList);
- }
-
- if (!consumeList.isEmpty()) {
- priority = (priority + (int) (consumeList.size() * 100 / channels.getChannelSize() * channels.size())) / 2;
- try {
- consumer.consume(consumeList);
- } catch (Throwable t) {
- consumer.onError(consumeList, t);
- } finally {
- consumeList.clear();
- }
- continuousNoDataCount = 0;
- return true;
- } else {
- if (continuousNoDataCount < 5) {
- continuousNoDataCount++;
- // For low traffic queue (low traffic means occasionally no data)
- // cut priority to half to reduce consuming period.
- priority /= 2;
- } else {
- // For cold queue, the consuming happens in 1/10;
- priority = -50;
- }
- }
-
- consumer.nothingToConsume();
- return false;
- } finally {
- consumer.onExit();
- }
}
}
}
diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index adaf0fba07..6f9589148e 100755
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -273,7 +273,7 @@ agent-analyzer:
# Nginx and Envoy agents can't get the real remote address.
# Exit spans with the component in the list would not generate the client-side instance relation metrics.
noUpstreamRealAddressAgents: ${SW_NO_UPSTREAM_REAL_ADDRESS:6000,9000}
- meterAnalyzerActiveFiles: ${SW_METER_ANALYZER_ACTIVE_FILES:datasource,threadpool,satellite,spring-sleuth} # Which files could be meter analyzed, files split by ","
+ meterAnalyzerActiveFiles: ${SW_METER_ANALYZER_ACTIVE_FILES:datasource,threadpool,satellite} # Which files could be meter analyzed, files split by ","
log-analyzer:
selector: ${SW_LOG_ANALYZER:default}
@@ -330,7 +330,7 @@ receiver-profile:
default:
receiver-zabbix:
- selector: ${SW_RECEIVER_ZABBIX:default}
+ selector: ${SW_RECEIVER_ZABBIX:-}
default:
port: ${SW_RECEIVER_ZABBIX_PORT:10051}
host: ${SW_RECEIVER_ZABBIX_HOST:0.0.0.0}