You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/02/18 06:55:14 UTC

[incubator-skywalking] branch master updated: Backend streaming thread model improvement (#2247)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f3ccf3  Backend streaming thread model improvement (#2247)
2f3ccf3 is described below

commit 2f3ccf368c6ccd88f2f9fa56ae3de047c3f2d810
Author: 吴晟 Wu Sheng <wu...@foxmail.com>
AuthorDate: Mon Feb 18 14:55:07 2019 +0800

    Backend streaming thread model improvement (#2247)
    
    [Performance Improvement]Backend streaming thread model improvement
---
 .mvn/wrapper/MavenWrapperDownloader.java           |   0
 .mvn/wrapper/maven-wrapper.properties              |   0
 .../apm/commons/datacarrier/buffer/Buffer.java     |   7 +-
 .../apm/commons/datacarrier/buffer/Channels.java   |   8 ++
 .../datacarrier/consumer/BulkConsumePool.java      | 125 +++++++++++++++++++++
 .../datacarrier/consumer/ConsumerPoolFactory.java  |   8 +-
 .../consumer/MultipleChannelsConsumer.java         | 124 ++++++++++++++++++++
 .../datacarrier/consumer/BulkConsumePoolTest.java  |  91 +++++++++++++++
 docker/config/application.yml                      |   2 +-
 .../service-agent/java-agent/Supported-list.md     |   2 +-
 .../oap/server/core/CoreModuleConfig.java          |   1 +
 .../oap/server/core/CoreModuleProvider.java        |   2 +-
 .../oap/server/core/UnexpectedException.java       |   4 +
 .../analysis/worker/IndicatorAggregateWorker.java  |  10 +-
 .../analysis/worker/IndicatorPersistentWorker.java |  17 ++-
 .../analysis/worker/RecordPersistentWorker.java    |  14 ++-
 .../server/core/analysis/worker/TopNWorker.java    |   2 +-
 .../register/worker/RegisterDistinctWorker.java    |  14 ++-
 .../register/worker/RegisterPersistentWorker.java  |  17 ++-
 .../server/library/server/jetty/JettyServer.java   |  25 ++++-
 .../SegmentStandardizationWorker.java              |   4 +-
 .../src/main/assembly/application.yml              |   2 +-
 .../src/main/resources/application.yml             |   2 +-
 23 files changed, 455 insertions(+), 26 deletions(-)

diff --git a/.mvn/wrapper/MavenWrapperDownloader.java b/.mvn/wrapper/MavenWrapperDownloader.java
old mode 100755
new mode 100644
diff --git a/.mvn/wrapper/maven-wrapper.properties b/.mvn/wrapper/maven-wrapper.properties
old mode 100755
new mode 100644
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Buffer.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Buffer.java
index 4fbb478..e0f17c3 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Buffer.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Buffer.java
@@ -18,8 +18,7 @@
 
 package org.apache.skywalking.apm.commons.datacarrier.buffer;
 
-import java.util.LinkedList;
-import java.util.List;
+import java.util.*;
 import org.apache.skywalking.apm.commons.datacarrier.callback.QueueBlockingCallback;
 import org.apache.skywalking.apm.commons.datacarrier.common.AtomicRangeInteger;
 
@@ -80,6 +79,10 @@ public class Buffer<T> {
         return buffer.length;
     }
 
+    public LinkedList<T> obtain() {
+        return this.obtain(0, buffer.length);
+    }
+
     public LinkedList<T> obtain(int start, int end) {
         LinkedList<T> result = new LinkedList<T>();
         for (int i = start; i < end; i++) {
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Channels.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Channels.java
index 4f7ee87..2a83ba0 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Channels.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Channels.java
@@ -81,6 +81,14 @@ public class Channels<T> {
         return this.bufferChannels.length;
     }
 
+    public int getBufferSize() {
+        return bufferChannels[0].getBufferSize();
+    }
+
+    public long size() {
+        return (long)getChannelSize() * getBufferSize();
+    }
+
     public Buffer<T> getBuffer(int index) {
         return this.bufferChannels[index];
     }
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java
new file mode 100644
index 0000000..798a601
--- /dev/null
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.commons.datacarrier.consumer;
+
+import java.util.*;
+import java.util.concurrent.Callable;
+import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
+
+/**
+ * BulkConsumePool works for consuming data from multiple channels(DataCarrier instances), with multiple {@link
+ * MultipleChannelsConsumer}s.
+ *
+ * In typical case, the number of {@link MultipleChannelsConsumer} should be less than the number of channels.
+ * The thread count can be overridden through the environment variable {@code name + "_THREAD"}.
+ * @author wusheng
+ */
+public class BulkConsumePool implements ConsumerPool {
+    private List<MultipleChannelsConsumer> allConsumers;
+    private volatile boolean isStarted = false;
+
+    public BulkConsumePool(String name, int size, long consumeCycle) {
+        allConsumers = new ArrayList<MultipleChannelsConsumer>(size);
+        String threadNum = System.getenv(name + "_THREAD");
+        if (threadNum != null) {
+            try {
+                size = Integer.parseInt(threadNum);
+            } catch (NumberFormatException e) {
+                // The override is not an integer: keep the size passed by the caller.
+            }
+        }
+        for (int i = 0; i < size; i++) {
+            MultipleChannelsConsumer multipleChannelsConsumer = new MultipleChannelsConsumer("DataCarrier." + name + ".BulkConsumePool." + i + ".Thread", consumeCycle);
+            multipleChannelsConsumer.setDaemon(true);
+            allConsumers.add(multipleChannelsConsumer);
+        }
+    }
+
+    @Override synchronized public void add(String name, Channels channels, IConsumer consumer) {
+        MultipleChannelsConsumer multipleChannelsConsumer = getLowestPayload();
+        multipleChannelsConsumer.addNewTarget(channels, consumer);
+    }
+
+    /**
+     * Pick the consumer thread with the smallest allocated capacity, scanning every candidate
+     * so the true minimum wins (on a tie the earliest consumer in the list is kept).
+     * @return the consumer whose {@link MultipleChannelsConsumer#size()} is the lowest.
+     */
+    private MultipleChannelsConsumer getLowestPayload() {
+        MultipleChannelsConsumer winner = allConsumers.get(0);
+        for (int i = 1; i < allConsumers.size(); i++) {
+            MultipleChannelsConsumer option = allConsumers.get(i);
+            if (option.size() < winner.size()) {
+                winner = option;
+            }
+        }
+        return winner;
+    }
+
+    /**
+     * @param channels ignored; the started state is tracked for the whole pool.
+     * @return true once {@link #begin(Channels)} has started the consumer threads.
+     */
+    @Override public boolean isRunning(Channels channels) {
+        return isStarted;
+    }
+
+    @Override public void close(Channels channels) {
+        for (MultipleChannelsConsumer consumer : allConsumers) {
+            consumer.shutdown();
+        }
+    }
+
+    @Override public void begin(Channels channels) {
+        if (isStarted) {
+            return;
+        }
+        for (MultipleChannelsConsumer consumer : allConsumers) {
+            consumer.start();
+        }
+        isStarted = true;
+    }
+
+    /**
+     * The creator for {@link BulkConsumePool}.
+     */
+    public static class Creator implements Callable<ConsumerPool> {
+        private String name;
+        private int size;
+        private long consumeCycle;
+
+        public Creator(String name, int poolSize, long consumeCycle) {
+            this.name = name;
+            this.size = poolSize;
+            this.consumeCycle = consumeCycle;
+        }
+
+        @Override public ConsumerPool call() {
+            return new BulkConsumePool(name, size, consumeCycle);
+        }
+
+        public static int recommendMaxSize() {
+            int processorNum = Runtime.getRuntime().availableProcessors();
+            if (processorNum > 1) {
+                processorNum -= 1;
+            }
+            return processorNum;
+        }
+    }
+}
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPoolFactory.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPoolFactory.java
index e95e18f..7f86873 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPoolFactory.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPoolFactory.java
@@ -63,9 +63,13 @@ public enum ConsumerPoolFactory {
             }
         }
 
+        /**
+         * Always return true.
+         * @param channels
+         * @return
+         */
         @Override public boolean isRunning(Channels channels) {
-            ConsumeDriver driver = allDrivers.get(channels);
-            return driver == null ? false : driver.isRunning(channels);
+            return true;
         }
 
         @Override public void close(Channels channels) {
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/MultipleChannelsConsumer.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/MultipleChannelsConsumer.java
new file mode 100644
index 0000000..1679302
--- /dev/null
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/MultipleChannelsConsumer.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.commons.datacarrier.consumer;
+
+import java.util.*;
+import org.apache.skywalking.apm.commons.datacarrier.buffer.*;
+
+/**
+ * MultipleChannelsConsumer represents a single consumer thread, but supports multiple channels with their {@link
+ * IConsumer}s
+ *
+ * @author wusheng
+ */
+public class MultipleChannelsConsumer extends Thread {
+    private volatile boolean running;
+    private volatile ArrayList<Group> consumeTargets;
+    private volatile long size;
+    private final long consumeCycle;
+
+    public MultipleChannelsConsumer(String threadName, long consumeCycle) {
+        super(threadName);
+        this.consumeTargets = new ArrayList<Group>();
+        this.consumeCycle = consumeCycle;
+    }
+
+    @Override
+    public void run() {
+        running = true;
+        // Poll every registered group on each pass; sleep only when none of them had data.
+        while (running) {
+            boolean hasData = false;
+            for (Group target : consumeTargets) {
+                // consume() must come first: `hasData || consume(target)` short-circuits and
+                // would skip the remaining groups as soon as one group had data.
+                hasData = consume(target) || hasData;
+            }
+            if (!hasData) {
+                try {
+                    Thread.sleep(consumeCycle);
+                } catch (InterruptedException e) {
+                    // Nothing to do: shutdown() stops the loop through the `running` flag.
+                }
+            }
+        }
+
+        // consumer thread is going to stop
+        // consume the last time
+        for (Group target : consumeTargets) {
+            consume(target);
+            target.consumer.onExit();
+        }
+    }
+
+    private boolean consume(Group target) {
+        boolean hasData;
+        LinkedList consumeList = new LinkedList();
+        for (int i = 0; i < target.channels.getChannelSize(); i++) {
+            Buffer buffer = target.channels.getBuffer(i);
+            consumeList.addAll(buffer.obtain());
+        }
+        hasData = consumeList.size() > 0;
+        // Hand the batch to the consumer; a failure is reported through onError, not rethrown.
+        if (consumeList.size() > 0) {
+            try {
+                target.consumer.consume(consumeList);
+            } catch (Throwable t) {
+                target.consumer.onError(consumeList, t);
+            }
+        }
+        return hasData;
+    }
+
+    /**
+     * Add a new target channels; the consumer thread picks it up on its next pass over the targets.
+     *
+     * @param channels the channels to poll; their total capacity is added to {@link #size()}.
+     * @param consumer the consumer that receives the data obtained from these channels.
+     */
+    public void addNewTarget(Channels channels, IConsumer consumer) {
+        Group group = new Group(channels, consumer);
+        // Recreate the new list to avoid change list while the list is used in consuming.
+        ArrayList<Group> newList = new ArrayList<Group>();
+        for (Group target : consumeTargets) {
+            newList.add(target);
+        }
+        newList.add(group);
+        consumeTargets = newList;
+        size += channels.size();
+    }
+
+    public long size() {
+        return size;
+    }
+
+    void shutdown() {
+        running = false;
+    }
+
+    private class Group {
+        private Channels channels;
+        private IConsumer consumer;
+
+        public Group(Channels channels, IConsumer consumer) {
+            this.channels = channels;
+            this.consumer = consumer;
+        }
+    }
+}
diff --git a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePoolTest.java b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePoolTest.java
new file mode 100644
index 0000000..9cd9c8c
--- /dev/null
+++ b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePoolTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.commons.datacarrier.consumer;
+
+import java.util.*;
+import org.apache.skywalking.apm.commons.datacarrier.buffer.*;
+import org.apache.skywalking.apm.commons.datacarrier.partition.SimpleRollingPartitioner;
+import org.junit.*;
+
+/**
+ * @author wusheng
+ */
+public class BulkConsumePoolTest {
+    @Test
+    public void testOneThreadPool() throws InterruptedException {
+        BulkConsumePool pool = new BulkConsumePool("testPool", 1, 50);
+        final ArrayList<Object> result1 = new ArrayList();
+        Channels c1 = new Channels(2, 10, new SimpleRollingPartitioner(), BufferStrategy.OVERRIDE);
+        pool.add("test", c1,
+            new IConsumer() {
+                @Override public void init() {
+                    // Intentionally empty: this consumer only records what it receives.
+                }
+
+                @Override public void consume(List data) {
+                    for (Object datum : data) {
+                        result1.add(datum);
+                    }
+                }
+
+                @Override public void onError(List data, Throwable t) {
+                    // Intentionally empty.
+                }
+
+                @Override public void onExit() {
+                    // Intentionally empty.
+                }
+            });
+        pool.begin(c1);
+        final ArrayList<Object> result2 = new ArrayList();
+        Channels c2 = new Channels(2, 10, new SimpleRollingPartitioner(), BufferStrategy.OVERRIDE);
+        pool.add("test2", c2,
+            new IConsumer() {
+                @Override public void init() {
+                    // Intentionally empty: this consumer only records what it receives.
+                }
+
+                @Override public void consume(List data) {
+                    for (Object datum : data) {
+                        result2.add(datum);
+                    }
+                }
+
+                @Override public void onError(List data, Throwable t) {
+                    // Intentionally empty.
+                }
+
+                @Override public void onExit() {
+                    // Intentionally empty.
+                }
+            });
+        pool.begin(c2);
+        c1.save(new Object());
+        c1.save(new Object());
+        c1.save(new Object());
+        c1.save(new Object());
+        c1.save(new Object());
+        c2.save(new Object());
+        c2.save(new Object());
+        Thread.sleep(2000);
+
+        Assert.assertEquals(5, result1.size());
+        Assert.assertEquals(2, result2.size());
+    }
+}
diff --git a/docker/config/application.yml b/docker/config/application.yml
index f9b645f..d99357a 100644
--- a/docker/config/application.yml
+++ b/docker/config/application.yml
@@ -75,7 +75,7 @@ receiver-trace:
     bufferDataMaxFileSize: ${SW_RECEIVER_BUFFER_DATA_MAX_FILE_SIZE:500} # Unit is MB
     bufferFileCleanWhenRestart: ${SW_RECEIVER_BUFFER_FILE_CLEAN_WHEN_RESTART:false}
     sampleRate: ${SW_TRACE_SAMPLE_RATE:10000} # The sample rate precision is 1/10000. 10000 means 100% sample in default.
-    slowDBAccessThreshold: ${SW_SLOW_DB_THRESHOLD:default:200,redis:50} # The slow database access thresholds. Unit ms.
+    slowDBAccessThreshold: ${SW_SLOW_DB_THRESHOLD:default:200,mongodb:100} # The slow database access thresholds. Unit ms.
 receiver-jvm:
   default:
 service-mesh:
diff --git a/docs/en/setup/service-agent/java-agent/Supported-list.md b/docs/en/setup/service-agent/java-agent/Supported-list.md
index 144c753..072afe9 100644
--- a/docs/en/setup/service-agent/java-agent/Supported-list.md
+++ b/docs/en/setup/service-agent/java-agent/Supported-list.md
@@ -64,6 +64,6 @@
   * [GSON](https://github.com/google/gson) 2.8.x (Optional²)
 
 ¹Due to license incompatibilities/restrictions these plugins are hosted and released in 3rd part repository, 
- go to [OpenSkywalking java plugin extension repository](https://github.com/OpenSkywalking/java-plugin-extensions) to get these.
+ go to [SkyAPM java plugin extension repository](https://github.com/SkyAPM/java-plugin-extensions) to get these.
 
 ²These plugins affect the performance or must be used under some conditions, from experiences. So only released in `/optional-plugins`, copy to `/plugins` in order to make them work.
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
index 3de179a..453253a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
@@ -30,6 +30,7 @@ public class CoreModuleConfig extends ModuleConfig {
     @Setter private String nameSpace;
     @Setter private String restHost;
     @Setter private int restPort;
+    @Setter private int jettySelectors = 1;
     @Setter private String restContextPath;
     @Setter private String gRPCHost;
     @Setter private int gRPCPort;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index 0106fe7..f6db632 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -95,7 +95,7 @@ public class CoreModuleProvider extends ModuleProvider {
         }
         grpcServer.initialize();
 
-        jettyServer = new JettyServer(moduleConfig.getRestHost(), moduleConfig.getRestPort(), moduleConfig.getRestContextPath());
+        jettyServer = new JettyServer(moduleConfig.getRestHost(), moduleConfig.getRestPort(), moduleConfig.getRestContextPath(), moduleConfig.getJettySelectors());
         jettyServer.initialize();
 
         this.registerServiceImplementation(DownsamplingConfigService.class, new DownsamplingConfigService(moduleConfig.getDownsampling()));
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/UnexpectedException.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/UnexpectedException.java
index f290fd0..f5dc51b 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/UnexpectedException.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/UnexpectedException.java
@@ -25,4 +25,8 @@ public class UnexpectedException extends RuntimeException {
     public UnexpectedException(String message) {
         super(message);
     }
+
+    public UnexpectedException(String message, Exception cause) {
+        super(message, cause);
+    }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
index f11d014..148534a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
@@ -22,6 +22,7 @@ import java.util.Iterator;
 import java.util.List;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
 import org.apache.skywalking.oap.server.core.analysis.data.MergeDataCache;
 import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
@@ -53,7 +54,14 @@ public class IndicatorAggregateWorker extends AbstractWorker<Indicator> {
         this.mergeDataCache = new MergeDataCache<>();
         this.dataCarrier = new DataCarrier<>("IndicatorAggregateWorker." + modelName, 1, 10000);
 
-        this.dataCarrier.consume(ConsumerPoolFactory.DEFAULT_POOL, new AggregatorConsumer(this));
+        String name = "INDICATOR_L1_AGGREGATION";
+        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, BulkConsumePool.Creator.recommendMaxSize(), 20);
+        try {
+            ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
+        } catch (Exception e) {
+            throw new UnexpectedException(e.getMessage(), e);
+        }
+        this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new AggregatorConsumer(this));
 
         MetricCreator metricCreator = moduleManager.find(TelemetryModule.NAME).provider().getService(MetricCreator.class);
         aggregationCounter = metricCreator.createCounter("indicator_aggregation", "The number of rows in aggregation",
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
index 172874a..e90d98a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Objects;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
 import org.apache.skywalking.oap.server.core.analysis.data.MergeDataCache;
 import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
@@ -55,9 +56,21 @@ public class IndicatorPersistentWorker extends PersistenceWorker<Indicator, Merg
         this.mergeDataCache = new MergeDataCache<>();
         this.indicatorDAO = indicatorDAO;
         this.nextWorker = nextWorker;
-        this.dataCarrier = new DataCarrier<>("IndicatorPersistentWorker." + modelName, 1, 10000);
 
-        this.dataCarrier.consume(ConsumerPoolFactory.DEFAULT_POOL, new PersistentConsumer(this));
+        String name = "INDICATOR_L2_AGGREGATION";
+        int size = BulkConsumePool.Creator.recommendMaxSize() / 4;
+        if (size == 0) {
+            size = 1;
+        }
+        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, size, 20);
+        try {
+            ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
+        } catch (Exception e) {
+            throw new UnexpectedException(e.getMessage(), e);
+        }
+
+        this.dataCarrier = new DataCarrier<>("IndicatorPersistentWorker." + modelName, 1, 2000);
+        this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new PersistentConsumer(this));
     }
 
     @Override void onWork(Indicator indicator) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java
index 8d291d7..a6b972e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java
@@ -20,7 +20,8 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
 
 import java.util.*;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
+import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.analysis.data.NonMergeDataCache;
 import org.apache.skywalking.oap.server.core.analysis.record.Record;
 import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
@@ -45,8 +46,17 @@ public class RecordPersistentWorker extends PersistenceWorker<Record, NonMergeDa
         this.modelName = modelName;
         this.nonMergeDataCache = new NonMergeDataCache<>();
         this.recordDAO = recordDAO;
+
+        String name = "RECORD_PERSISTENT";
+        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 20);
+        try {
+            ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
+        } catch (Exception e) {
+            throw new UnexpectedException(e.getMessage(), e);
+        }
+
         this.dataCarrier = new DataCarrier<>(1, 10000);
-        this.dataCarrier.consume(new RecordPersistentWorker.PersistentConsumer(this), 1);
+        this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new RecordPersistentWorker.PersistentConsumer(this));
     }
 
     @Override public void in(Record record) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
index 5d6304e..8bf5dd6 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
@@ -48,7 +48,7 @@ public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<Top
         this.limitedSizeDataCache = new LimitedSizeDataCache<>(topNSize);
         this.recordDAO = recordDAO;
         this.modelName = modelName;
-        this.dataCarrier = new DataCarrier<>(1, 10000);
+        this.dataCarrier = new DataCarrier<>("TopNWorker", 1, 1000);
         this.dataCarrier.consume(new TopNWorker.TopNConsumer(), 1);
         this.lastReportTimestamp = System.currentTimeMillis();
         // Top N persistent only works per 10 minutes.
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
index 443eb73..ae4bc67 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
@@ -20,7 +20,8 @@ package org.apache.skywalking.oap.server.core.register.worker;
 
 import java.util.*;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
+import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
 import org.apache.skywalking.oap.server.core.register.RegisterSource;
 import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
@@ -42,8 +43,15 @@ public class RegisterDistinctWorker extends AbstractWorker<RegisterSource> {
         super(workerId);
         this.nextWorker = nextWorker;
         this.sources = new HashMap<>();
-        this.dataCarrier = new DataCarrier<>(1, 10000);
-        this.dataCarrier.consume(new AggregatorConsumer(this), 1, 200);
+        this.dataCarrier = new DataCarrier<>(1, 1000);
+        String name = "REGISTER_L1";
+        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 200);
+        try {
+            ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
+        } catch (Exception e) {
+            throw new UnexpectedException(e.getMessage(), e);
+        }
+        this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new AggregatorConsumer(this));
     }
 
     @Override public final void in(RegisterSource source) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
index 35a83b4..1ccd21d 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
@@ -20,8 +20,8 @@ package org.apache.skywalking.oap.server.core.register.worker;
 
 import java.util.*;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
+import org.apache.skywalking.oap.server.core.*;
 import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
 import org.apache.skywalking.oap.server.core.register.RegisterSource;
 import org.apache.skywalking.oap.server.core.source.Scope;
@@ -52,8 +52,17 @@ public class RegisterPersistentWorker extends AbstractWorker<RegisterSource> {
         this.registerDAO = registerDAO;
         this.registerLockDAO = moduleManager.find(StorageModule.NAME).provider().getService(IRegisterLockDAO.class);
         this.scope = scope;
-        this.dataCarrier = new DataCarrier<>("IndicatorPersistentWorker." + modelName, 1, 10000);
-        this.dataCarrier.consume(new RegisterPersistentWorker.PersistentConsumer(this), 1, 200);
+        this.dataCarrier = new DataCarrier<>("IndicatorPersistentWorker." + modelName, 1, 1000);
+
+        String name = "REGISTER_L2";
+        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 200);
+        try {
+            ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
+        } catch (Exception e) {
+            throw new UnexpectedException(e.getMessage(), e);
+        }
+
+        this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new RegisterPersistentWorker.PersistentConsumer(this));
     }
 
     @Override public final void in(RegisterSource registerSource) {
diff --git a/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/jetty/JettyServer.java b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/jetty/JettyServer.java
index 84e24d4..97f4c85 100644
--- a/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/jetty/JettyServer.java
+++ b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/jetty/JettyServer.java
@@ -18,10 +18,12 @@
 
 package org.apache.skywalking.oap.server.library.server.jetty;
 
-import java.net.InetSocketAddress;
 import java.util.Objects;
+import org.apache.skywalking.oap.server.library.server.Server;
 import org.apache.skywalking.oap.server.library.server.*;
+import org.eclipse.jetty.server.*;
 import org.eclipse.jetty.servlet.*;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
 import org.slf4j.*;
 
 /**
@@ -34,13 +36,19 @@ public class JettyServer implements Server {
     private final String host;
     private final int port;
     private final String contextPath;
+    private final int selectorNum;
     private org.eclipse.jetty.server.Server server;
     private ServletContextHandler servletContextHandler;
 
     public JettyServer(String host, int port, String contextPath) {
+        this(host, port, contextPath, -1); // -1: initialize() skips setMaxThreads and passes -1 to ServerConnector as the selector count
+    }
+
+    public JettyServer(String host, int port, String contextPath, int selectorNum) {
         this.host = host;
         this.port = port;
         this.contextPath = contextPath;
+        this.selectorNum = selectorNum; // read by initialize(): ServerConnector selector count; when > 0 the thread pool max is set to selectorNum * 2 + 2
     }
 
     @Override
@@ -55,7 +63,20 @@ public class JettyServer implements Server {
 
     @Override
     public void initialize() {
-        server = new org.eclipse.jetty.server.Server(new InetSocketAddress(host, port));
+        QueuedThreadPool threadPool = new QueuedThreadPool();
+        if (selectorNum > 0) {
+            threadPool.setMaxThreads(selectorNum * 2 + 2);
+        }
+
+        server = new org.eclipse.jetty.server.Server(threadPool);
+
+        HttpConfiguration httpConfig = new HttpConfiguration();
+        ServerConnector http = new ServerConnector(server, null, null, null,
+            1, selectorNum, new HttpConnectionFactory(httpConfig));
+        http.setPort(port);
+        http.setHost(host);
+
+        server.addConnector(http);
 
         servletContextHandler = new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
         servletContextHandler.setContextPath(contextPath);
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java
index 47b4396..46a9117 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java
@@ -56,8 +56,8 @@ public class SegmentStandardizationWorker extends AbstractWorker<SegmentStandard
         BufferStream<UpstreamSegment> stream = builder.build();
         stream.initialize();
 
-        dataCarrier = new DataCarrier<>(1, 1024);
-        dataCarrier.consume(new Consumer(stream), 1);
+        dataCarrier = new DataCarrier<>("SegmentStandardizationWorker", 1, 1024);
+        dataCarrier.consume(new Consumer(stream), 1, 200);
 
         MetricCreator metricCreator = moduleManager.find(TelemetryModule.NAME).provider().getService(MetricCreator.class);
         String metricNamePrefix = isV6 ? "v6_" : "v5_";
diff --git a/oap-server/server-starter/src/main/assembly/application.yml b/oap-server/server-starter/src/main/assembly/application.yml
index 840777d..b8c0efc 100644
--- a/oap-server/server-starter/src/main/assembly/application.yml
+++ b/oap-server/server-starter/src/main/assembly/application.yml
@@ -75,7 +75,7 @@ receiver-trace:
     bufferDataMaxFileSize: ${SW_RECEIVER_BUFFER_DATA_MAX_FILE_SIZE:500} # Unit is MB
     bufferFileCleanWhenRestart: ${SW_RECEIVER_BUFFER_FILE_CLEAN_WHEN_RESTART:false}
     sampleRate: ${SW_TRACE_SAMPLE_RATE:10000} # The sample rate precision is 1/10000. 10000 means 100% sample in default.
-    slowDBAccessThreshold: ${SW_SLOW_DB_THRESHOLD:default:200,redis:50} # The slow database access thresholds. Unit ms.
+    slowDBAccessThreshold: ${SW_SLOW_DB_THRESHOLD:default:200,mongodb:100} # The slow database access thresholds. Unit ms.
 receiver-jvm:
   default:
 #service-mesh:
diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index 28c7b97..24df981 100644
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -75,7 +75,7 @@ receiver-trace:
     bufferDataMaxFileSize: ${SW_RECEIVER_BUFFER_DATA_MAX_FILE_SIZE:500} # Unit is MB
     bufferFileCleanWhenRestart: ${SW_RECEIVER_BUFFER_FILE_CLEAN_WHEN_RESTART:false}
     sampleRate: ${SW_TRACE_SAMPLE_RATE:10000} # The sample rate precision is 1/10000. 10000 means 100% sample in default.
-    slowDBAccessThreshold: ${SW_SLOW_DB_THRESHOLD:default:200,redis:50} # The slow database access thresholds. Unit ms.
+    slowDBAccessThreshold: ${SW_SLOW_DB_THRESHOLD:default:200,mongodb:100} # The slow database access thresholds. Unit ms.
 receiver-jvm:
   default:
 service-mesh: