You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2021/06/29 06:34:42 UTC

[skywalking] branch master updated: Support prepare and save metrics concurrency (#7153)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new a966eea  Support prepare and save metrics concurrency (#7153)
a966eea is described below

commit a966eea35c8a939dec3803282d5516dd431e1d73
Author: Alvin <32...@qq.com>
AuthorDate: Tue Jun 29 14:34:29 2021 +0800

    Support prepare and save metrics concurrency (#7153)
---
 CHANGES.md                                         |   1 +
 docs/en/setup/backend/configuration-vocabulary.md  |   1 +
 .../src/main/resources/application.yml             |   2 +
 .../oap/server/core/CoreModuleConfig.java          |  11 +-
 .../server/core/storage/BlockingBatchQueue.java    |  37 +++++
 .../oap/server/core/storage/PersistenceTimer.java  | 175 +++++++++++++++++----
 .../core/storage/BlockingBatchQueueBenchmark.java  | 149 ++++++++++++++++++
 .../BlockingBatchQueueWithLinkedBlockingQueue.java |  74 +++++++++
 .../BlockingBatchQueueWithReentrantLock.java       | 100 ++++++++++++
 .../server/core/storage/PersistenceTimerTest.java  | 123 +++++++++++++++
 10 files changed, 638 insertions(+), 35 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 1fbb79b..0ebb9d5 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -42,6 +42,7 @@ Release Notes.
 * Upgrade commons-lang3 to avoid potential NPE in some JDK versions.
 * OAL supports generating metrics from events.
 * Support endpoint name grouping by OpenAPI definitions.
+* Concurrent create PrepareRequest when persist Metrics
 * Fix CounterWindow increase computing issue.
 * Performance: optimize Envoy ALS analyzer performance in high traffic load scenario (reduce ~1cpu in ~10k RPS).
 * Performance: trim useless metadata fields in Envoy ALS metadata to improve performance.
diff --git a/docs/en/setup/backend/configuration-vocabulary.md b/docs/en/setup/backend/configuration-vocabulary.md
index 33f406a..c435c8f 100644
--- a/docs/en/setup/backend/configuration-vocabulary.md
+++ b/docs/en/setup/backend/configuration-vocabulary.md
@@ -41,6 +41,7 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode
 | - | - | maxPageSizeOfQueryProfileSnapshot|The max size in every OAP query for snapshot analysis| - | 500 |
 | - | - | maxSizeOfAnalyzeProfileSnapshot|The max number of snapshots analyzed by OAP| - | 12000 |
 | - | - | syncThreads|The number of threads used to synchronously refresh the metrics data to the storage.| SW_CORE_SYNC_THREADS | 2 |
+| - | - | prepareThreads|The number of threads used to prepare metrics data to the storage.| SW_CORE_PREPARE_THREADS | 2 |
 | - | - | maxSyncOperationNum|The maximum number of processes supported for each synchronous storage operation. When the number of the flush data is greater than this value, it will be assigned to multiple cores for execution.| SW_CORE_MAX_SYNC_OPERATION_NUM | 50000 |
 | - | - | enableEndpointNameGroupingByOpenapi |Turn it on then automatically grouping endpoint by the given OpenAPI definitions.| SW_CORE_ENABLE_ENDPOINT_NAME_GROUPING_BY_OPAENAPI | true |
 |cluster|standalone| - | standalone is not suitable for one node running, no available configuration.| - | - |
diff --git a/oap-server/server-bootstrap/src/main/resources/application.yml b/oap-server/server-bootstrap/src/main/resources/application.yml
index 4c39a68..e6db202 100755
--- a/oap-server/server-bootstrap/src/main/resources/application.yml
+++ b/oap-server/server-bootstrap/src/main/resources/application.yml
@@ -106,6 +106,8 @@ core:
     searchableAlarmTags: ${SW_SEARCHABLE_ALARM_TAG_KEYS:level}
     # The number of threads used to synchronously refresh the metrics data to the storage.
     syncThreads: ${SW_CORE_SYNC_THREADS:2}
+    # The number of threads used to prepare metrics data to the storage.
+    prepareThreads: ${SW_CORE_PREPARE_THREADS:2}
     # The maximum number of processes supported for each synchronous storage operation. When the number of the flush data is greater than this value, it will be assigned to multiple cores for execution.
     maxSyncOperationNum: ${SW_CORE_MAX_SYNC_OPERATION_NUM:50000}
     # Turn it on then automatically grouping endpoint by the given OpenAPI definitions.
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
index 80b3e7c..8ffcc56 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
@@ -52,7 +52,7 @@ public class CoreModuleConfig extends ModuleConfig {
     /**
      * The period of doing data persistence. Unit is second.
      */
-
+    @Setter
     private long persistentPeriod = 3;
 
     private boolean enableDataKeeperExecutor = true;
@@ -151,6 +151,15 @@ public class CoreModuleConfig extends ModuleConfig {
     private int syncThreads = 2;
 
     /**
+     * The number of threads used to prepare metrics data to the storage.
+     *
+     * @since 8.7.0
+     */
+    @Setter
+    @Getter
+    private int prepareThreads = 2;
+
+    /**
      * The maximum number of processes supported for each synchronous storage operation. When the number of the flush
      * data is greater than this value, it will be assigned to multiple cores for execution.
      */
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/BlockingBatchQueue.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/BlockingBatchQueue.java
new file mode 100644
index 0000000..cd1cd74
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/BlockingBatchQueue.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.storage;
+
+import java.util.List;
+
+/**
+ * A blocking queue implementation for persistent process.
+ * Poll method only returns when it matches the threshold or no further appending declared.
+ */
+interface BlockingBatchQueue<E> {
+    List<E> poll() throws InterruptedException;
+
+    void offer(List<E> elements);
+
+    void noFurtherAppending();
+
+    void furtherAppending();
+
+    int size();
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
index 4410616..a3f0697 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
@@ -18,13 +18,18 @@
 
 package org.apache.skywalking.oap.server.core.storage;
 
-import com.google.common.collect.Lists;
+import com.google.common.annotations.VisibleForTesting;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
 import org.apache.skywalking.oap.server.core.CoreModuleConfig;
@@ -43,16 +48,18 @@ import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
 @Slf4j
 public enum PersistenceTimer {
     INSTANCE;
-    private Boolean isStarted = false;
+    @VisibleForTesting
+    boolean isStarted = false;
     private final Boolean debug;
     private CounterMetrics errorCounter;
     private HistogramMetrics prepareLatency;
     private HistogramMetrics executeLatency;
+    private HistogramMetrics allLatency;
     private long lastTime = System.currentTimeMillis();
-    private final List<PrepareRequest> prepareRequests = new ArrayList<>(50000);
     private int syncOperationThreadsNum;
     private int maxSyncoperationNum;
     private ExecutorService executorService;
+    private ExecutorService prepareExecutorService;
 
     PersistenceTimer() {
         this.debug = System.getProperty("debug") != null;
@@ -77,9 +84,15 @@ public enum PersistenceTimer {
             "persistence_timer_bulk_execute_latency", "Latency of the execute stage in persistence timer",
             MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE
         );
+        allLatency = metricsCreator.createHistogramMetric(
+            "persistence_timer_bulk_all_latency", "Latency of the all stage in persistence timer",
+            MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE
+        );
+
         syncOperationThreadsNum = moduleConfig.getSyncThreads();
         maxSyncoperationNum = moduleConfig.getMaxSyncOperationNum();
         executorService = Executors.newFixedThreadPool(syncOperationThreadsNum);
+        prepareExecutorService = Executors.newFixedThreadPool(moduleConfig.getPrepareThreads());
         if (!isStarted) {
             Executors.newSingleThreadScheduledExecutor()
                      .scheduleWithFixedDelay(
@@ -93,43 +106,61 @@ public enum PersistenceTimer {
     }
 
     private void extractDataAndSave(IBatchDAO batchDAO) {
+
         if (log.isDebugEnabled()) {
             log.debug("Extract data and save");
         }
 
         long startTime = System.currentTimeMillis();
+        HistogramMetrics.Timer allTimer = allLatency.createTimer();
+        // Use `stop` as a control signal to make fail-fast in the persistence process.
+        AtomicBoolean stop = new AtomicBoolean(false);
 
+        DefaultBlockingBatchQueue<PrepareRequest> prepareQueue = new DefaultBlockingBatchQueue(
+            this.maxSyncoperationNum);
         try {
-            HistogramMetrics.Timer timer = prepareLatency.createTimer();
+            List<PersistenceWorker<? extends StorageData>> persistenceWorkers = new ArrayList<>();
+            persistenceWorkers.addAll(TopNStreamProcessor.getInstance().getPersistentWorkers());
+            persistenceWorkers.addAll(MetricsStreamProcessor.getInstance().getPersistentWorkers());
 
-            try {
-                List<PersistenceWorker> persistenceWorkers = new ArrayList<>();
-                persistenceWorkers.addAll(TopNStreamProcessor.getInstance().getPersistentWorkers());
-                persistenceWorkers.addAll(MetricsStreamProcessor.getInstance().getPersistentWorkers());
+            // CountDownLatch makes sure all prepare threads done eventually.
+            CountDownLatch prepareStageCountDownLatch = new CountDownLatch(persistenceWorkers.size());
 
-                persistenceWorkers.forEach(worker -> {
-                    if (log.isDebugEnabled()) {
-                        log.debug("extract {} worker data and save", worker.getClass().getName());
+            persistenceWorkers.forEach(worker -> {
+                prepareExecutorService.submit(() -> {
+                    if (stop.get()) {
+                        prepareStageCountDownLatch.countDown();
+                        return;
                     }
 
-                    worker.buildBatchRequests(prepareRequests);
-
-                    worker.endOfRound(System.currentTimeMillis() - lastTime);
+                    HistogramMetrics.Timer timer = prepareLatency.createTimer();
+                    try {
+                        if (log.isDebugEnabled()) {
+                            log.debug("extract {} worker data and save", worker.getClass().getName());
+                        }
+                        List<PrepareRequest> innerPrepareRequests = new ArrayList<>(5000);
+                        worker.buildBatchRequests(innerPrepareRequests);
+                        // Push the prepared requests into DefaultBlockingBatchQueue,
+                        // the executorService consumes from it when it reaches the size of batch.
+                        prepareQueue.offer(innerPrepareRequests);
+                        worker.endOfRound(System.currentTimeMillis() - lastTime);
+                    } finally {
+                        timer.finish();
+                        prepareStageCountDownLatch.countDown();
+                    }
                 });
+            });
 
-                if (debug) {
-                    log.info("build batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
-                }
-            } finally {
-                timer.finish();
-            }
-
-            HistogramMetrics.Timer executeLatencyTimer = executeLatency.createTimer();
-            try {
-                List<List<PrepareRequest>> partitions = Lists.partition(prepareRequests, maxSyncoperationNum);
-                CountDownLatch countDownLatch = new CountDownLatch(partitions.size());
-                for (final List<PrepareRequest> partition : partitions) {
-                    executorService.submit(() -> {
+            List<Future<?>> batchFutures = new ArrayList<>();
+            for (int i = 0; i < syncOperationThreadsNum; i++) {
+                Future<?> batchFuture = executorService.submit(() -> {
+                    // consume the metrics
+                    while (!stop.get()) {
+                        List<PrepareRequest> partition = prepareQueue.poll();
+                        if (partition.isEmpty()) {
+                            break;
+                        }
+                        HistogramMetrics.Timer executeLatencyTimer = executeLatency.createTimer();
                         try {
                             if (CollectionUtils.isNotEmpty(partition)) {
                                 batchDAO.synchronous(partition);
@@ -137,23 +168,33 @@ public enum PersistenceTimer {
                         } catch (Throwable e) {
                             log.error(e.getMessage(), e);
                         } finally {
-                            countDownLatch.countDown();
+                            executeLatencyTimer.finish();
                         }
-                    });
-                }
-                countDownLatch.await();
-            } finally {
-                executeLatencyTimer.finish();
+                    }
+                    return null;
+                });
+                batchFutures.add(batchFuture);
             }
+
+            // Wait for prepare stage is done.
+            prepareStageCountDownLatch.await();
+            prepareQueue.noFurtherAppending();
+            // Wait for batch stage is done.
+            for (Future<?> result : batchFutures) {
+                result.get();
+            }
+
         } catch (Throwable e) {
             errorCounter.inc();
             log.error(e.getMessage(), e);
         } finally {
+
             if (log.isDebugEnabled()) {
                 log.debug("Persistence data save finish");
             }
 
-            prepareRequests.clear();
+            stop.set(true);
+            allTimer.finish();
             lastTime = System.currentTimeMillis();
         }
 
@@ -161,4 +202,70 @@ public enum PersistenceTimer {
             log.info("Batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
         }
     }
+
+    @RequiredArgsConstructor
+    static class DefaultBlockingBatchQueue<E> implements BlockingBatchQueue<E> {
+
+        @Getter
+        private final int maxBatchSize;
+
+        @Getter
+        private boolean inAppendingMode = true;
+
+        private final List<E> elementData = new ArrayList<>(50000 * 3);
+
+        @Override
+        public void offer(List<E> elements) {
+            synchronized (elementData) {
+                if (!inAppendingMode) {
+                    throw new IllegalStateException();
+                }
+                elementData.addAll(elements);
+                if (elementData.size() >= maxBatchSize) {
+                    elementData.notifyAll();
+                }
+            }
+        }
+
+        @Override
+        public List<E> poll() throws InterruptedException {
+            synchronized (elementData) {
+                while (this.elementData.size() < maxBatchSize && inAppendingMode) {
+                    elementData.wait(1000);
+                }
+                if (CollectionUtils.isEmpty(elementData)) {
+                    return Collections.EMPTY_LIST;
+                }
+                List<E> sublist = this.elementData.subList(
+                    0, Math.min(maxBatchSize, this.elementData.size()));
+                List<E> partition = new ArrayList<>(sublist);
+                sublist.clear();
+                return partition;
+            }
+        }
+
+        @Override
+        public void noFurtherAppending() {
+            synchronized (elementData) {
+                inAppendingMode = false;
+                elementData.notifyAll();
+            }
+        }
+
+        @Override
+        public void furtherAppending() {
+            synchronized (elementData) {
+                inAppendingMode = true;
+                elementData.notifyAll();
+            }
+        }
+
+        @Override
+        public int size() {
+            synchronized (elementData) {
+                return elementData.size();
+            }
+        }
+    }
+
 }
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/BlockingBatchQueueBenchmark.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/BlockingBatchQueueBenchmark.java
new file mode 100644
index 0000000..08e55e4
--- /dev/null
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/BlockingBatchQueueBenchmark.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.storage;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+@Fork(2)
+public class BlockingBatchQueueBenchmark {
+
+    @State(Scope.Benchmark)
+    public static class MyState {
+
+        int count = 10_000_000;
+        PersistenceTimer.DefaultBlockingBatchQueue blockingBatchQueueWithSynchronized = new PersistenceTimer.DefaultBlockingBatchQueue(
+            50000);
+        BlockingBatchQueueWithLinkedBlockingQueue blockingBatchQueueWithLinkedBlockingQueue = new BlockingBatchQueueWithLinkedBlockingQueue(
+            50000);
+        BlockingBatchQueueWithReentrantLock blockingBatchQueueWithReentrantLock = new BlockingBatchQueueWithReentrantLock(
+            50000);
+        List<Integer> willAdd = IntStream.range(0, 1000).boxed().collect(Collectors.toList());
+        int producerCount = 10;
+        int consumerCount = 2;
+        int producerLength = count / producerCount / 1000;
+
+        ExecutorService producer;
+        ExecutorService consumer;
+
+        @Setup(Level.Invocation)
+        public void before() {
+            producer = Executors.newFixedThreadPool(producerCount);
+            consumer = Executors.newFixedThreadPool(consumerCount);
+        }
+
+        @TearDown(Level.Invocation)
+        public void after() {
+            producer.shutdown();
+            consumer.shutdown();
+        }
+
+    }
+
+    @Benchmark
+    public void testSynchronized(MyState myState) throws InterruptedException, ExecutionException {
+        testProductAndConsume(myState, myState.blockingBatchQueueWithSynchronized);
+    }
+
+    @Benchmark
+    public void testReentrantLock(MyState myState) throws InterruptedException, ExecutionException {
+        testProductAndConsume(myState, myState.blockingBatchQueueWithReentrantLock);
+    }
+
+    @Benchmark
+    public void testLinkedBlockingQueue(MyState myState) throws InterruptedException, ExecutionException {
+        testProductAndConsume(myState, myState.blockingBatchQueueWithLinkedBlockingQueue);
+    }
+
+    private void testProductAndConsume(final MyState myState,
+                                       BlockingBatchQueue queue) throws InterruptedException, ExecutionException {
+        queue.furtherAppending();
+        CountDownLatch latch = new CountDownLatch(myState.producerCount);
+        for (int i = 0; i < myState.producerCount; i++) {
+            myState.producer.submit(() -> {
+                for (int j = 0; j < myState.producerLength; j++) {
+                    queue.offer(myState.willAdd);
+                }
+                latch.countDown();
+                return null;
+            });
+        }
+
+        List<Future<?>> futures = new ArrayList<>();
+        for (int i = 0; i < myState.consumerCount; i++) {
+            Future<?> submit = myState.consumer.submit(() -> {
+                while (!queue.poll().isEmpty()) {
+                }
+                return null;
+            });
+            futures.add(submit);
+        }
+
+        latch.await();
+        queue.noFurtherAppending();
+        for (Future<?> future : futures) {
+            future.get();
+        }
+    }
+
+    public static void main(String[] args) throws RunnerException {
+        Options opt = new OptionsBuilder()
+            .include(BlockingBatchQueueBenchmark.class.getSimpleName())
+            .forks(2)
+            .build();
+        new Runner(opt).run();
+    }
+
+    /**
+     * # JMH version: 1.21
+     * # VM version: JDK 1.8.0_172, Java HotSpot(TM) 64-Bit Server VM, 25.172-b11
+     * # VM invoker: /Library/Java/JavaVirtualMachines/jdk1.8.0_172.jdk/Contents/Home/jre/bin/java
+     * # VM options: -javaagent:/Users/Library/Application Support/JetBrains/Toolbox/apps/IDEA-U/ch-0/211.7442.40/IntelliJ IDEA.app/Contents/lib/idea_rt.jar=50386:/Users/alvin/Library/Application Support/JetBrains/Toolbox/apps/IDEA-U/ch-0/211.7442.40/IntelliJ IDEA.app/Contents/bin -Dfile.encoding=UTF-8
+     * # Warmup: 5 iterations, 10 s each
+     * # Measurement: 5 iterations, 10 s each
+     * # Timeout: 10 min per iteration
+     * # Threads: 1 thread, will synchronize iterations
+     * # Benchmark mode: Throughput, ops/time
+     *
+     *  Benchmark                                             Mode  Cnt   Score   Error  Units
+     *  BlockingBatchQueueBenchmark.testLinkedBlockingQueue  thrpt   10   0.317 ± 0.032  ops/s
+     *  BlockingBatchQueueBenchmark.testReentrantLock        thrpt   10  16.018 ± 1.553  ops/s
+     *  BlockingBatchQueueBenchmark.testSynchronized         thrpt   10  16.769 ± 0.533  ops/s
+     *
+     */
+
+}
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/BlockingBatchQueueWithLinkedBlockingQueue.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/BlockingBatchQueueWithLinkedBlockingQueue.java
new file mode 100644
index 0000000..cbf1932
--- /dev/null
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/BlockingBatchQueueWithLinkedBlockingQueue.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.storage;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+@RequiredArgsConstructor
+public class BlockingBatchQueueWithLinkedBlockingQueue<E> implements BlockingBatchQueue<E> {
+
+    @Getter
+    private final int maxBatchSize;
+
+    @Getter
+    private volatile boolean inAppendingMode = true;
+
+    private final LinkedBlockingQueue<E> elementData = new LinkedBlockingQueue<>();
+
+    public void offer(List<E> elements) {
+        elementData.addAll(elements);
+    }
+
+    public List<E> poll() throws InterruptedException {
+        List<E> result = new ArrayList<>();
+        do {
+            E take = elementData.poll(1000, TimeUnit.MILLISECONDS);
+            if (take != null) {
+                result.add(take);
+            }
+            if (result.size() >= maxBatchSize) {
+                return result;
+            }
+            if (!inAppendingMode && this.elementData.isEmpty()) {
+                return result;
+            }
+        }
+        while (!this.elementData.isEmpty());
+        return result;
+
+    }
+
+    public void noFurtherAppending() {
+        inAppendingMode = false;
+    }
+
+    public void furtherAppending() {
+        inAppendingMode = true;
+    }
+
+    @Override
+    public int size() {
+        return this.elementData.size();
+    }
+}
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/BlockingBatchQueueWithReentrantLock.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/BlockingBatchQueueWithReentrantLock.java
new file mode 100644
index 0000000..3fca3f7
--- /dev/null
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/BlockingBatchQueueWithReentrantLock.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.storage;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+
+@RequiredArgsConstructor
+public class BlockingBatchQueueWithReentrantLock<E> implements BlockingBatchQueue<E> {
+
+    @Getter
+    private final int maxBatchSize;
+
+    @Getter
+    private volatile boolean inAppendingMode = true;
+
+    private final List<E> elementData = new ArrayList<>(50000);
+
+    private ReentrantLock reentrantLock = new ReentrantLock();
+    private Condition condition = this.reentrantLock.newCondition();
+
+    public void offer(List<E> elements) {
+        reentrantLock.lock();
+        try {
+            elementData.addAll(elements);
+            if (elementData.size() >= maxBatchSize) {
+                condition.signalAll();
+            }
+        } finally {
+            reentrantLock.unlock();
+        }
+    }
+
+    public List<E> poll() throws InterruptedException {
+        reentrantLock.lock();
+        try {
+            while (this.elementData.size() < maxBatchSize && inAppendingMode) {
+                condition.await(1000, TimeUnit.MILLISECONDS);
+            }
+            if (CollectionUtils.isEmpty(elementData)) {
+                return Collections.EMPTY_LIST;
+            }
+            List<E> sublist = this.elementData.subList(
+                0, Math.min(maxBatchSize, this.elementData.size()));
+            List<E> partition = new ArrayList<>(sublist);
+            sublist.clear();
+            return partition;
+        } finally {
+            reentrantLock.unlock();
+        }
+    }
+
+    public void noFurtherAppending() {
+        reentrantLock.lock();
+        try {
+            inAppendingMode = false;
+            condition.signalAll();
+        } finally {
+            reentrantLock.unlock();
+        }
+    }
+
+    public void furtherAppending() {
+        reentrantLock.lock();
+        try {
+            inAppendingMode = true;
+            condition.signalAll();
+        } finally {
+            reentrantLock.unlock();
+        }
+    }
+
+    @Override
+    public int size() {
+        return elementData.size();
+    }
+}
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimerTest.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimerTest.java
new file mode 100644
index 0000000..1fc7355
--- /dev/null
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimerTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.storage;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import lombok.Data;
+import org.apache.skywalking.oap.server.core.CoreModuleConfig;
+import org.apache.skywalking.oap.server.core.analysis.worker.MetricsPersistentWorker;
+import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
+import org.apache.skywalking.oap.server.core.analysis.worker.TopNStreamProcessor;
+import org.apache.skywalking.oap.server.core.analysis.worker.TopNWorker;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.module.ModuleProviderHolder;
+import org.apache.skywalking.oap.server.library.module.ModuleServiceHolder;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.telemetry.none.MetricsCreatorNoop;
+import org.junit.Assert;
+import org.junit.Test;
+import org.powermock.reflect.Whitebox;
+
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+public class PersistenceTimerTest {
+
+    @Test
+    public void testExtractDataAndSave() throws Exception {
+        Set<PrepareRequest> result = new HashSet();
+        int count = 101;
+        int workCount = 10;
+        CoreModuleConfig moduleConfig = new CoreModuleConfig();
+        moduleConfig.setMaxSyncOperationNum(5);
+        moduleConfig.setPersistentPeriod(Integer.MAX_VALUE);
+        IBatchDAO iBatchDAO = new IBatchDAO() {
+            @Override
+            public void asynchronous(InsertRequest insertRequest) {
+
+            }
+
+            @Override
+            public void synchronous(List<PrepareRequest> prepareRequests) {
+                synchronized (result) {
+                    result.addAll(prepareRequests);
+                }
+            }
+        };
+        for (int i = 0; i < workCount; i++) {
+            MetricsStreamProcessor.getInstance().getPersistentWorkers().add(genWorkers(i, count));
+            TopNStreamProcessor.getInstance().getPersistentWorkers().add(genTopNWorkers(i, count));
+        }
+        ModuleManager moduleManager = mock(ModuleManager.class);
+        ModuleServiceHolder moduleServiceHolder = mock(ModuleServiceHolder.class);
+        doReturn((ModuleProviderHolder) () -> moduleServiceHolder).when(moduleManager).find(anyString());
+        doReturn(new MetricsCreatorNoop()).when(moduleServiceHolder).getService(MetricsCreator.class);
+        doReturn(iBatchDAO).when(moduleServiceHolder).getService(IBatchDAO.class);
+        PersistenceTimer.INSTANCE.isStarted = true;
+
+        PersistenceTimer.INSTANCE.start(moduleManager, moduleConfig);
+        Whitebox.invokeMethod(PersistenceTimer.INSTANCE, "extractDataAndSave", iBatchDAO);
+
+        Assert.assertEquals(count * workCount * 2, result.size());
+    }
+
+    private MetricsPersistentWorker genWorkers(int num, int count) {
+        MetricsPersistentWorker persistenceWorker = mock(MetricsPersistentWorker.class);
+        doAnswer(invocation -> {
+            List argument = invocation.getArgument(0, List.class);
+            for (int i = 0; i < count; i++) {
+                argument.add(new MockStorageData(num + " " + UUID.randomUUID()));
+            }
+            return Void.class;
+        }).when(persistenceWorker).buildBatchRequests(anyList());
+        return persistenceWorker;
+    }
+
+    private TopNWorker genTopNWorkers(int num, int count) {
+        TopNWorker persistenceWorker = mock(TopNWorker.class);
+        doAnswer(invocation -> {
+            List argument = invocation.getArgument(0, List.class);
+            for (int i = 0; i < count; i++) {
+                argument.add(new MockStorageData(num + " " + UUID.randomUUID()));
+            }
+            return Void.class;
+        }).when(persistenceWorker).buildBatchRequests(anyList());
+        return persistenceWorker;
+    }
+
+    @Data
+    static class MockStorageData implements StorageData {
+        private final String id;
+
+        @Override
+        public String id() {
+            return id;
+        }
+
+    }
+
+}