You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2020/04/29 14:23:33 UTC
[skywalking] branch master updated: New ReadWriteSafeCache. Rewrite
the whole Window and Collection. Simplify codes (#4733)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new d97af96 New ReadWriteSafeCache. Rewrite the whole Window and Collection. Simplify codes (#4733)
d97af96 is described below
commit d97af96c8560c0c7615a32de6fb1f9df03c17006
Author: 吴晟 Wu Sheng <wu...@foxmail.com>
AuthorDate: Wed Apr 29 22:23:21 2020 +0800
New ReadWriteSafeCache. Rewrite the whole Window and Collection. Simplify codes (#4733)
* Rewrite the whole window and collection to new ReadWriteSafeCache.
* Remove redundant fields.
---
.../data/{DataCache.java => BufferedData.java} | 22 ++++-
...ollection.java => LimitedSizeBufferedData.java} | 90 ++++--------------
.../core/analysis/data/LimitedSizeDataCache.java | 54 -----------
.../core/analysis/data/MergableBufferedData.java | 62 ++++++++++++
.../server/core/analysis/data/MergeDataCache.java | 54 -----------
.../core/analysis/data/MergeDataCollection.java | 97 -------------------
.../core/analysis/data/NonMergeDataCache.java | 46 ---------
.../core/analysis/data/NonMergeDataCollection.java | 97 -------------------
.../oap/server/core/analysis/data/QueueData.java | 28 ------
.../core/analysis/data/ReadWriteSafeCache.java | 75 +++++++++++++++
.../server/core/analysis/data/SWCollection.java | 48 ----------
.../oap/server/core/analysis/data/Window.java | 105 ---------------------
.../analysis/worker/MetricsAggregateWorker.java | 58 ++++--------
.../analysis/worker/MetricsPersistentWorker.java | 29 +-----
.../core/analysis/worker/PersistenceWorker.java | 63 ++++---------
.../server/core/analysis/worker/TopNWorker.java | 49 ++++------
.../oap/server/core/remote/data/StreamData.java | 7 +-
.../oap/server/core/storage/PersistenceTimer.java | 26 +++--
...nTest.java => LimitedSizeBufferedDataTest.java} | 20 ++--
19 files changed, 258 insertions(+), 772 deletions(-)
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/DataCache.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/BufferedData.java
similarity index 57%
rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/DataCache.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/BufferedData.java
index 868d16a..7d7cbb2 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/DataCache.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/BufferedData.java
@@ -18,9 +18,25 @@
package org.apache.skywalking.oap.server.core.analysis.data;
-public interface DataCache {
+import java.util.List;
- void writing();
+/**
+ * BufferedData represents a data collection in memory. Data can be accepted and then drained to another collection.
+ *
+ * {@link #accept(Object)} and {@link #read()} are not required to be thread-safe. BufferedData is usually hosted by {@link
+ * ReadWriteSafeCache}.
+ */
+public interface BufferedData<T> {
+ /**
+ * Accept the data into the cache if it fits the conditions. An implementation may choose not to accept the new
+ * input data.
+ *
+ * @param data to be added potentially.
+ */
+ void accept(T data);
- void finishWriting();
+ /**
+ * Read all existing buffered data, and clear the memory.
+ */
+ List<T> read();
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollection.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeBufferedData.java
similarity index 51%
rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollection.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeBufferedData.java
index ab5347d..d48f273 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollection.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeBufferedData.java
@@ -19,111 +19,61 @@
package org.apache.skywalking.oap.server.core.analysis.data;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import org.apache.skywalking.oap.server.core.storage.ComparableStorageData;
+import org.apache.skywalking.oap.server.core.storage.StorageData;
-public class LimitedSizeDataCollection<STORAGE_DATA extends ComparableStorageData> implements SWCollection<STORAGE_DATA> {
-
- private final HashMap<STORAGE_DATA, LinkedList<STORAGE_DATA>> data;
+/**
+ * LimitedSizeBufferedData is a non-thread-safe implementation of {@link BufferedData}. It collects a limited number of
+ * records for each {@link StorageData#id()}.
+ */
+public class LimitedSizeBufferedData<STORAGE_DATA extends ComparableStorageData & StorageData> implements BufferedData<STORAGE_DATA> {
+ private final HashMap<String, LinkedList<STORAGE_DATA>> data;
private final int limitedSize;
- private volatile boolean writing;
- private volatile boolean reading;
- LimitedSizeDataCollection(int limitedSize) {
+ public LimitedSizeBufferedData(int limitedSize) {
this.data = new HashMap<>();
- this.writing = false;
- this.reading = false;
this.limitedSize = limitedSize;
}
@Override
- public void finishWriting() {
- writing = false;
- }
-
- @Override
- public void writing() {
- writing = true;
- }
-
- @Override
- public boolean isWriting() {
- return writing;
- }
-
- @Override
- public void finishReading() {
- reading = false;
- }
-
- @Override
- public void reading() {
- reading = true;
- }
-
- @Override
- public boolean isReading() {
- return reading;
- }
-
- @Override
- public int size() {
- return data.size();
- }
-
- @Override
- public void clear() {
- data.clear();
- }
-
- @Override
- public boolean containsKey(STORAGE_DATA key) {
- throw new UnsupportedOperationException("Limited size data collection doesn't support containsKey operation.");
- }
-
- @Override
- public STORAGE_DATA get(STORAGE_DATA key) {
- throw new UnsupportedOperationException("Limited size data collection doesn't support get operation.");
- }
-
- @Override
- public void put(STORAGE_DATA value) {
- LinkedList<STORAGE_DATA> storageDataList = this.data.get(value);
+ public void accept(final STORAGE_DATA data) {
+ final String id = data.id();
+ LinkedList<STORAGE_DATA> storageDataList = this.data.get(id);
if (storageDataList == null) {
storageDataList = new LinkedList<>();
- data.put(value, storageDataList);
+ this.data.put(id, storageDataList);
}
if (storageDataList.size() < limitedSize) {
- storageDataList.add(value);
+ storageDataList.add(data);
return;
}
for (int i = 0; i < storageDataList.size(); i++) {
STORAGE_DATA storageData = storageDataList.get(i);
- if (value.compareTo(storageData) <= 0) {
+ if (data.compareTo(storageData) <= 0) {
if (i == 0) {
- // input value is less than the smallest in top N list, ignore
+ // input data is less than the smallest in top N list, ignore
} else {
// Remove the smallest in top N list
- // add the current value into the right position
- storageDataList.add(i, value);
+ // add the current data into the right position
+ storageDataList.add(i, data);
storageDataList.removeFirst();
}
return;
}
}
- // Add the value as biggest in top N list
- storageDataList.addLast(value);
+ // Add the data as biggest in top N list
+ storageDataList.addLast(data);
storageDataList.removeFirst();
}
@Override
- public Collection<STORAGE_DATA> collection() {
+ public List<STORAGE_DATA> read() {
List<STORAGE_DATA> collection = new ArrayList<>();
data.values().forEach(e -> e.forEach(collection::add));
return collection;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCache.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCache.java
deleted file mode 100644
index c7393c4..0000000
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCache.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.core.analysis.data;
-
-import org.apache.skywalking.oap.server.core.storage.ComparableStorageData;
-
-public class LimitedSizeDataCache<STORAGE_DATA extends ComparableStorageData> extends Window<STORAGE_DATA> implements DataCache {
-
- private SWCollection<STORAGE_DATA> limitedSizeDataCollection;
- private final int limitSize;
-
- public LimitedSizeDataCache(int limitSize) {
- super(false);
- this.limitSize = limitSize;
- init();
- }
-
- @Override
- public SWCollection<STORAGE_DATA> collectionInstance() {
- return new LimitedSizeDataCollection<>(limitSize);
- }
-
- public void add(STORAGE_DATA data) {
- limitedSizeDataCollection.put(data);
- }
-
- @Override
- public void writing() {
- limitedSizeDataCollection = getCurrentAndWriting();
- }
-
- @Override
- public void finishWriting() {
- limitedSizeDataCollection.finishWriting();
- limitedSizeDataCollection = null;
- }
-}
-
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/MergableBufferedData.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/MergableBufferedData.java
new file mode 100644
index 0000000..24b92c0
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/MergableBufferedData.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.data;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+
+/**
+ * MergableBufferedData is a non-thread-safe implementation of {@link BufferedData}. {@link Metrics} in this cache are
+ * merged through {@link Metrics#combine(Metrics)} if their {@link Metrics#id()}s are the same.
+ *
+ * Concurrent {@link #accept(Metrics)} calls, and calling {@link #read()} during {@link #accept(Metrics)}, are both not recommended.
+ */
+public class MergableBufferedData<METRICS extends Metrics> implements BufferedData<METRICS> {
+ private Map<String, METRICS> buffer;
+
+ public MergableBufferedData() {
+ buffer = new HashMap<>();
+ }
+
+ /**
+ * Accept the data into the cache and merge with the existing value.
+ *
+ * This method is not thread-safe; concurrent calls should be avoided.
+ *
+ * @param data to be added potentially.
+ */
+ @Override
+ public void accept(final METRICS data) {
+ final String id = data.id();
+ final METRICS existed = buffer.get(id);
+ if (existed == null) {
+ buffer.put(id, data);
+ } else {
+ existed.combine(data);
+ }
+ }
+
+ @Override
+ public List<METRICS> read() {
+ return buffer.values().stream().collect(Collectors.toList());
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/MergeDataCache.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/MergeDataCache.java
deleted file mode 100644
index a7c4b6b..0000000
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/MergeDataCache.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.core.analysis.data;
-
-import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
-
-public class MergeDataCache<METRICS extends Metrics> extends Window<METRICS> implements DataCache {
-
- private SWCollection<METRICS> lockedMergeDataCollection;
-
- @Override
- public SWCollection<METRICS> collectionInstance() {
- return new MergeDataCollection<>();
- }
-
- public boolean containsKey(METRICS key) {
- return lockedMergeDataCollection.containsKey(key);
- }
-
- public Metrics get(METRICS key) {
- return lockedMergeDataCollection.get(key);
- }
-
- public void put(METRICS data) {
- lockedMergeDataCollection.put(data);
- }
-
- @Override
- public void writing() {
- lockedMergeDataCollection = getCurrentAndWriting();
- }
-
- @Override
- public void finishWriting() {
- lockedMergeDataCollection.finishWriting();
- lockedMergeDataCollection = null;
- }
-}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/MergeDataCollection.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/MergeDataCollection.java
deleted file mode 100644
index f310cb4..0000000
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/MergeDataCollection.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.core.analysis.data;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.skywalking.oap.server.core.remote.data.StreamData;
-
-public class MergeDataCollection<STREAM_DATA extends StreamData> implements SWCollection<STREAM_DATA> {
-
- private Map<STREAM_DATA, STREAM_DATA> collection;
- private volatile boolean writing;
- private volatile boolean reading;
-
- MergeDataCollection() {
- this.collection = new HashMap<>();
- this.writing = false;
- this.reading = false;
- }
-
- @Override
- public void finishWriting() {
- writing = false;
- }
-
- @Override
- public void writing() {
- writing = true;
- }
-
- @Override
- public boolean isWriting() {
- return writing;
- }
-
- @Override
- public void finishReading() {
- reading = false;
- }
-
- @Override
- public void reading() {
- reading = true;
- }
-
- @Override
- public boolean isReading() {
- return reading;
- }
-
- @Override
- public boolean containsKey(STREAM_DATA key) {
- return collection.containsKey(key);
- }
-
- @Override
- public void put(STREAM_DATA value) {
- collection.put(value, value);
- }
-
- @Override
- public STREAM_DATA get(STREAM_DATA key) {
- return collection.get(key);
- }
-
- @Override
- public int size() {
- return collection.size();
- }
-
- @Override
- public void clear() {
- collection.clear();
- }
-
- @Override
- public Collection<STREAM_DATA> collection() {
- return collection.values();
- }
-}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCache.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCache.java
deleted file mode 100644
index 69d8d8a..0000000
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCache.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.core.analysis.data;
-
-import org.apache.skywalking.oap.server.core.storage.StorageData;
-
-public class NonMergeDataCache<STORAGE_DATA extends StorageData> extends Window<STORAGE_DATA> implements DataCache {
-
- private SWCollection<STORAGE_DATA> lockedMergeDataCollection;
-
- @Override
- public SWCollection<STORAGE_DATA> collectionInstance() {
- return new NonMergeDataCollection<>();
- }
-
- public void add(STORAGE_DATA data) {
- lockedMergeDataCollection.put(data);
- }
-
- @Override
- public void writing() {
- lockedMergeDataCollection = getCurrentAndWriting();
- }
-
- @Override
- public void finishWriting() {
- lockedMergeDataCollection.finishWriting();
- lockedMergeDataCollection = null;
- }
-}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCollection.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCollection.java
deleted file mode 100644
index 6a56465..0000000
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCollection.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.core.analysis.data;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import org.apache.skywalking.oap.server.core.storage.StorageData;
-
-public class NonMergeDataCollection<STORAGE_DATA extends StorageData> implements SWCollection<STORAGE_DATA> {
-
- private final List<STORAGE_DATA> data;
- private volatile boolean writing;
- private volatile boolean reading;
-
- NonMergeDataCollection() {
- this.data = new ArrayList<>();
- this.writing = false;
- this.reading = false;
- }
-
- @Override
- public void finishWriting() {
- writing = false;
- }
-
- @Override
- public void writing() {
- writing = true;
- }
-
- @Override
- public boolean isWriting() {
- return writing;
- }
-
- @Override
- public void finishReading() {
- reading = false;
- }
-
- @Override
- public void reading() {
- reading = true;
- }
-
- @Override
- public boolean isReading() {
- return reading;
- }
-
- @Override
- public int size() {
- return data.size();
- }
-
- @Override
- public void clear() {
- data.clear();
- }
-
- @Override
- public boolean containsKey(STORAGE_DATA key) {
- throw new UnsupportedOperationException("Non merge data collection doesn't support containsKey operation.");
- }
-
- @Override
- public STORAGE_DATA get(STORAGE_DATA key) {
- throw new UnsupportedOperationException("Non merge data collection doesn't support get operation.");
- }
-
- @Override
- public void put(STORAGE_DATA value) {
- data.add(value);
- }
-
- @Override
- public Collection<STORAGE_DATA> collection() {
- return data;
- }
-}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/QueueData.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/QueueData.java
deleted file mode 100644
index 9880b6c..0000000
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/QueueData.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.core.analysis.data;
-
-public interface QueueData {
-
- void resetEndOfBatch();
-
- void asEndOfBatch();
-
- boolean isEndOfBatch();
-}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/ReadWriteSafeCache.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/ReadWriteSafeCache.java
new file mode 100644
index 0000000..33a4e01
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/ReadWriteSafeCache.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.data;
+
+import java.util.List;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * ReadWriteSafeCache provides a read/write isolated cache.
+ */
+public class ReadWriteSafeCache<T> {
+ /**
+ * Pointer of read buffer.
+ */
+ private volatile BufferedData<T> readBufferPointer;
+ /**
+ * Pointer of write buffer.
+ */
+ private volatile BufferedData<T> writeBufferPointer;
+ /**
+ * Read/Write lock.
+ */
+ private final ReentrantLock lock;
+
+ /**
+ * Build the Cache through two given buffer instances.
+ *
+ * @param buffer1 read/write switchable buffer
+ * @param buffer2 read/write switchable buffer. It is the write buffer at the beginning.
+ */
+ public ReadWriteSafeCache(BufferedData<T> buffer1, BufferedData<T> buffer2) {
+ readBufferPointer = buffer1;
+ writeBufferPointer = buffer2;
+ lock = new ReentrantLock();
+ }
+
+ public void write(T data) {
+ lock.lock();
+ try {
+ writeBufferPointer.accept(data);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public List<T> read() {
+ lock.lock();
+ try {
+ // Switch the read and write pointers, when there is no writing.
+ BufferedData<T> tempPointer = writeBufferPointer;
+ writeBufferPointer = readBufferPointer;
+ readBufferPointer = tempPointer;
+
+ return readBufferPointer.read();
+ } finally {
+ lock.unlock();
+ }
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/SWCollection.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/SWCollection.java
deleted file mode 100644
index 16bbe07..0000000
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/SWCollection.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.core.analysis.data;
-
-import java.util.Collection;
-
-public interface SWCollection<DATA> {
-
- void reading();
-
- boolean isReading();
-
- void writing();
-
- boolean isWriting();
-
- void clear();
-
- int size();
-
- void finishReading();
-
- void finishWriting();
-
- Collection<DATA> collection();
-
- boolean containsKey(DATA key);
-
- DATA get(DATA key);
-
- void put(DATA value);
-}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Window.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Window.java
deleted file mode 100644
index 907da94..0000000
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Window.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.core.analysis.data;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Data cache window. Window holds two data collections(A and B). They are switchable based on outside command. In any
- * time, one collection is accepting the input data, and the other is immutable.
- *
- * This window makes sure there is not concurrency read-write situation.
- *
- * @param <DATA> type in the Window and internal collection.
- */
-public abstract class Window<DATA> {
-
- private AtomicInteger windowSwitch = new AtomicInteger(0);
-
- private SWCollection<DATA> pointer;
-
- private SWCollection<DATA> windowDataA;
- private SWCollection<DATA> windowDataB;
-
- Window() {
- this(true);
- }
-
- Window(boolean autoInit) {
- if (autoInit) {
- init();
- }
- }
-
- protected void init() {
- this.windowDataA = collectionInstance();
- this.windowDataB = collectionInstance();
- this.pointer = windowDataA;
- }
-
- public abstract SWCollection<DATA> collectionInstance();
-
- public boolean trySwitchPointer() {
- return windowSwitch.incrementAndGet() == 1 && !getLast().isReading();
- }
-
- public void trySwitchPointerFinally() {
- windowSwitch.addAndGet(-1);
- }
-
- public void switchPointer() {
- if (pointer == windowDataA) {
- pointer = windowDataB;
- } else {
- pointer = windowDataA;
- }
- getLast().reading();
- }
-
- SWCollection<DATA> getCurrentAndWriting() {
- if (pointer == windowDataA) {
- windowDataA.writing();
- return windowDataA;
- } else {
- windowDataB.writing();
- return windowDataB;
- }
- }
-
- private SWCollection<DATA> getCurrent() {
- return pointer;
- }
-
- public int currentCollectionSize() {
- return getCurrent().size();
- }
-
- public SWCollection<DATA> getLast() {
- if (pointer == windowDataA) {
- return windowDataB;
- } else {
- return windowDataA;
- }
- }
-
- public void finishReadingLast() {
- getLast().clear();
- getLast().finishReading();
- }
-}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
index f6f944a..3d5105f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
@@ -20,12 +20,14 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.Iterator;
import java.util.List;
+import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.UnexpectedException;
-import org.apache.skywalking.oap.server.core.analysis.data.MergeDataCache;
+import org.apache.skywalking.oap.server.core.analysis.data.MergableBufferedData;
+import org.apache.skywalking.oap.server.core.analysis.data.ReadWriteSafeCache;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
@@ -33,8 +35,6 @@ import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* MetricsAggregateWorker provides an in-memory metrics merging capability. This aggregation is called L1 aggregation,
@@ -42,20 +42,18 @@ import org.slf4j.LoggerFactory;
* bucket, the L1 aggregation will merge them into one metrics object to reduce the unnecessary memory and network
* payload.
*/
+@Slf4j
public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
-
- private static final Logger logger = LoggerFactory.getLogger(MetricsAggregateWorker.class);
-
private AbstractWorker<Metrics> nextWorker;
private final DataCarrier<Metrics> dataCarrier;
- private final MergeDataCache<Metrics> mergeDataCache;
+ private final ReadWriteSafeCache<Metrics> mergeDataCache;
private CounterMetrics aggregationCounter;
MetricsAggregateWorker(ModuleDefineHolder moduleDefineHolder, AbstractWorker<Metrics> nextWorker,
String modelName) {
super(moduleDefineHolder);
this.nextWorker = nextWorker;
- this.mergeDataCache = new MergeDataCache<>();
+ this.mergeDataCache = new ReadWriteSafeCache<>(new MergableBufferedData(), new MergableBufferedData());
String name = "METRICS_L1_AGGREGATION";
this.dataCarrier = new DataCarrier<>("MetricsAggregateWorker." + modelName, name, 2, 10000);
@@ -85,42 +83,18 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
private void onWork(Metrics metrics) {
aggregationCounter.inc();
- aggregate(metrics);
+ mergeDataCache.write(metrics);
if (metrics.isEndOfBatch()) {
- sendToNext();
- }
- }
-
- private void sendToNext() {
- mergeDataCache.switchPointer();
- while (mergeDataCache.getLast().isWriting()) {
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- logger.error(e.getMessage(), e);
- }
- }
-
- mergeDataCache.getLast().collection().forEach(data -> {
- if (logger.isDebugEnabled()) {
- logger.debug(data.toString());
- }
-
- nextWorker.in(data);
- });
- mergeDataCache.finishReadingLast();
- }
-
- private void aggregate(Metrics metrics) {
- mergeDataCache.writing();
- if (mergeDataCache.containsKey(metrics)) {
- mergeDataCache.get(metrics).combine(metrics);
- } else {
- mergeDataCache.put(metrics);
+ mergeDataCache.read().forEach(
+ data -> {
+ if (log.isDebugEnabled()) {
+ log.debug(data.toString());
+ }
+ nextWorker.in(data);
+ }
+ );
}
-
- mergeDataCache.finishWriting();
}
private class AggregatorConsumer implements IConsumer<Metrics> {
@@ -153,7 +127,7 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
@Override
public void onError(List<Metrics> data, Throwable t) {
- logger.error(t.getMessage(), t);
+ log.error(t.getMessage(), t);
}
@Override
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
index 81538ef..cab7dd3 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
@@ -32,7 +32,8 @@ import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.UnexpectedException;
-import org.apache.skywalking.oap.server.core.analysis.data.MergeDataCache;
+import org.apache.skywalking.oap.server.core.analysis.data.MergableBufferedData;
+import org.apache.skywalking.oap.server.core.analysis.data.ReadWriteSafeCache;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
@@ -45,10 +46,9 @@ import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
* MetricsPersistentWorker is an extension of {@link PersistenceWorker} and focuses on persisting the Metrics data.
*/
@Slf4j
-public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDataCache<Metrics>> {
+public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
private final Model model;
private final Map<Metrics, Metrics> context;
- private final MergeDataCache<Metrics> mergeDataCache;
private final IMetricsDAO metricsDAO;
private final Optional<AbstractWorker<Metrics>> nextAlarmWorker;
private final Optional<AbstractWorker<ExportEvent>> nextExportWorker;
@@ -60,11 +60,10 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO metricsDAO,
AbstractWorker<Metrics> nextAlarmWorker, AbstractWorker<ExportEvent> nextExportWorker,
MetricsTransWorker transWorker, boolean enableDatabaseSession, boolean supportUpdate) {
- super(moduleDefineHolder);
+ super(moduleDefineHolder, new ReadWriteSafeCache<>(new MergableBufferedData(), new MergableBufferedData()));
this.model = model;
this.context = new HashMap<>(100);
this.enableDatabaseSession = enableDatabaseSession;
- this.mergeDataCache = new MergeDataCache<>();
this.metricsDAO = metricsDAO;
this.nextAlarmWorker = Optional.ofNullable(nextAlarmWorker);
this.nextExportWorker = Optional.ofNullable(nextExportWorker);
@@ -112,11 +111,6 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
}
@Override
- public MergeDataCache<Metrics> getCache() {
- return mergeDataCache;
- }
-
- @Override
public void prepareBatch(Collection<Metrics> lastCollection, List<PrepareRequest> prepareRequests) {
long start = System.currentTimeMillis();
if (lastCollection.size() == 0) {
@@ -197,21 +191,6 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
nextExportWorker -> nextExportWorker.in(new ExportEvent(metrics, ExportEvent.EventType.TOTAL)));
}
- @Override
- public void cacheData(Metrics input) {
- mergeDataCache.writing();
- if (mergeDataCache.containsKey(input)) {
- Metrics metrics = mergeDataCache.get(input);
- metrics.combine(input);
- metrics.calculate();
- } else {
- input.calculate();
- mergeDataCache.put(input);
- }
-
- mergeDataCache.finishWriting();
- }
-
/**
* Load data from the storage, if {@link #enableDatabaseSession} == true, only load data when the id doesn't exist.
*/
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java
index 156618d..536a6d9 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java
@@ -20,28 +20,29 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.Collection;
import java.util.List;
-import org.apache.skywalking.oap.server.core.analysis.data.SWCollection;
-import org.apache.skywalking.oap.server.core.analysis.data.Window;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.core.analysis.data.ReadWriteSafeCache;
import org.apache.skywalking.oap.server.core.storage.StorageData;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* PersistenceWorker takes the responsibility for pushing data to the final storage. The target storage is based on the
* activated storage implementation. This worker controls the persistence flow.
*
* @param <INPUT> The type of worker input. All inputs will be merged and saved.
- * @param <CACHE> Cache type to hold all input.
*/
-public abstract class PersistenceWorker<INPUT extends StorageData, CACHE extends Window<INPUT>> extends AbstractWorker<INPUT> {
+@Slf4j
+public abstract class PersistenceWorker<INPUT extends StorageData> extends AbstractWorker<INPUT> {
+ @Getter(AccessLevel.PROTECTED)
+ private final ReadWriteSafeCache<INPUT> cache;
- private static final Logger logger = LoggerFactory.getLogger(PersistenceWorker.class);
-
- PersistenceWorker(ModuleDefineHolder moduleDefineHolder) {
+ PersistenceWorker(ModuleDefineHolder moduleDefineHolder, ReadWriteSafeCache<INPUT> cache) {
super(moduleDefineHolder);
+ this.cache = cache;
}
/**
@@ -54,9 +55,9 @@ public abstract class PersistenceWorker<INPUT extends StorageData, CACHE extends
/**
* Cache data based on different strategies. See the implementations for more details.
*/
- public abstract void cacheData(INPUT input);
-
- public abstract CACHE getCache();
+ public void cacheData(INPUT input) {
+ cache.write(input);
+ }
/**
* The persistence process is driven by the {@link org.apache.skywalking.oap.server.core.storage.PersistenceTimer}.
@@ -67,24 +68,6 @@ public abstract class PersistenceWorker<INPUT extends StorageData, CACHE extends
public abstract void endOfRound(long tookTime);
/**
- * For every cache implementation(see {@link Window}), there are two dataset, switch them when one persistence round
- * is beginning, in order to make cached data immutable.
- *
- * @return true if switch successfully.
- */
- public boolean flushAndSwitch() {
- boolean isSwitch;
- try {
- if (isSwitch = getCache().trySwitchPointer()) {
- getCache().switchPointer();
- }
- } finally {
- getCache().trySwitchPointerFinally();
- }
- return isSwitch;
- }
-
- /**
* Prepare the batch persistence, transfer all prepared data to the executable data format based on the storage
* implementations.
*
@@ -93,22 +76,8 @@ public abstract class PersistenceWorker<INPUT extends StorageData, CACHE extends
*/
public abstract void prepareBatch(Collection<INPUT> lastCollection, List<PrepareRequest> prepareRequests);
- public final void buildBatchRequests(List<PrepareRequest> prepareRequests) {
- try {
- SWCollection<INPUT> last = getCache().getLast();
- while (last.isWriting()) {
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- logger.warn("thread wake up");
- }
- }
-
- if (last.collection() != null) {
- prepareBatch(last.collection(), prepareRequests);
- }
- } finally {
- getCache().finishReadingLast();
- }
+ public void buildBatchRequests(List<PrepareRequest> prepareRequests) {
+ final List<INPUT> dataList = getCache().read();
+ prepareBatch(dataList, prepareRequests);
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
index 2b42620..3c150c7 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
@@ -22,7 +22,8 @@ import java.util.Collection;
import java.util.List;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import org.apache.skywalking.oap.server.core.analysis.data.LimitedSizeDataCache;
+import org.apache.skywalking.oap.server.core.analysis.data.LimitedSizeBufferedData;
+import org.apache.skywalking.oap.server.core.analysis.data.ReadWriteSafeCache;
import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
import org.apache.skywalking.oap.server.core.storage.model.Model;
@@ -34,59 +35,43 @@ import org.slf4j.LoggerFactory;
/**
* Top N worker is a persistence worker. It caches and orders the data, and flushes in a longer period.
*/
-public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<TopN>> {
+public class TopNWorker extends PersistenceWorker<TopN> {
private static final Logger logger = LoggerFactory.getLogger(TopNWorker.class);
- private final LimitedSizeDataCache<TopN> limitedSizeDataCache;
private final IRecordDAO recordDAO;
private final Model model;
private final DataCarrier<TopN> dataCarrier;
- private long reportCycle;
+ private long reportPeriod;
private volatile long lastReportTimestamp;
- TopNWorker(ModuleDefineHolder moduleDefineHolder, Model model, int topNSize, long reportCycle,
- IRecordDAO recordDAO) {
- super(moduleDefineHolder);
- this.limitedSizeDataCache = new LimitedSizeDataCache<>(topNSize);
+ TopNWorker(ModuleDefineHolder moduleDefineHolder, Model model, int topNSize, long reportPeriod,
+ IRecordDAO recordDAO) {
+ super(
+ moduleDefineHolder,
+ new ReadWriteSafeCache<>(new LimitedSizeBufferedData<>(topNSize), new LimitedSizeBufferedData<>(topNSize))
+ );
this.recordDAO = recordDAO;
this.model = model;
this.dataCarrier = new DataCarrier<>("TopNWorker", 1, 1000);
this.dataCarrier.consume(new TopNWorker.TopNConsumer(), 1);
this.lastReportTimestamp = System.currentTimeMillis();
// Top N persistence runs every 10 minutes by default.
- this.reportCycle = reportCycle;
- }
-
- @Override
- public void cacheData(TopN data) {
- limitedSizeDataCache.writing();
- try {
- limitedSizeDataCache.add(data);
- } finally {
- limitedSizeDataCache.finishWriting();
- }
- }
-
- @Override
- public LimitedSizeDataCache<TopN> getCache() {
- return limitedSizeDataCache;
+ this.reportPeriod = reportPeriod;
}
/**
- * The top N worker persistent cycle is much less than the others, override `flushAndSwitch` to extend the execute
- * time windows.
- * <p>
- * Switch and persistent attempt happens based on reportCycle.
+ * Force overriding the parent buildBatchRequests. Use its own report period.
*/
@Override
- public boolean flushAndSwitch() {
+ public void buildBatchRequests(final List<PrepareRequest> prepareRequests) {
long now = System.currentTimeMillis();
- if (now - lastReportTimestamp <= reportCycle) {
- return false;
+ if (now - lastReportTimestamp <= reportPeriod) {
+ // Only report once its own report period has elapsed.
+ return;
}
lastReportTimestamp = now;
- return super.flushAndSwitch();
+ super.buildBatchRequests(prepareRequests);
}
@Override
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/data/StreamData.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/data/StreamData.java
index 6b761e7..2c5b0b0 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/data/StreamData.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/data/StreamData.java
@@ -18,25 +18,20 @@
package org.apache.skywalking.oap.server.core.remote.data;
-import org.apache.skywalking.oap.server.core.analysis.data.QueueData;
import org.apache.skywalking.oap.server.core.remote.Deserializable;
import org.apache.skywalking.oap.server.core.remote.Serializable;
-public abstract class StreamData implements QueueData, Serializable, Deserializable {
-
+public abstract class StreamData implements Serializable, Deserializable {
private boolean endOfBatch = false;
- @Override
public void resetEndOfBatch() {
this.endOfBatch = false;
}
- @Override
public void asEndOfBatch() {
this.endOfBatch = true;
}
- @Override
public boolean isEndOfBatch() {
return this.endOfBatch;
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
index 35730bd..7fada1b 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
@@ -62,14 +62,26 @@ public enum PersistenceTimer {
MetricsCreator metricsCreator = moduleManager.find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class);
- errorCounter = metricsCreator.createCounter("persistence_timer_bulk_error_count", "Error execution of the prepare stage in persistence timer", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
- prepareLatency = metricsCreator.createHistogramMetric("persistence_timer_bulk_prepare_latency", "Latency of the prepare stage in persistence timer", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
- executeLatency = metricsCreator.createHistogramMetric("persistence_timer_bulk_execute_latency", "Latency of the execute stage in persistence timer", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
+ errorCounter = metricsCreator.createCounter(
+ "persistence_timer_bulk_error_count", "Error execution of the prepare stage in persistence timer",
+ MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE
+ );
+ prepareLatency = metricsCreator.createHistogramMetric(
+ "persistence_timer_bulk_prepare_latency", "Latency of the prepare stage in persistence timer",
+ MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE
+ );
+ executeLatency = metricsCreator.createHistogramMetric(
+ "persistence_timer_bulk_execute_latency", "Latency of the execute stage in persistence timer",
+ MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE
+ );
if (!isStarted) {
Executors.newSingleThreadScheduledExecutor()
- .scheduleWithFixedDelay(new RunnableWithExceptionProtection(() -> extractDataAndSave(batchDAO), t -> logger
- .error("Extract data and save failure.", t)), 5, moduleConfig.getPersistentPeriod(), TimeUnit.SECONDS);
+ .scheduleWithFixedDelay(
+ new RunnableWithExceptionProtection(() -> extractDataAndSave(batchDAO), t -> logger
+ .error("Extract data and save failure.", t)), 5, moduleConfig.getPersistentPeriod(),
+ TimeUnit.SECONDS
+ );
this.isStarted = true;
}
@@ -95,9 +107,7 @@ public enum PersistenceTimer {
logger.debug("extract {} worker data and save", worker.getClass().getName());
}
- if (worker.flushAndSwitch()) {
- worker.buildBatchRequests(prepareRequests);
- }
+ worker.buildBatchRequests(prepareRequests);
worker.endOfRound(System.currentTimeMillis() - lastTime);
});
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollectionTest.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeBufferedDataTest.java
similarity index 79%
rename from oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollectionTest.java
rename to oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeBufferedDataTest.java
index 4f506ab..9a6e217 100644
--- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollectionTest.java
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeBufferedDataTest.java
@@ -23,18 +23,18 @@ import org.apache.skywalking.oap.server.core.storage.ComparableStorageData;
import org.junit.Assert;
import org.junit.Test;
-public class LimitedSizeDataCollectionTest {
+public class LimitedSizeBufferedDataTest {
@Test
public void testPut() {
- LimitedSizeDataCollection<MockStorageData> collection = new LimitedSizeDataCollection<>(5);
- collection.put(new MockStorageData(1));
- collection.put(new MockStorageData(3));
- collection.put(new MockStorageData(5));
- collection.put(new MockStorageData(7));
- collection.put(new MockStorageData(9));
+ LimitedSizeBufferedData<MockStorageData> collection = new LimitedSizeBufferedData<>(5);
+ collection.accept(new MockStorageData(1));
+ collection.accept(new MockStorageData(3));
+ collection.accept(new MockStorageData(5));
+ collection.accept(new MockStorageData(7));
+ collection.accept(new MockStorageData(9));
MockStorageData income = new MockStorageData(4);
- collection.put(income);
+ collection.accept(income);
int[] expected = new int[] {
3,
@@ -44,7 +44,7 @@ public class LimitedSizeDataCollectionTest {
9
};
int i = 0;
- for (MockStorageData data : collection.collection()) {
+ for (MockStorageData data : collection.read()) {
Assert.assertEquals(expected[i++], data.latency);
}
}
@@ -64,7 +64,7 @@ public class LimitedSizeDataCollectionTest {
@Override
public String id() {
- return null;
+ return "id";
}
@Override