You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2020/04/29 08:36:29 UTC

[skywalking] 01/01: Rewrite the whole window and collection to new ReadWriteSafeCache.

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch window-rewrite
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 9762d88564266cc5111c6c1e9cedf78edb1f8c58
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Wed Apr 29 16:35:24 2020 +0800

    Rewrite the whole window and collection to new ReadWriteSafeCache.
---
 .../data/{DataCache.java => BufferedData.java}     |  22 ++++-
 ...ollection.java => LimitedSizeBufferedData.java} |  90 ++++--------------
 .../core/analysis/data/LimitedSizeDataCache.java   |  54 -----------
 .../core/analysis/data/MergableBufferedData.java   |  62 ++++++++++++
 .../server/core/analysis/data/MergeDataCache.java  |  54 -----------
 .../core/analysis/data/MergeDataCollection.java    |  97 -------------------
 .../core/analysis/data/NonMergeDataCache.java      |  46 ---------
 .../core/analysis/data/NonMergeDataCollection.java |  97 -------------------
 .../oap/server/core/analysis/data/QueueData.java   |  28 ------
 .../core/analysis/data/ReadWriteSafeCache.java     |  78 +++++++++++++++
 .../server/core/analysis/data/SWCollection.java    |  48 ----------
 .../oap/server/core/analysis/data/Window.java      | 105 ---------------------
 .../analysis/worker/MetricsAggregateWorker.java    |  58 ++++--------
 .../analysis/worker/MetricsPersistentWorker.java   |  29 +-----
 .../core/analysis/worker/PersistenceWorker.java    |  63 ++++---------
 .../server/core/analysis/worker/TopNWorker.java    |  49 ++++------
 .../oap/server/core/remote/data/StreamData.java    |   7 +-
 .../oap/server/core/storage/PersistenceTimer.java  |  26 +++--
 ...nTest.java => LimitedSizeBufferedDataTest.java} |  20 ++--
 19 files changed, 261 insertions(+), 772 deletions(-)

diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/DataCache.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/BufferedData.java
similarity index 57%
rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/DataCache.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/BufferedData.java
index 868d16a..7d7cbb2 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/DataCache.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/BufferedData.java
@@ -18,9 +18,25 @@
 
 package org.apache.skywalking.oap.server.core.analysis.data;
 
-public interface DataCache {
+import java.util.List;
 
-    void writing();
+/**
+ * BufferedData represents a data collection in the memory. Data could be accepted and be drain to other collection.
+ *
+ * {@link #accept(Object)} and {@link #read()} wouldn't be required to thread-safe. BufferedData usually hosts by {@link
+ * ReadWriteSafeCache}.
+ */
+public interface BufferedData<T> {
+    /**
+     * Accept the data into the cache if it fits the conditions. The implementation maybe wouldn't accept the new input
+     * data.
+     *
+     * @param data to be added potentially.
+     */
+    void accept(T data);
 
-    void finishWriting();
+    /**
+     * Read all existing buffered data, and clear the memory.
+     */
+    List<T> read();
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollection.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeBufferedData.java
similarity index 51%
rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollection.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeBufferedData.java
index ab5347d..d48f273 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollection.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeBufferedData.java
@@ -19,111 +19,61 @@
 package org.apache.skywalking.oap.server.core.analysis.data;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import org.apache.skywalking.oap.server.core.storage.ComparableStorageData;
+import org.apache.skywalking.oap.server.core.storage.StorageData;
 
-public class LimitedSizeDataCollection<STORAGE_DATA extends ComparableStorageData> implements SWCollection<STORAGE_DATA> {
-
-    private final HashMap<STORAGE_DATA, LinkedList<STORAGE_DATA>> data;
+/**
+ * LimitedSizeBufferedData is a thread no safe implementation of {@link BufferedData}. It collects limited records of
+ * each {@link StorageData#id()}.
+ */
+public class LimitedSizeBufferedData<STORAGE_DATA extends ComparableStorageData & StorageData> implements BufferedData<STORAGE_DATA> {
+    private final HashMap<String, LinkedList<STORAGE_DATA>> data;
     private final int limitedSize;
-    private volatile boolean writing;
-    private volatile boolean reading;
 
-    LimitedSizeDataCollection(int limitedSize) {
+    public LimitedSizeBufferedData(int limitedSize) {
         this.data = new HashMap<>();
-        this.writing = false;
-        this.reading = false;
         this.limitedSize = limitedSize;
     }
 
     @Override
-    public void finishWriting() {
-        writing = false;
-    }
-
-    @Override
-    public void writing() {
-        writing = true;
-    }
-
-    @Override
-    public boolean isWriting() {
-        return writing;
-    }
-
-    @Override
-    public void finishReading() {
-        reading = false;
-    }
-
-    @Override
-    public void reading() {
-        reading = true;
-    }
-
-    @Override
-    public boolean isReading() {
-        return reading;
-    }
-
-    @Override
-    public int size() {
-        return data.size();
-    }
-
-    @Override
-    public void clear() {
-        data.clear();
-    }
-
-    @Override
-    public boolean containsKey(STORAGE_DATA key) {
-        throw new UnsupportedOperationException("Limited size data collection doesn't support containsKey operation.");
-    }
-
-    @Override
-    public STORAGE_DATA get(STORAGE_DATA key) {
-        throw new UnsupportedOperationException("Limited size data collection doesn't support get operation.");
-    }
-
-    @Override
-    public void put(STORAGE_DATA value) {
-        LinkedList<STORAGE_DATA> storageDataList = this.data.get(value);
+    public void accept(final STORAGE_DATA data) {
+        final String id = data.id();
+        LinkedList<STORAGE_DATA> storageDataList = this.data.get(id);
         if (storageDataList == null) {
             storageDataList = new LinkedList<>();
-            data.put(value, storageDataList);
+            this.data.put(id, storageDataList);
         }
 
         if (storageDataList.size() < limitedSize) {
-            storageDataList.add(value);
+            storageDataList.add(data);
             return;
         }
 
         for (int i = 0; i < storageDataList.size(); i++) {
             STORAGE_DATA storageData = storageDataList.get(i);
-            if (value.compareTo(storageData) <= 0) {
+            if (data.compareTo(storageData) <= 0) {
                 if (i == 0) {
-                    // input value is less than the smallest in top N list, ignore
+                    // input data is less than the smallest in top N list, ignore
                 } else {
                     // Remove the smallest in top N list
-                    // add the current value into the right position
-                    storageDataList.add(i, value);
+                    // add the current data into the right position
+                    storageDataList.add(i, data);
                     storageDataList.removeFirst();
                 }
                 return;
             }
         }
 
-        // Add the value as biggest in top N list
-        storageDataList.addLast(value);
+        // Add the data as biggest in top N list
+        storageDataList.addLast(data);
         storageDataList.removeFirst();
     }
 
     @Override
-    public Collection<STORAGE_DATA> collection() {
+    public List<STORAGE_DATA> read() {
         List<STORAGE_DATA> collection = new ArrayList<>();
         data.values().forEach(e -> e.forEach(collection::add));
         return collection;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCache.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCache.java
deleted file mode 100644
index c7393c4..0000000
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCache.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.core.analysis.data;
-
-import org.apache.skywalking.oap.server.core.storage.ComparableStorageData;
-
-public class LimitedSizeDataCache<STORAGE_DATA extends ComparableStorageData> extends Window<STORAGE_DATA> implements DataCache {
-
-    private SWCollection<STORAGE_DATA> limitedSizeDataCollection;
-    private final int limitSize;
-
-    public LimitedSizeDataCache(int limitSize) {
-        super(false);
-        this.limitSize = limitSize;
-        init();
-    }
-
-    @Override
-    public SWCollection<STORAGE_DATA> collectionInstance() {
-        return new LimitedSizeDataCollection<>(limitSize);
-    }
-
-    public void add(STORAGE_DATA data) {
-        limitedSizeDataCollection.put(data);
-    }
-
-    @Override
-    public void writing() {
-        limitedSizeDataCollection = getCurrentAndWriting();
-    }
-
-    @Override
-    public void finishWriting() {
-        limitedSizeDataCollection.finishWriting();
-        limitedSizeDataCollection = null;
-    }
-}
-
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/MergableBufferedData.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/MergableBufferedData.java
new file mode 100644
index 0000000..24b92c0
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/MergableBufferedData.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.data;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+
+/**
+ * MergableBufferedData is a thread no safe implementation of {@link BufferedData}. {@link Metrics} in this cache would
+ * be {@link Metrics#combine(Metrics)} if their {@link Metrics#id()}s are same.
+ *
+ * Concurrency {@link #accept(Metrics)}s and {@link #read()} while {@link #accept(Metrics)} are both not recommended.
+ */
+public class MergableBufferedData<METRICS extends Metrics> implements BufferedData<METRICS> {
+    private Map<String, METRICS> buffer;
+
+    public MergableBufferedData() {
+        buffer = new HashMap<>();
+    }
+
+    /**
+     * Accept the data into the cache and merge with the existing value.
+     *
+     * This method is not thread safe, should avoid concurrency calling.
+     *
+     * @param data to be added potentially.
+     */
+    @Override
+    public void accept(final METRICS data) {
+        final String id = data.id();
+        final METRICS existed = buffer.get(id);
+        if (existed == null) {
+            buffer.put(id, data);
+        } else {
+            existed.combine(data);
+        }
+    }
+
+    @Override
+    public List<METRICS> read() {
+        return buffer.values().stream().collect(Collectors.toList());
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/MergeDataCache.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/MergeDataCache.java
deleted file mode 100644
index a7c4b6b..0000000
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/MergeDataCache.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.core.analysis.data;
-
-import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
-
-public class MergeDataCache<METRICS extends Metrics> extends Window<METRICS> implements DataCache {
-
-    private SWCollection<METRICS> lockedMergeDataCollection;
-
-    @Override
-    public SWCollection<METRICS> collectionInstance() {
-        return new MergeDataCollection<>();
-    }
-
-    public boolean containsKey(METRICS key) {
-        return lockedMergeDataCollection.containsKey(key);
-    }
-
-    public Metrics get(METRICS key) {
-        return lockedMergeDataCollection.get(key);
-    }
-
-    public void put(METRICS data) {
-        lockedMergeDataCollection.put(data);
-    }
-
-    @Override
-    public void writing() {
-        lockedMergeDataCollection = getCurrentAndWriting();
-    }
-
-    @Override
-    public void finishWriting() {
-        lockedMergeDataCollection.finishWriting();
-        lockedMergeDataCollection = null;
-    }
-}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/MergeDataCollection.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/MergeDataCollection.java
deleted file mode 100644
index f310cb4..0000000
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/MergeDataCollection.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.core.analysis.data;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.skywalking.oap.server.core.remote.data.StreamData;
-
-public class MergeDataCollection<STREAM_DATA extends StreamData> implements SWCollection<STREAM_DATA> {
-
-    private Map<STREAM_DATA, STREAM_DATA> collection;
-    private volatile boolean writing;
-    private volatile boolean reading;
-
-    MergeDataCollection() {
-        this.collection = new HashMap<>();
-        this.writing = false;
-        this.reading = false;
-    }
-
-    @Override
-    public void finishWriting() {
-        writing = false;
-    }
-
-    @Override
-    public void writing() {
-        writing = true;
-    }
-
-    @Override
-    public boolean isWriting() {
-        return writing;
-    }
-
-    @Override
-    public void finishReading() {
-        reading = false;
-    }
-
-    @Override
-    public void reading() {
-        reading = true;
-    }
-
-    @Override
-    public boolean isReading() {
-        return reading;
-    }
-
-    @Override
-    public boolean containsKey(STREAM_DATA key) {
-        return collection.containsKey(key);
-    }
-
-    @Override
-    public void put(STREAM_DATA value) {
-        collection.put(value, value);
-    }
-
-    @Override
-    public STREAM_DATA get(STREAM_DATA key) {
-        return collection.get(key);
-    }
-
-    @Override
-    public int size() {
-        return collection.size();
-    }
-
-    @Override
-    public void clear() {
-        collection.clear();
-    }
-
-    @Override
-    public Collection<STREAM_DATA> collection() {
-        return collection.values();
-    }
-}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCache.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCache.java
deleted file mode 100644
index 69d8d8a..0000000
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCache.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.core.analysis.data;
-
-import org.apache.skywalking.oap.server.core.storage.StorageData;
-
-public class NonMergeDataCache<STORAGE_DATA extends StorageData> extends Window<STORAGE_DATA> implements DataCache {
-
-    private SWCollection<STORAGE_DATA> lockedMergeDataCollection;
-
-    @Override
-    public SWCollection<STORAGE_DATA> collectionInstance() {
-        return new NonMergeDataCollection<>();
-    }
-
-    public void add(STORAGE_DATA data) {
-        lockedMergeDataCollection.put(data);
-    }
-
-    @Override
-    public void writing() {
-        lockedMergeDataCollection = getCurrentAndWriting();
-    }
-
-    @Override
-    public void finishWriting() {
-        lockedMergeDataCollection.finishWriting();
-        lockedMergeDataCollection = null;
-    }
-}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCollection.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCollection.java
deleted file mode 100644
index 6a56465..0000000
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCollection.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.core.analysis.data;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import org.apache.skywalking.oap.server.core.storage.StorageData;
-
-public class NonMergeDataCollection<STORAGE_DATA extends StorageData> implements SWCollection<STORAGE_DATA> {
-
-    private final List<STORAGE_DATA> data;
-    private volatile boolean writing;
-    private volatile boolean reading;
-
-    NonMergeDataCollection() {
-        this.data = new ArrayList<>();
-        this.writing = false;
-        this.reading = false;
-    }
-
-    @Override
-    public void finishWriting() {
-        writing = false;
-    }
-
-    @Override
-    public void writing() {
-        writing = true;
-    }
-
-    @Override
-    public boolean isWriting() {
-        return writing;
-    }
-
-    @Override
-    public void finishReading() {
-        reading = false;
-    }
-
-    @Override
-    public void reading() {
-        reading = true;
-    }
-
-    @Override
-    public boolean isReading() {
-        return reading;
-    }
-
-    @Override
-    public int size() {
-        return data.size();
-    }
-
-    @Override
-    public void clear() {
-        data.clear();
-    }
-
-    @Override
-    public boolean containsKey(STORAGE_DATA key) {
-        throw new UnsupportedOperationException("Non merge data collection doesn't support containsKey operation.");
-    }
-
-    @Override
-    public STORAGE_DATA get(STORAGE_DATA key) {
-        throw new UnsupportedOperationException("Non merge data collection doesn't support get operation.");
-    }
-
-    @Override
-    public void put(STORAGE_DATA value) {
-        data.add(value);
-    }
-
-    @Override
-    public Collection<STORAGE_DATA> collection() {
-        return data;
-    }
-}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/QueueData.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/QueueData.java
deleted file mode 100644
index 9880b6c..0000000
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/QueueData.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.core.analysis.data;
-
-public interface QueueData {
-
-    void resetEndOfBatch();
-
-    void asEndOfBatch();
-
-    boolean isEndOfBatch();
-}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/ReadWriteSafeCache.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/ReadWriteSafeCache.java
new file mode 100644
index 0000000..0263334
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/ReadWriteSafeCache.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.data;
+
+import java.util.List;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * ReadWriteSafeCache provides a read/write isolated cache.
+ */
+public class ReadWriteSafeCache<T> {
+    /**
+     * The buffer1 is used for read or write.
+     */
+    private BufferedData<T> buffer1;
+    /**
+     * The buffer2 is used for read or write.
+     */
+    private BufferedData<T> buffer2;
+
+    /**
+     * Read pointer should be pointing to {@link #buffer1} or {@link #buffer2}, and always not NULL.
+     */
+    private volatile BufferedData<T> readBufferPointer;
+    /**
+     * Write pointer should be pointing to {@link #buffer1} or {@link #buffer2}, and always not NULL.
+     */
+    private volatile BufferedData<T> writeBufferPointer;
+
+    private final ReentrantLock lock;
+
+    public ReadWriteSafeCache(BufferedData<T> buffer1, BufferedData<T> buffer2) {
+        this.buffer1 = buffer1;
+        this.buffer2 = buffer2;
+        readBufferPointer = this.buffer1;
+        writeBufferPointer = this.buffer2;
+        lock = new ReentrantLock();
+    }
+
+    public void write(T data) {
+        lock.lock();
+        try {
+            writeBufferPointer.accept(data);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public List<T> read() {
+        lock.lock();
+        try {
+            // Switch the read and write pointers, when there is no writing.
+            BufferedData<T> tempPointer = writeBufferPointer;
+            writeBufferPointer = readBufferPointer;
+            readBufferPointer = tempPointer;
+
+            return readBufferPointer.read();
+        } finally {
+            lock.unlock();
+        }
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/SWCollection.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/SWCollection.java
deleted file mode 100644
index 16bbe07..0000000
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/SWCollection.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.core.analysis.data;
-
-import java.util.Collection;
-
-public interface SWCollection<DATA> {
-
-    void reading();
-
-    boolean isReading();
-
-    void writing();
-
-    boolean isWriting();
-
-    void clear();
-
-    int size();
-
-    void finishReading();
-
-    void finishWriting();
-
-    Collection<DATA> collection();
-
-    boolean containsKey(DATA key);
-
-    DATA get(DATA key);
-
-    void put(DATA value);
-}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Window.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Window.java
deleted file mode 100644
index 907da94..0000000
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Window.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.core.analysis.data;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Data cache window. Window holds two data collections(A and B). They are switchable based on outside command. In any
- * time, one collection is accepting the input data, and the other is immutable.
- *
- * This window makes sure there is not concurrency read-write situation.
- *
- * @param <DATA> type in the Window and internal collection.
- */
-public abstract class Window<DATA> {
-
-    private AtomicInteger windowSwitch = new AtomicInteger(0);
-
-    private SWCollection<DATA> pointer;
-
-    private SWCollection<DATA> windowDataA;
-    private SWCollection<DATA> windowDataB;
-
-    Window() {
-        this(true);
-    }
-
-    Window(boolean autoInit) {
-        if (autoInit) {
-            init();
-        }
-    }
-
-    protected void init() {
-        this.windowDataA = collectionInstance();
-        this.windowDataB = collectionInstance();
-        this.pointer = windowDataA;
-    }
-
-    public abstract SWCollection<DATA> collectionInstance();
-
-    public boolean trySwitchPointer() {
-        return windowSwitch.incrementAndGet() == 1 && !getLast().isReading();
-    }
-
-    public void trySwitchPointerFinally() {
-        windowSwitch.addAndGet(-1);
-    }
-
-    public void switchPointer() {
-        if (pointer == windowDataA) {
-            pointer = windowDataB;
-        } else {
-            pointer = windowDataA;
-        }
-        getLast().reading();
-    }
-
-    SWCollection<DATA> getCurrentAndWriting() {
-        if (pointer == windowDataA) {
-            windowDataA.writing();
-            return windowDataA;
-        } else {
-            windowDataB.writing();
-            return windowDataB;
-        }
-    }
-
-    private SWCollection<DATA> getCurrent() {
-        return pointer;
-    }
-
-    public int currentCollectionSize() {
-        return getCurrent().size();
-    }
-
-    public SWCollection<DATA> getLast() {
-        if (pointer == windowDataA) {
-            return windowDataB;
-        } else {
-            return windowDataA;
-        }
-    }
-
-    public void finishReadingLast() {
-        getLast().clear();
-        getLast().finishReading();
-    }
-}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
index f6f944a..3d5105f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
@@ -20,12 +20,14 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
 
 import java.util.Iterator;
 import java.util.List;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
 import org.apache.skywalking.oap.server.core.UnexpectedException;
-import org.apache.skywalking.oap.server.core.analysis.data.MergeDataCache;
+import org.apache.skywalking.oap.server.core.analysis.data.MergableBufferedData;
+import org.apache.skywalking.oap.server.core.analysis.data.ReadWriteSafeCache;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
@@ -33,8 +35,6 @@ import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
 import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
 import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
 import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * MetricsAggregateWorker provides an in-memory metrics merging capability. This aggregation is called L1 aggregation,
@@ -42,20 +42,18 @@ import org.slf4j.LoggerFactory;
  * bucket, the L1 aggregation will merge them into one metrics object to reduce the unnecessary memory and network
  * payload.
  */
+@Slf4j
 public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
-
-    private static final Logger logger = LoggerFactory.getLogger(MetricsAggregateWorker.class);
-
     private AbstractWorker<Metrics> nextWorker;
     private final DataCarrier<Metrics> dataCarrier;
-    private final MergeDataCache<Metrics> mergeDataCache;
+    private final ReadWriteSafeCache<Metrics> mergeDataCache;
     private CounterMetrics aggregationCounter;
 
     MetricsAggregateWorker(ModuleDefineHolder moduleDefineHolder, AbstractWorker<Metrics> nextWorker,
                            String modelName) {
         super(moduleDefineHolder);
         this.nextWorker = nextWorker;
-        this.mergeDataCache = new MergeDataCache<>();
+        this.mergeDataCache = new ReadWriteSafeCache<>(new MergableBufferedData(), new MergableBufferedData());
         String name = "METRICS_L1_AGGREGATION";
         this.dataCarrier = new DataCarrier<>("MetricsAggregateWorker." + modelName, name, 2, 10000);
 
@@ -85,42 +83,18 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
 
     private void onWork(Metrics metrics) {
         aggregationCounter.inc();
-        aggregate(metrics);
+        mergeDataCache.write(metrics);
 
         if (metrics.isEndOfBatch()) {
-            sendToNext();
-        }
-    }
-
-    private void sendToNext() {
-        mergeDataCache.switchPointer();
-        while (mergeDataCache.getLast().isWriting()) {
-            try {
-                Thread.sleep(10);
-            } catch (InterruptedException e) {
-                logger.error(e.getMessage(), e);
-            }
-        }
-
-        mergeDataCache.getLast().collection().forEach(data -> {
-            if (logger.isDebugEnabled()) {
-                logger.debug(data.toString());
-            }
-
-            nextWorker.in(data);
-        });
-        mergeDataCache.finishReadingLast();
-    }
-
-    private void aggregate(Metrics metrics) {
-        mergeDataCache.writing();
-        if (mergeDataCache.containsKey(metrics)) {
-            mergeDataCache.get(metrics).combine(metrics);
-        } else {
-            mergeDataCache.put(metrics);
+            mergeDataCache.read().forEach(
+                data -> {
+                    if (log.isDebugEnabled()) {
+                        log.debug(data.toString());
+                    }
+                    nextWorker.in(data);
+                }
+            );
         }
-
-        mergeDataCache.finishWriting();
     }
 
     private class AggregatorConsumer implements IConsumer<Metrics> {
@@ -153,7 +127,7 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
 
         @Override
         public void onError(List<Metrics> data, Throwable t) {
-            logger.error(t.getMessage(), t);
+            log.error(t.getMessage(), t);
         }
 
         @Override
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
index 81538ef..cab7dd3 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
@@ -32,7 +32,8 @@ import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
 import org.apache.skywalking.oap.server.core.UnexpectedException;
-import org.apache.skywalking.oap.server.core.analysis.data.MergeDataCache;
+import org.apache.skywalking.oap.server.core.analysis.data.MergableBufferedData;
+import org.apache.skywalking.oap.server.core.analysis.data.ReadWriteSafeCache;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
 import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
@@ -45,10 +46,9 @@ import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
  * MetricsPersistentWorker is an extension of {@link PersistenceWorker} and focuses on the Metrics data persistent.
  */
 @Slf4j
-public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDataCache<Metrics>> {
+public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
     private final Model model;
     private final Map<Metrics, Metrics> context;
-    private final MergeDataCache<Metrics> mergeDataCache;
     private final IMetricsDAO metricsDAO;
     private final Optional<AbstractWorker<Metrics>> nextAlarmWorker;
     private final Optional<AbstractWorker<ExportEvent>> nextExportWorker;
@@ -60,11 +60,10 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
     MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO metricsDAO,
                             AbstractWorker<Metrics> nextAlarmWorker, AbstractWorker<ExportEvent> nextExportWorker,
                             MetricsTransWorker transWorker, boolean enableDatabaseSession, boolean supportUpdate) {
-        super(moduleDefineHolder);
+        super(moduleDefineHolder, new ReadWriteSafeCache<>(new MergableBufferedData(), new MergableBufferedData()));
         this.model = model;
         this.context = new HashMap<>(100);
         this.enableDatabaseSession = enableDatabaseSession;
-        this.mergeDataCache = new MergeDataCache<>();
         this.metricsDAO = metricsDAO;
         this.nextAlarmWorker = Optional.ofNullable(nextAlarmWorker);
         this.nextExportWorker = Optional.ofNullable(nextExportWorker);
@@ -112,11 +111,6 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
     }
 
     @Override
-    public MergeDataCache<Metrics> getCache() {
-        return mergeDataCache;
-    }
-
-    @Override
     public void prepareBatch(Collection<Metrics> lastCollection, List<PrepareRequest> prepareRequests) {
         long start = System.currentTimeMillis();
         if (lastCollection.size() == 0) {
@@ -197,21 +191,6 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
             nextExportWorker -> nextExportWorker.in(new ExportEvent(metrics, ExportEvent.EventType.TOTAL)));
     }
 
-    @Override
-    public void cacheData(Metrics input) {
-        mergeDataCache.writing();
-        if (mergeDataCache.containsKey(input)) {
-            Metrics metrics = mergeDataCache.get(input);
-            metrics.combine(input);
-            metrics.calculate();
-        } else {
-            input.calculate();
-            mergeDataCache.put(input);
-        }
-
-        mergeDataCache.finishWriting();
-    }
-
     /**
      * Load data from the storage, if {@link #enableDatabaseSession} == true, only load data when the id doesn't exist.
      */
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java
index 156618d..536a6d9 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java
@@ -20,28 +20,29 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
 
 import java.util.Collection;
 import java.util.List;
-import org.apache.skywalking.oap.server.core.analysis.data.SWCollection;
-import org.apache.skywalking.oap.server.core.analysis.data.Window;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.core.analysis.data.ReadWriteSafeCache;
 import org.apache.skywalking.oap.server.core.storage.StorageData;
 import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
 import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * PersistenceWorker take the responsibility to pushing data to the final storage. The target storage is based on the
  * activate storage implementation. This worker controls the persistence flow.
  *
  * @param <INPUT> The type of worker input. All inputs will be merged and saved.
- * @param <CACHE> Cache type to hold all input.
  */
-public abstract class PersistenceWorker<INPUT extends StorageData, CACHE extends Window<INPUT>> extends AbstractWorker<INPUT> {
+@Slf4j
+public abstract class PersistenceWorker<INPUT extends StorageData> extends AbstractWorker<INPUT> {
+    @Getter(AccessLevel.PROTECTED)
+    private final ReadWriteSafeCache<INPUT> cache;
 
-    private static final Logger logger = LoggerFactory.getLogger(PersistenceWorker.class);
-
-    PersistenceWorker(ModuleDefineHolder moduleDefineHolder) {
+    PersistenceWorker(ModuleDefineHolder moduleDefineHolder, ReadWriteSafeCache<INPUT> cache) {
         super(moduleDefineHolder);
+        this.cache = cache;
     }
 
     /**
@@ -54,9 +55,9 @@ public abstract class PersistenceWorker<INPUT extends StorageData, CACHE extends
     /**
      * Cache data based on different strategies. See the implementations for more details.
      */
-    public abstract void cacheData(INPUT input);
-
-    public abstract CACHE getCache();
+    public void cacheData(INPUT input) {
+        cache.write(input);
+    }
 
     /**
      * The persistence process is driven by the {@link org.apache.skywalking.oap.server.core.storage.PersistenceTimer}.
@@ -67,24 +68,6 @@ public abstract class PersistenceWorker<INPUT extends StorageData, CACHE extends
     public abstract void endOfRound(long tookTime);
 
     /**
-     * For every cache implementation(see {@link Window}), there are two dataset, switch them when one persistence round
-     * is beginning, in order to make cached data immutable.
-     *
-     * @return true if switch successfully.
-     */
-    public boolean flushAndSwitch() {
-        boolean isSwitch;
-        try {
-            if (isSwitch = getCache().trySwitchPointer()) {
-                getCache().switchPointer();
-            }
-        } finally {
-            getCache().trySwitchPointerFinally();
-        }
-        return isSwitch;
-    }
-
-    /**
      * Prepare the batch persistence, transfer all prepared data to the executable data format based on the storage
      * implementations.
      *
@@ -93,22 +76,8 @@ public abstract class PersistenceWorker<INPUT extends StorageData, CACHE extends
      */
     public abstract void prepareBatch(Collection<INPUT> lastCollection, List<PrepareRequest> prepareRequests);
 
-    public final void buildBatchRequests(List<PrepareRequest> prepareRequests) {
-        try {
-            SWCollection<INPUT> last = getCache().getLast();
-            while (last.isWriting()) {
-                try {
-                    Thread.sleep(10);
-                } catch (InterruptedException e) {
-                    logger.warn("thread wake up");
-                }
-            }
-
-            if (last.collection() != null) {
-                prepareBatch(last.collection(), prepareRequests);
-            }
-        } finally {
-            getCache().finishReadingLast();
-        }
+    public void buildBatchRequests(List<PrepareRequest> prepareRequests) {
+        final List<INPUT> dataList = getCache().read();
+        prepareBatch(dataList, prepareRequests);
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
index 2b42620..3c150c7 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
@@ -22,7 +22,8 @@ import java.util.Collection;
 import java.util.List;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import org.apache.skywalking.oap.server.core.analysis.data.LimitedSizeDataCache;
+import org.apache.skywalking.oap.server.core.analysis.data.LimitedSizeBufferedData;
+import org.apache.skywalking.oap.server.core.analysis.data.ReadWriteSafeCache;
 import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
 import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
@@ -34,59 +35,43 @@ import org.slf4j.LoggerFactory;
 /**
  * Top N worker is a persistence worker. Cache and order the data, flush in longer period.
  */
-public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<TopN>> {
+public class TopNWorker extends PersistenceWorker<TopN> {
 
     private static final Logger logger = LoggerFactory.getLogger(TopNWorker.class);
 
-    private final LimitedSizeDataCache<TopN> limitedSizeDataCache;
     private final IRecordDAO recordDAO;
     private final Model model;
     private final DataCarrier<TopN> dataCarrier;
-    private long reportCycle;
+    private long reportPeriod;
     private volatile long lastReportTimestamp;
 
-    TopNWorker(ModuleDefineHolder moduleDefineHolder, Model model, int topNSize, long reportCycle,
-        IRecordDAO recordDAO) {
-        super(moduleDefineHolder);
-        this.limitedSizeDataCache = new LimitedSizeDataCache<>(topNSize);
+    TopNWorker(ModuleDefineHolder moduleDefineHolder, Model model, int topNSize, long reportPeriod,
+               IRecordDAO recordDAO) {
+        super(
+            moduleDefineHolder,
+            new ReadWriteSafeCache<>(new LimitedSizeBufferedData<>(topNSize), new LimitedSizeBufferedData<>(topNSize))
+        );
         this.recordDAO = recordDAO;
         this.model = model;
         this.dataCarrier = new DataCarrier<>("TopNWorker", 1, 1000);
         this.dataCarrier.consume(new TopNWorker.TopNConsumer(), 1);
         this.lastReportTimestamp = System.currentTimeMillis();
         // Top N persistent works per 10 minutes default.
-        this.reportCycle = reportCycle;
-    }
-
-    @Override
-    public void cacheData(TopN data) {
-        limitedSizeDataCache.writing();
-        try {
-            limitedSizeDataCache.add(data);
-        } finally {
-            limitedSizeDataCache.finishWriting();
-        }
-    }
-
-    @Override
-    public LimitedSizeDataCache<TopN> getCache() {
-        return limitedSizeDataCache;
+        this.reportPeriod = reportPeriod;
     }
 
     /**
-     * The top N worker persistent cycle is much less than the others, override `flushAndSwitch` to extend the execute
-     * time windows.
-     * <p>
-     * Switch and persistent attempt happens based on reportCycle.
+     * Force overriding the parent buildBatchRequests. Use its own report period.
      */
     @Override
-    public boolean flushAndSwitch() {
+    public void buildBatchRequests(final List<PrepareRequest> prepareRequests) {
         long now = System.currentTimeMillis();
-        if (now - lastReportTimestamp <= reportCycle) {
-            return false;
+        if (now - lastReportTimestamp <= reportPeriod) {
+            // Only do report in its own report period.
+            return;
         }
         lastReportTimestamp = now;
-        return super.flushAndSwitch();
+        super.buildBatchRequests(prepareRequests);
     }
 
     @Override
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/data/StreamData.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/data/StreamData.java
index 6b761e7..2c5b0b0 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/data/StreamData.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/data/StreamData.java
@@ -18,25 +18,20 @@
 
 package org.apache.skywalking.oap.server.core.remote.data;
 
-import org.apache.skywalking.oap.server.core.analysis.data.QueueData;
 import org.apache.skywalking.oap.server.core.remote.Deserializable;
 import org.apache.skywalking.oap.server.core.remote.Serializable;
 
-public abstract class StreamData implements QueueData, Serializable, Deserializable {
-
+public abstract class StreamData implements Serializable, Deserializable {
     private boolean endOfBatch = false;
 
-    @Override
     public void resetEndOfBatch() {
         this.endOfBatch = false;
     }
 
-    @Override
     public void asEndOfBatch() {
         this.endOfBatch = true;
     }
 
-    @Override
     public boolean isEndOfBatch() {
         return this.endOfBatch;
     }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
index 35730bd..7fada1b 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
@@ -62,14 +62,26 @@ public enum PersistenceTimer {
         MetricsCreator metricsCreator = moduleManager.find(TelemetryModule.NAME)
                                                      .provider()
                                                      .getService(MetricsCreator.class);
-        errorCounter = metricsCreator.createCounter("persistence_timer_bulk_error_count", "Error execution of the prepare stage in persistence timer", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
-        prepareLatency = metricsCreator.createHistogramMetric("persistence_timer_bulk_prepare_latency", "Latency of the prepare stage in persistence timer", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
-        executeLatency = metricsCreator.createHistogramMetric("persistence_timer_bulk_execute_latency", "Latency of the execute stage in persistence timer", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
+        errorCounter = metricsCreator.createCounter(
+            "persistence_timer_bulk_error_count", "Error execution of the prepare stage in persistence timer",
+            MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE
+        );
+        prepareLatency = metricsCreator.createHistogramMetric(
+            "persistence_timer_bulk_prepare_latency", "Latency of the prepare stage in persistence timer",
+            MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE
+        );
+        executeLatency = metricsCreator.createHistogramMetric(
+            "persistence_timer_bulk_execute_latency", "Latency of the execute stage in persistence timer",
+            MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE
+        );
 
         if (!isStarted) {
             Executors.newSingleThreadScheduledExecutor()
-                     .scheduleWithFixedDelay(new RunnableWithExceptionProtection(() -> extractDataAndSave(batchDAO), t -> logger
-                         .error("Extract data and save failure.", t)), 5, moduleConfig.getPersistentPeriod(), TimeUnit.SECONDS);
+                     .scheduleWithFixedDelay(
+                         new RunnableWithExceptionProtection(() -> extractDataAndSave(batchDAO), t -> logger
+                             .error("Extract data and save failure.", t)), 5, moduleConfig.getPersistentPeriod(),
+                         TimeUnit.SECONDS
+                     );
 
             this.isStarted = true;
         }
@@ -95,9 +107,7 @@ public enum PersistenceTimer {
                         logger.debug("extract {} worker data and save", worker.getClass().getName());
                     }
 
-                    if (worker.flushAndSwitch()) {
-                        worker.buildBatchRequests(prepareRequests);
-                    }
+                    worker.buildBatchRequests(prepareRequests);
 
                     worker.endOfRound(System.currentTimeMillis() - lastTime);
                 });
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollectionTest.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeBufferedDataTest.java
similarity index 79%
rename from oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollectionTest.java
rename to oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeBufferedDataTest.java
index 4f506ab..9a6e217 100644
--- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollectionTest.java
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeBufferedDataTest.java
@@ -23,18 +23,18 @@ import org.apache.skywalking.oap.server.core.storage.ComparableStorageData;
 import org.junit.Assert;
 import org.junit.Test;
 
-public class LimitedSizeDataCollectionTest {
+public class LimitedSizeBufferedDataTest {
     @Test
     public void testPut() {
-        LimitedSizeDataCollection<MockStorageData> collection = new LimitedSizeDataCollection<>(5);
-        collection.put(new MockStorageData(1));
-        collection.put(new MockStorageData(3));
-        collection.put(new MockStorageData(5));
-        collection.put(new MockStorageData(7));
-        collection.put(new MockStorageData(9));
+        LimitedSizeBufferedData<MockStorageData> collection = new LimitedSizeBufferedData<>(5);
+        collection.accept(new MockStorageData(1));
+        collection.accept(new MockStorageData(3));
+        collection.accept(new MockStorageData(5));
+        collection.accept(new MockStorageData(7));
+        collection.accept(new MockStorageData(9));
 
         MockStorageData income = new MockStorageData(4);
-        collection.put(income);
+        collection.accept(income);
 
         int[] expected = new int[] {
             3,
@@ -44,7 +44,7 @@ public class LimitedSizeDataCollectionTest {
             9
         };
         int i = 0;
-        for (MockStorageData data : collection.collection()) {
+        for (MockStorageData data : collection.read()) {
             Assert.assertEquals(expected[i++], data.latency);
         }
     }
@@ -64,7 +64,7 @@ public class LimitedSizeDataCollectionTest {
 
         @Override
         public String id() {
-            return null;
+            return "id";
         }
 
         @Override