You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/02/18 06:55:14 UTC
[incubator-skywalking] branch master updated: Backend streaming
thread model improvement (#2247)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 2f3ccf3 Backend streaming thread model improvement (#2247)
2f3ccf3 is described below
commit 2f3ccf368c6ccd88f2f9fa56ae3de047c3f2d810
Author: 吴晟 Wu Sheng <wu...@foxmail.com>
AuthorDate: Mon Feb 18 14:55:07 2019 +0800
Backend streaming thread model improvement (#2247)
[Performance Improvement]Backend streaming thread model improvement
---
.mvn/wrapper/MavenWrapperDownloader.java | 0
.mvn/wrapper/maven-wrapper.properties | 0
.../apm/commons/datacarrier/buffer/Buffer.java | 7 +-
.../apm/commons/datacarrier/buffer/Channels.java | 8 ++
.../datacarrier/consumer/BulkConsumePool.java | 125 +++++++++++++++++++++
.../datacarrier/consumer/ConsumerPoolFactory.java | 8 +-
.../consumer/MultipleChannelsConsumer.java | 124 ++++++++++++++++++++
.../datacarrier/consumer/BulkConsumePoolTest.java | 91 +++++++++++++++
docker/config/application.yml | 2 +-
.../service-agent/java-agent/Supported-list.md | 2 +-
.../oap/server/core/CoreModuleConfig.java | 1 +
.../oap/server/core/CoreModuleProvider.java | 2 +-
.../oap/server/core/UnexpectedException.java | 4 +
.../analysis/worker/IndicatorAggregateWorker.java | 10 +-
.../analysis/worker/IndicatorPersistentWorker.java | 17 ++-
.../analysis/worker/RecordPersistentWorker.java | 14 ++-
.../server/core/analysis/worker/TopNWorker.java | 2 +-
.../register/worker/RegisterDistinctWorker.java | 14 ++-
.../register/worker/RegisterPersistentWorker.java | 17 ++-
.../server/library/server/jetty/JettyServer.java | 25 ++++-
.../SegmentStandardizationWorker.java | 4 +-
.../src/main/assembly/application.yml | 2 +-
.../src/main/resources/application.yml | 2 +-
23 files changed, 455 insertions(+), 26 deletions(-)
diff --git a/.mvn/wrapper/MavenWrapperDownloader.java b/.mvn/wrapper/MavenWrapperDownloader.java
old mode 100755
new mode 100644
diff --git a/.mvn/wrapper/maven-wrapper.properties b/.mvn/wrapper/maven-wrapper.properties
old mode 100755
new mode 100644
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Buffer.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Buffer.java
index 4fbb478..e0f17c3 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Buffer.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Buffer.java
@@ -18,8 +18,7 @@
package org.apache.skywalking.apm.commons.datacarrier.buffer;
-import java.util.LinkedList;
-import java.util.List;
+import java.util.*;
import org.apache.skywalking.apm.commons.datacarrier.callback.QueueBlockingCallback;
import org.apache.skywalking.apm.commons.datacarrier.common.AtomicRangeInteger;
@@ -80,6 +79,10 @@ public class Buffer<T> {
return buffer.length;
}
+ public LinkedList<T> obtain() {
+ return this.obtain(0, buffer.length);
+ }
+
public LinkedList<T> obtain(int start, int end) {
LinkedList<T> result = new LinkedList<T>();
for (int i = start; i < end; i++) {
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Channels.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Channels.java
index 4f7ee87..2a83ba0 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Channels.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Channels.java
@@ -81,6 +81,14 @@ public class Channels<T> {
return this.bufferChannels.length;
}
+ public int getBufferSize() {
+ return bufferChannels[0].getBufferSize();
+ }
+
+ public long size() {
+ return (long)getChannelSize() * getBufferSize();
+ }
+
public Buffer<T> getBuffer(int index) {
return this.bufferChannels[index];
}
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java
new file mode 100644
index 0000000..798a601
--- /dev/null
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.commons.datacarrier.consumer;
+
+import java.util.*;
+import java.util.concurrent.Callable;
+import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
+
+/**
+ * BulkConsumePool consumes data from multiple channels (DataCarrier instances) with multiple {@link
+ * MultipleChannelsConsumer}s.
+ *
+ * In a typical case, the number of {@link MultipleChannelsConsumer}s should be less than the number of channels.
+ *
+ * @author wusheng
+ */
+public class BulkConsumePool implements ConsumerPool {
+    private final List<MultipleChannelsConsumer> allConsumers;
+    private volatile boolean isStarted = false;
+
+    /**
+     * Creates {@code size} daemon consumer threads, or as many as the positive integer in the environment variable
+     * {@code name + "_THREAD"} says; the threads are started by {@link #begin(Channels)}.
+     */
+    public BulkConsumePool(String name, int size, long consumeCycle) {
+        String threadNum = System.getenv(name + "_THREAD");
+        if (threadNum != null) {
+            try {
+                int configured = Integer.parseInt(threadNum);
+                // A pool without consumers fails on the first add(), so a non-positive override is ignored.
+                size = configured > 0 ? configured : size;
+            } catch (NumberFormatException ignored) {
+                // Malformed override: keep the size requested by the caller.
+            }
+        }
+        allConsumers = new ArrayList<MultipleChannelsConsumer>(size);
+        for (int i = 0; i < size; i++) {
+            MultipleChannelsConsumer multipleChannelsConsumer = new MultipleChannelsConsumer("DataCarrier." + name + ".BulkConsumePool." + i + ".Thread", consumeCycle);
+            multipleChannelsConsumer.setDaemon(true);
+            allConsumers.add(multipleChannelsConsumer);
+        }
+    }
+
+    /** Assigns the channels and their consumer to the least loaded consumer thread; the name is not used. */
+    @Override synchronized public void add(String name, Channels channels, IConsumer consumer) {
+        getLowestPayload().addNewTarget(channels, consumer);
+    }
+
+    /**
+     * Get the lowest payload consumer thread based on current allocate status.
+     * @return the consumer with the smallest {@link MultipleChannelsConsumer#size()}, the first one on ties.
+     */
+    private MultipleChannelsConsumer getLowestPayload() {
+        MultipleChannelsConsumer winner = allConsumers.get(0);
+        for (int i = 1; i < allConsumers.size(); i++) {
+            MultipleChannelsConsumer option = allConsumers.get(i);
+            if (option.size() < winner.size()) {
+                // Keep scanning: the first lighter consumer is not necessarily the lightest one.
+                winner = option;
+            }
+        }
+        return winner;
+    }
+
+    /** @return true once {@link #begin(Channels)} has been called, for any channels; never reset by close. */
+    @Override public boolean isRunning(Channels channels) {
+        return isStarted;
+    }
+
+    /** Asks all consumer threads of the pool to stop, not only the one serving the given channels. */
+    @Override public void close(Channels channels) {
+        for (MultipleChannelsConsumer consumer : allConsumers) {
+            consumer.shutdown();
+        }
+    }
+
+    /** Starts all consumer threads on the first call; synchronized so that no thread is started twice. */
+    @Override synchronized public void begin(Channels channels) {
+        if (isStarted) {
+            return;
+        }
+        for (MultipleChannelsConsumer consumer : allConsumers) {
+            consumer.start();
+        }
+        isStarted = true;
+    }
+
+    /** The creator for {@link BulkConsumePool}; keeps the constructor arguments until {@link #call()} is invoked. */
+    public static class Creator implements Callable<ConsumerPool> {
+        private final String name;
+        private final int size;
+        private final long consumeCycle;
+
+        public Creator(String name, int poolSize, long consumeCycle) {
+            this.name = name;
+            this.size = poolSize;
+            this.consumeCycle = consumeCycle;
+        }
+
+        @Override public ConsumerPool call() {
+            return new BulkConsumePool(name, size, consumeCycle);
+        }
+
+        /** @return the number of available processors minus one, but at least one. */
+        public static int recommendMaxSize() {
+            return Math.max(Runtime.getRuntime().availableProcessors() - 1, 1);
+        }
+    }
+}
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPoolFactory.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPoolFactory.java
index e95e18f..7f86873 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPoolFactory.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPoolFactory.java
@@ -63,9 +63,13 @@ public enum ConsumerPoolFactory {
}
}
+ /**
+ * Always return true.
+ * @param channels
+ * @return
+ */
@Override public boolean isRunning(Channels channels) {
- ConsumeDriver driver = allDrivers.get(channels);
- return driver == null ? false : driver.isRunning(channels);
+ return true;
}
@Override public void close(Channels channels) {
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/MultipleChannelsConsumer.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/MultipleChannelsConsumer.java
new file mode 100644
index 0000000..1679302
--- /dev/null
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/MultipleChannelsConsumer.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.commons.datacarrier.consumer;
+
+import java.util.*;
+import org.apache.skywalking.apm.commons.datacarrier.buffer.*;
+
+/**
+ * MultipleChannelsConsumer represents a single consumer thread, but supports multiple channels with their {@link
+ * IConsumer}s
+ *
+ * @author wusheng
+ */
+public class MultipleChannelsConsumer extends Thread {
+    // True from construction on, so that a shutdown() issued before run() begins is not overwritten.
+    private volatile boolean running = true;
+    private volatile ArrayList<Group> consumeTargets;
+    private volatile long size;
+    private final long consumeCycle;
+
+    /** @param consumeCycle time in milliseconds to sleep after a pass over the targets which found no data. */
+    public MultipleChannelsConsumer(String threadName, long consumeCycle) {
+        super(threadName);
+        this.consumeTargets = new ArrayList<Group>();
+        this.consumeCycle = consumeCycle;
+    }
+
+    /** Polls all targets until {@link #shutdown()} is called, then consumes once more and calls onExit() on each. */
+    @Override
+    public void run() {
+        while (running) {
+            boolean hasData = false;
+            for (Group target : consumeTargets) {
+                // consume() must come first: "hasData || consume(target)" skips all targets after the first with data.
+                hasData = consume(target) || hasData;
+            }
+
+            if (!hasData) {
+                try {
+                    Thread.sleep(consumeCycle);
+                } catch (InterruptedException ignored) {
+                    // Woken up early: poll again. Only shutdown() ends this thread.
+                }
+            }
+        }
+
+        // consumer thread is going to stop
+        // consume the last time
+        for (Group target : consumeTargets) {
+            consume(target);
+            target.consumer.onExit();
+        }
+    }
+
+    /** Hands the content of all buffers of the target to its consumer; returns true if any element was obtained. */
+    private boolean consume(Group target) {
+        LinkedList consumeList = new LinkedList();
+        for (int i = 0; i < target.channels.getChannelSize(); i++) {
+            Buffer buffer = target.channels.getBuffer(i);
+            consumeList.addAll(buffer.obtain());
+        }
+        if (consumeList.isEmpty()) {
+            return false;
+        }
+        try {
+            target.consumer.consume(consumeList);
+        } catch (Throwable t) {
+            // Report the failed batch to the consumer and keep this thread alive.
+            target.consumer.onError(consumeList, t);
+        }
+        return true;
+    }
+
+    /**
+     * Add a new target channels. Calls must not run concurrently: the list copy and the size update are not atomic.
+     *
+     * @param channels buffers to poll.
+     * @param consumer receiver of the data obtained from {@code channels}.
+     */
+    public void addNewTarget(Channels channels, IConsumer consumer) {
+        // Recreate the new list to avoid change list while the list is used in consuming.
+        ArrayList<Group> newList = new ArrayList<Group>(consumeTargets);
+        newList.add(new Group(channels, consumer));
+        consumeTargets = newList;
+        size += channels.size();
+    }
+
+    /** @return the sum of {@code channels.size()} over all targets added so far. */
+    public long size() {
+        return size;
+    }
+
+    /** Asks {@link #run()} to stop after its current pass; does not wait for the thread to finish. */
+    void shutdown() {
+        running = false;
+    }
+
+    /** A channels instance paired with the consumer of its data. */
+    private static class Group {
+        private final Channels channels;
+        private final IConsumer consumer;
+
+        Group(Channels channels, IConsumer consumer) {
+            this.channels = channels;
+            this.consumer = consumer;
+        }
+    }
+}
diff --git a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePoolTest.java b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePoolTest.java
new file mode 100644
index 0000000..9cd9c8c
--- /dev/null
+++ b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePoolTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.commons.datacarrier.consumer;
+
+import java.util.*;
+import org.apache.skywalking.apm.commons.datacarrier.buffer.*;
+import org.apache.skywalking.apm.commons.datacarrier.partition.SimpleRollingPartitioner;
+import org.junit.*;
+
+/**
+ * @author wusheng
+ */
+public class BulkConsumePoolTest {
+    /**
+     * Two channels share one consumer thread; every element saved into a channel must reach only the consumer
+     * registered for that channel.
+     */
+    @Test
+    public void testOneThreadPool() throws InterruptedException {
+        BulkConsumePool pool = new BulkConsumePool("testPool", 1, 50);
+        // Filled by the consumer thread and read by the test thread, hence the synchronized wrappers.
+        final List<Object> result1 = Collections.synchronizedList(new ArrayList<Object>());
+        final List<Object> result2 = Collections.synchronizedList(new ArrayList<Object>());
+
+        Channels c1 = new Channels(2, 10, new SimpleRollingPartitioner(), BufferStrategy.OVERRIDE);
+        pool.add("test", c1, collectInto(result1));
+        pool.begin(c1);
+
+        Channels c2 = new Channels(2, 10, new SimpleRollingPartitioner(), BufferStrategy.OVERRIDE);
+        pool.add("test2", c2, collectInto(result2));
+        pool.begin(c2);
+
+        for (int i = 0; i < 5; i++) {
+            c1.save(new Object());
+        }
+        for (int i = 0; i < 2; i++) {
+            c2.save(new Object());
+        }
+        // The consume cycle is 50 ms, so two seconds leave the consumer thread plenty of passes.
+        Thread.sleep(2000);
+
+        Assert.assertEquals(5, result1.size());
+        Assert.assertEquals(2, result2.size());
+        // Stop the consumer thread; close() shuts down the whole pool regardless of the channels passed.
+        pool.close(c1);
+    }
+
+    /**
+     * Creates a consumer which appends everything it receives to the given list and ignores all other callbacks.
+     *
+     * @param sink list receiving the consumed elements.
+     * @return the collecting consumer.
+     */
+    private static IConsumer collectInto(final List<Object> sink) {
+        return new IConsumer() {
+            @Override public void init() {
+                // nothing to prepare
+            }
+
+            @Override public void consume(List data) {
+                for (Object datum : data) {
+                    sink.add(datum);
+                }
+            }
+
+            @Override public void onError(List data, Throwable t) {
+                // errors are not expected in this test
+            }
+
+            @Override public void onExit() {
+                // nothing to release
+            }
+        };
+    }
+}
diff --git a/docker/config/application.yml b/docker/config/application.yml
index f9b645f..d99357a 100644
--- a/docker/config/application.yml
+++ b/docker/config/application.yml
@@ -75,7 +75,7 @@ receiver-trace:
bufferDataMaxFileSize: ${SW_RECEIVER_BUFFER_DATA_MAX_FILE_SIZE:500} # Unit is MB
bufferFileCleanWhenRestart: ${SW_RECEIVER_BUFFER_FILE_CLEAN_WHEN_RESTART:false}
sampleRate: ${SW_TRACE_SAMPLE_RATE:10000} # The sample rate precision is 1/10000. 10000 means 100% sample in default.
- slowDBAccessThreshold: ${SW_SLOW_DB_THRESHOLD:default:200,redis:50} # The slow database access thresholds. Unit ms.
+ slowDBAccessThreshold: ${SW_SLOW_DB_THRESHOLD:default:200,mongodb:100} # The slow database access thresholds. Unit ms.
receiver-jvm:
default:
service-mesh:
diff --git a/docs/en/setup/service-agent/java-agent/Supported-list.md b/docs/en/setup/service-agent/java-agent/Supported-list.md
index 144c753..072afe9 100644
--- a/docs/en/setup/service-agent/java-agent/Supported-list.md
+++ b/docs/en/setup/service-agent/java-agent/Supported-list.md
@@ -64,6 +64,6 @@
* [GSON](https://github.com/google/gson) 2.8.x (Optional²)
¹Due to license incompatibilities/restrictions these plugins are hosted and released in 3rd part repository,
- go to [OpenSkywalking java plugin extension repository](https://github.com/OpenSkywalking/java-plugin-extensions) to get these.
+ go to [SkyAPM java plugin extension repository](https://github.com/SkyAPM/java-plugin-extensions) to get these.
²These plugins affect the performance or must be used under some conditions, from experiences. So only released in `/optional-plugins`, copy to `/plugins` in order to make them work.
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
index 3de179a..453253a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
@@ -30,6 +30,7 @@ public class CoreModuleConfig extends ModuleConfig {
@Setter private String nameSpace;
@Setter private String restHost;
@Setter private int restPort;
+ @Setter private int jettySelectors = 1;
@Setter private String restContextPath;
@Setter private String gRPCHost;
@Setter private int gRPCPort;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index 0106fe7..f6db632 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -95,7 +95,7 @@ public class CoreModuleProvider extends ModuleProvider {
}
grpcServer.initialize();
- jettyServer = new JettyServer(moduleConfig.getRestHost(), moduleConfig.getRestPort(), moduleConfig.getRestContextPath());
+ jettyServer = new JettyServer(moduleConfig.getRestHost(), moduleConfig.getRestPort(), moduleConfig.getRestContextPath(), moduleConfig.getJettySelectors());
jettyServer.initialize();
this.registerServiceImplementation(DownsamplingConfigService.class, new DownsamplingConfigService(moduleConfig.getDownsampling()));
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/UnexpectedException.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/UnexpectedException.java
index f290fd0..f5dc51b 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/UnexpectedException.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/UnexpectedException.java
@@ -25,4 +25,8 @@ public class UnexpectedException extends RuntimeException {
public UnexpectedException(String message) {
super(message);
}
+
+ public UnexpectedException(String message, Exception cause) {
+ super(message, cause);
+ }
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
index f11d014..148534a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
@@ -22,6 +22,7 @@ import java.util.Iterator;
import java.util.List;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
import org.apache.skywalking.oap.server.core.analysis.data.MergeDataCache;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
@@ -53,7 +54,14 @@ public class IndicatorAggregateWorker extends AbstractWorker<Indicator> {
this.mergeDataCache = new MergeDataCache<>();
this.dataCarrier = new DataCarrier<>("IndicatorAggregateWorker." + modelName, 1, 10000);
- this.dataCarrier.consume(ConsumerPoolFactory.DEFAULT_POOL, new AggregatorConsumer(this));
+ String name = "INDICATOR_L1_AGGREGATION";
+ BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, BulkConsumePool.Creator.recommendMaxSize(), 20);
+ try {
+ ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
+ } catch (Exception e) {
+ throw new UnexpectedException(e.getMessage(), e);
+ }
+ this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new AggregatorConsumer(this));
MetricCreator metricCreator = moduleManager.find(TelemetryModule.NAME).provider().getService(MetricCreator.class);
aggregationCounter = metricCreator.createCounter("indicator_aggregation", "The number of rows in aggregation",
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
index 172874a..e90d98a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Objects;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
import org.apache.skywalking.oap.server.core.analysis.data.MergeDataCache;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
@@ -55,9 +56,21 @@ public class IndicatorPersistentWorker extends PersistenceWorker<Indicator, Merg
this.mergeDataCache = new MergeDataCache<>();
this.indicatorDAO = indicatorDAO;
this.nextWorker = nextWorker;
- this.dataCarrier = new DataCarrier<>("IndicatorPersistentWorker." + modelName, 1, 10000);
- this.dataCarrier.consume(ConsumerPoolFactory.DEFAULT_POOL, new PersistentConsumer(this));
+ String name = "INDICATOR_L2_AGGREGATION";
+ int size = BulkConsumePool.Creator.recommendMaxSize() / 4;
+ if (size == 0) {
+ size = 1;
+ }
+ BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, size, 20);
+ try {
+ ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
+ } catch (Exception e) {
+ throw new UnexpectedException(e.getMessage(), e);
+ }
+
+ this.dataCarrier = new DataCarrier<>("IndicatorPersistentWorker." + modelName, 1, 2000);
+ this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new PersistentConsumer(this));
}
@Override void onWork(Indicator indicator) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java
index 8d291d7..a6b972e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java
@@ -20,7 +20,8 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.*;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
+import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.data.NonMergeDataCache;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
@@ -45,8 +46,17 @@ public class RecordPersistentWorker extends PersistenceWorker<Record, NonMergeDa
this.modelName = modelName;
this.nonMergeDataCache = new NonMergeDataCache<>();
this.recordDAO = recordDAO;
+
+ String name = "RECORD_PERSISTENT";
+ BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 20);
+ try {
+ ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
+ } catch (Exception e) {
+ throw new UnexpectedException(e.getMessage(), e);
+ }
+
this.dataCarrier = new DataCarrier<>(1, 10000);
- this.dataCarrier.consume(new RecordPersistentWorker.PersistentConsumer(this), 1);
+ this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new RecordPersistentWorker.PersistentConsumer(this));
}
@Override public void in(Record record) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
index 5d6304e..8bf5dd6 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
@@ -48,7 +48,7 @@ public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<Top
this.limitedSizeDataCache = new LimitedSizeDataCache<>(topNSize);
this.recordDAO = recordDAO;
this.modelName = modelName;
- this.dataCarrier = new DataCarrier<>(1, 10000);
+ this.dataCarrier = new DataCarrier<>("TopNWorker", 1, 1000);
this.dataCarrier.consume(new TopNWorker.TopNConsumer(), 1);
this.lastReportTimestamp = System.currentTimeMillis();
// Top N persistent only works per 10 minutes.
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
index 443eb73..ae4bc67 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
@@ -20,7 +20,8 @@ package org.apache.skywalking.oap.server.core.register.worker;
import java.util.*;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
+import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
@@ -42,8 +43,15 @@ public class RegisterDistinctWorker extends AbstractWorker<RegisterSource> {
super(workerId);
this.nextWorker = nextWorker;
this.sources = new HashMap<>();
- this.dataCarrier = new DataCarrier<>(1, 10000);
- this.dataCarrier.consume(new AggregatorConsumer(this), 1, 200);
+ this.dataCarrier = new DataCarrier<>(1, 1000);
+ String name = "REGISTER_L1";
+ BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 200);
+ try {
+ ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
+ } catch (Exception e) {
+ throw new UnexpectedException(e.getMessage(), e);
+ }
+ this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new AggregatorConsumer(this));
}
@Override public final void in(RegisterSource source) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
index 35a83b4..1ccd21d 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
@@ -20,8 +20,8 @@ package org.apache.skywalking.oap.server.core.register.worker;
import java.util.*;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
+import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.source.Scope;
@@ -52,8 +52,17 @@ public class RegisterPersistentWorker extends AbstractWorker<RegisterSource> {
this.registerDAO = registerDAO;
this.registerLockDAO = moduleManager.find(StorageModule.NAME).provider().getService(IRegisterLockDAO.class);
this.scope = scope;
- this.dataCarrier = new DataCarrier<>("IndicatorPersistentWorker." + modelName, 1, 10000);
- this.dataCarrier.consume(new RegisterPersistentWorker.PersistentConsumer(this), 1, 200);
+ this.dataCarrier = new DataCarrier<>("IndicatorPersistentWorker." + modelName, 1, 1000);
+
+ String name = "REGISTER_L2";
+ BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 200);
+ try {
+ ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
+ } catch (Exception e) {
+ throw new UnexpectedException(e.getMessage(), e);
+ }
+
+ this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new RegisterPersistentWorker.PersistentConsumer(this));
}
@Override public final void in(RegisterSource registerSource) {
diff --git a/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/jetty/JettyServer.java b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/jetty/JettyServer.java
index 84e24d4..97f4c85 100644
--- a/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/jetty/JettyServer.java
+++ b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/jetty/JettyServer.java
@@ -18,10 +18,12 @@
package org.apache.skywalking.oap.server.library.server.jetty;
-import java.net.InetSocketAddress;
import java.util.Objects;
+import org.apache.skywalking.oap.server.library.server.Server;
import org.apache.skywalking.oap.server.library.server.*;
+import org.eclipse.jetty.server.*;
import org.eclipse.jetty.servlet.*;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.slf4j.*;
/**
@@ -34,13 +36,19 @@ public class JettyServer implements Server {
private final String host;
private final int port;
private final String contextPath;
+ private final int selectorNum;
private org.eclipse.jetty.server.Server server;
private ServletContextHandler servletContextHandler;
public JettyServer(String host, int port, String contextPath) {
+ this(host, port, contextPath, -1);
+ }
+
+ public JettyServer(String host, int port, String contextPath, int selectorNum) {
this.host = host;
this.port = port;
this.contextPath = contextPath;
+ this.selectorNum = selectorNum;
}
@Override
@@ -55,7 +63,20 @@ public class JettyServer implements Server {
@Override
public void initialize() {
- server = new org.eclipse.jetty.server.Server(new InetSocketAddress(host, port));
+ QueuedThreadPool threadPool = new QueuedThreadPool();
+ if (selectorNum > 0) {
+ threadPool.setMaxThreads(selectorNum * 2 + 2);
+ }
+
+ server = new org.eclipse.jetty.server.Server(threadPool);
+
+ HttpConfiguration httpConfig = new HttpConfiguration();
+ ServerConnector http = new ServerConnector(server, null, null, null,
+ 1, selectorNum, new HttpConnectionFactory(httpConfig));
+ http.setPort(port);
+ http.setHost(host);
+
+ server.addConnector(http);
servletContextHandler = new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
servletContextHandler.setContextPath(contextPath);
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java
index 47b4396..46a9117 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java
@@ -56,8 +56,8 @@ public class SegmentStandardizationWorker extends AbstractWorker<SegmentStandard
BufferStream<UpstreamSegment> stream = builder.build();
stream.initialize();
- dataCarrier = new DataCarrier<>(1, 1024);
- dataCarrier.consume(new Consumer(stream), 1);
+ dataCarrier = new DataCarrier<>("SegmentStandardizationWorker", 1, 1024);
+ dataCarrier.consume(new Consumer(stream), 1, 200);
MetricCreator metricCreator = moduleManager.find(TelemetryModule.NAME).provider().getService(MetricCreator.class);
String metricNamePrefix = isV6 ? "v6_" : "v5_";
diff --git a/oap-server/server-starter/src/main/assembly/application.yml b/oap-server/server-starter/src/main/assembly/application.yml
index 840777d..b8c0efc 100644
--- a/oap-server/server-starter/src/main/assembly/application.yml
+++ b/oap-server/server-starter/src/main/assembly/application.yml
@@ -75,7 +75,7 @@ receiver-trace:
bufferDataMaxFileSize: ${SW_RECEIVER_BUFFER_DATA_MAX_FILE_SIZE:500} # Unit is MB
bufferFileCleanWhenRestart: ${SW_RECEIVER_BUFFER_FILE_CLEAN_WHEN_RESTART:false}
sampleRate: ${SW_TRACE_SAMPLE_RATE:10000} # The sample rate precision is 1/10000. 10000 means 100% sample in default.
- slowDBAccessThreshold: ${SW_SLOW_DB_THRESHOLD:default:200,redis:50} # The slow database access thresholds. Unit ms.
+ slowDBAccessThreshold: ${SW_SLOW_DB_THRESHOLD:default:200,mongodb:100} # The slow database access thresholds. Unit ms.
receiver-jvm:
default:
#service-mesh:
diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index 28c7b97..24df981 100644
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -75,7 +75,7 @@ receiver-trace:
bufferDataMaxFileSize: ${SW_RECEIVER_BUFFER_DATA_MAX_FILE_SIZE:500} # Unit is MB
bufferFileCleanWhenRestart: ${SW_RECEIVER_BUFFER_FILE_CLEAN_WHEN_RESTART:false}
sampleRate: ${SW_TRACE_SAMPLE_RATE:10000} # The sample rate precision is 1/10000. 10000 means 100% sample in default.
- slowDBAccessThreshold: ${SW_SLOW_DB_THRESHOLD:default:200,redis:50} # The slow database access thresholds. Unit ms.
+ slowDBAccessThreshold: ${SW_SLOW_DB_THRESHOLD:default:200,mongodb:100} # The slow database access thresholds. Unit ms.
receiver-jvm:
default:
service-mesh: