You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2020/04/29 15:33:26 UTC
[skywalking] 01/01: Optimize aggregation in L1 and L2.
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch aggregate-optimization
in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 771871c2e8baef6afd526e725bae82b3c730dbf9
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Wed Apr 29 23:32:23 2020 +0800
Optimize aggregation in L1 and L2.
---
.../core/analysis/data/ReadWriteSafeCache.java | 19 +++++++
.../analysis/worker/MetricsAggregateWorker.java | 64 +++++++++-------------
.../analysis/worker/MetricsPersistentWorker.java | 16 +-----
.../core/analysis/worker/PersistenceWorker.java | 9 +--
.../server/core/analysis/worker/TopNWorker.java | 19 ++-----
.../oap/server/core/remote/data/StreamData.java | 14 -----
.../oap/server/core/storage/PersistenceTimer.java | 29 +++++-----
7 files changed, 66 insertions(+), 104 deletions(-)
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/ReadWriteSafeCache.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/ReadWriteSafeCache.java
index 33a4e01..e73bbfb 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/ReadWriteSafeCache.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/ReadWriteSafeCache.java
@@ -50,6 +50,11 @@ public class ReadWriteSafeCache<T> {
lock = new ReentrantLock();
}
+ /**
+ * Write the data into the {@link #writeBufferPointer} buffer.
+ *
+ * @param data to enqueue.
+ */
public void write(T data) {
lock.lock();
try {
@@ -59,6 +64,20 @@ public class ReadWriteSafeCache<T> {
}
}
+ /**
+ * Write the collection of data into the {@link #writeBufferPointer} buffer.
+ *
+ * @param data to enqueue.
+ */
+ public void write(List<T> data) {
+ lock.lock();
+ try {
+ data.forEach(writeBufferPointer::accept);
+ } finally {
+ lock.unlock();
+ }
+ }
+
public List<T> read() {
lock.lock();
try {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
index 3d5105f..04f6601 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
@@ -18,7 +18,6 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
-import java.util.Iterator;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
@@ -27,7 +26,6 @@ import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactor
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.data.MergableBufferedData;
-import org.apache.skywalking.oap.server.core.analysis.data.ReadWriteSafeCache;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
@@ -46,14 +44,14 @@ import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
private AbstractWorker<Metrics> nextWorker;
private final DataCarrier<Metrics> dataCarrier;
- private final ReadWriteSafeCache<Metrics> mergeDataCache;
+ private final MergableBufferedData<Metrics> mergeDataCache;
private CounterMetrics aggregationCounter;
MetricsAggregateWorker(ModuleDefineHolder moduleDefineHolder, AbstractWorker<Metrics> nextWorker,
String modelName) {
super(moduleDefineHolder);
this.nextWorker = nextWorker;
- this.mergeDataCache = new ReadWriteSafeCache<>(new MergableBufferedData(), new MergableBufferedData());
+ this.mergeDataCache = new MergableBufferedData();
String name = "METRICS_L1_AGGREGATION";
this.dataCarrier = new DataCarrier<>("MetricsAggregateWorker." + modelName, name, 2, 10000);
@@ -64,7 +62,7 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
} catch (Exception e) {
throw new UnexpectedException(e.getMessage(), e);
}
- this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new AggregatorConsumer(this));
+ this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new AggregatorConsumer());
MetricsCreator metricsCreator = moduleDefineHolder.find(TelemetryModule.NAME)
.provider()
@@ -75,54 +73,44 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
);
}
+ /**
+ * MetricsAggregateWorker#in only enqueues the metrics into the data carrier; aggregation happens in the consumer.
+ */
@Override
public final void in(Metrics metrics) {
- metrics.resetEndOfBatch();
dataCarrier.produce(metrics);
}
- private void onWork(Metrics metrics) {
- aggregationCounter.inc();
- mergeDataCache.write(metrics);
-
- if (metrics.isEndOfBatch()) {
- mergeDataCache.read().forEach(
- data -> {
- if (log.isDebugEnabled()) {
- log.debug(data.toString());
- }
- nextWorker.in(data);
+ /**
+ * Consume one dequeued batch. According to {@link IConsumer#consume(List)}, this runs serially for each worker
+ * instance.
+ *
+ * @param metricsList from the queue.
+ */
+ private void onWork(List<Metrics> metricsList) {
+ metricsList.forEach(metrics -> {
+ aggregationCounter.inc();
+ mergeDataCache.accept(metrics);
+ });
+
+ mergeDataCache.read().forEach(
+ data -> {
+ if (log.isDebugEnabled()) {
+ log.debug(data.toString());
}
- );
- }
+ nextWorker.in(data);
+ }
+ );
}
private class AggregatorConsumer implements IConsumer<Metrics> {
-
- private final MetricsAggregateWorker aggregator;
-
- private AggregatorConsumer(MetricsAggregateWorker aggregator) {
- this.aggregator = aggregator;
- }
-
@Override
public void init() {
-
}
@Override
public void consume(List<Metrics> data) {
- Iterator<Metrics> inputIterator = data.iterator();
-
- int i = 0;
- while (inputIterator.hasNext()) {
- Metrics metrics = inputIterator.next();
- i++;
- if (i == data.size()) {
- metrics.asEndOfBatch();
- }
- aggregator.onWork(metrics);
- }
+ MetricsAggregateWorker.this.onWork(data);
}
@Override
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
index cab7dd3..a3cf58d 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
@@ -83,7 +83,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
}
this.dataCarrier = new DataCarrier<>("MetricsPersistentWorker." + model.getName(), name, 1, 2000);
- this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new PersistentConsumer(this));
+ this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new PersistentConsumer());
}
/**
@@ -97,11 +97,6 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
);
}
- @Override
- void onWork(Metrics metrics) {
- cacheData(metrics);
- }
-
/**
* Accept all metrics data and push them into the queue for serial processing
*/
@@ -235,13 +230,6 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
* ID is declared through {@link Object#hashCode()} and {@link Object#equals(Object)} as usual.
*/
private class PersistentConsumer implements IConsumer<Metrics> {
-
- private final MetricsPersistentWorker persistent;
-
- private PersistentConsumer(MetricsPersistentWorker persistent) {
- this.persistent = persistent;
- }
-
@Override
public void init() {
@@ -249,7 +237,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
@Override
public void consume(List<Metrics> data) {
- data.forEach(persistent::onWork);
+ MetricsPersistentWorker.this.onWork(data);
}
@Override
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java
index 536a6d9..415371b 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java
@@ -48,14 +48,7 @@ public abstract class PersistenceWorker<INPUT extends StorageData> extends Abstr
/**
* Accept the input, and push the data into the cache.
*/
- void onWork(INPUT input) {
- cacheData(input);
- }
-
- /**
- * Cache data based on different strategies. See the implementations for more details.
- */
- public void cacheData(INPUT input) {
+ void onWork(List<INPUT> input) {
cache.write(input);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
index 3c150c7..9093a2f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.Collection;
import java.util.List;
+import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.analysis.data.LimitedSizeBufferedData;
@@ -29,16 +30,12 @@ import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Top N worker is a persistence worker. Cache and order the data, flush in longer period.
*/
+@Slf4j
public class TopNWorker extends PersistenceWorker<TopN> {
-
- private static final Logger logger = LoggerFactory.getLogger(TopNWorker.class);
-
private final IRecordDAO recordDAO;
private final Model model;
private final DataCarrier<TopN> dataCarrier;
@@ -80,7 +77,7 @@ public class TopNWorker extends PersistenceWorker<TopN> {
try {
prepareRequests.add(recordDAO.prepareBatchInsert(model, record));
} catch (Throwable t) {
- logger.error(t.getMessage(), t);
+ log.error(t.getMessage(), t);
}
});
}
@@ -98,24 +95,18 @@ public class TopNWorker extends PersistenceWorker<TopN> {
}
private class TopNConsumer implements IConsumer<TopN> {
-
@Override
public void init() {
-
}
@Override
public void consume(List<TopN> data) {
- /*
- * TopN is not following the batch size trigger mode.
- * No need to implement this method, the memory size is limited always.
- */
- data.forEach(TopNWorker.this::onWork);
+ TopNWorker.this.onWork(data);
}
@Override
public void onError(List<TopN> data, Throwable t) {
- logger.error(t.getMessage(), t);
+ log.error(t.getMessage(), t);
}
@Override
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/data/StreamData.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/data/StreamData.java
index 2c5b0b0..3c1c902 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/data/StreamData.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/data/StreamData.java
@@ -22,19 +22,5 @@ import org.apache.skywalking.oap.server.core.remote.Deserializable;
import org.apache.skywalking.oap.server.core.remote.Serializable;
public abstract class StreamData implements Serializable, Deserializable {
- private boolean endOfBatch = false;
-
- public void resetEndOfBatch() {
- this.endOfBatch = false;
- }
-
- public void asEndOfBatch() {
- this.endOfBatch = true;
- }
-
- public boolean isEndOfBatch() {
- return this.endOfBatch;
- }
-
public abstract int remoteHashCode();
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
index 7fada1b..f35459c 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.apache.skywalking.oap.server.core.CoreModuleConfig;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
@@ -35,14 +36,10 @@ import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+@Slf4j
public enum PersistenceTimer {
INSTANCE;
-
- private static final Logger logger = LoggerFactory.getLogger(PersistenceTimer.class);
-
private Boolean isStarted = false;
private final Boolean debug;
private CounterMetrics errorCounter;
@@ -56,7 +53,7 @@ public enum PersistenceTimer {
}
public void start(ModuleManager moduleManager, CoreModuleConfig moduleConfig) {
- logger.info("persistence timer start");
+ log.info("persistence timer start");
IBatchDAO batchDAO = moduleManager.find(StorageModule.NAME).provider().getService(IBatchDAO.class);
MetricsCreator metricsCreator = moduleManager.find(TelemetryModule.NAME)
@@ -78,7 +75,7 @@ public enum PersistenceTimer {
if (!isStarted) {
Executors.newSingleThreadScheduledExecutor()
.scheduleWithFixedDelay(
- new RunnableWithExceptionProtection(() -> extractDataAndSave(batchDAO), t -> logger
+ new RunnableWithExceptionProtection(() -> extractDataAndSave(batchDAO), t -> log
.error("Extract data and save failure.", t)), 5, moduleConfig.getPersistentPeriod(),
TimeUnit.SECONDS
);
@@ -88,8 +85,8 @@ public enum PersistenceTimer {
}
private void extractDataAndSave(IBatchDAO batchDAO) {
- if (logger.isDebugEnabled()) {
- logger.debug("Extract data and save");
+ if (log.isDebugEnabled()) {
+ log.debug("Extract data and save");
}
long startTime = System.currentTimeMillis();
@@ -103,8 +100,8 @@ public enum PersistenceTimer {
persistenceWorkers.addAll(MetricsStreamProcessor.getInstance().getPersistentWorkers());
persistenceWorkers.forEach(worker -> {
- if (logger.isDebugEnabled()) {
- logger.debug("extract {} worker data and save", worker.getClass().getName());
+ if (log.isDebugEnabled()) {
+ log.debug("extract {} worker data and save", worker.getClass().getName());
}
worker.buildBatchRequests(prepareRequests);
@@ -113,7 +110,7 @@ public enum PersistenceTimer {
});
if (debug) {
- logger.info("build batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
+ log.info("build batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
}
} finally {
timer.finish();
@@ -129,10 +126,10 @@ public enum PersistenceTimer {
}
} catch (Throwable e) {
errorCounter.inc();
- logger.error(e.getMessage(), e);
+ log.error(e.getMessage(), e);
} finally {
- if (logger.isDebugEnabled()) {
- logger.debug("Persistence data save finish");
+ if (log.isDebugEnabled()) {
+ log.debug("Persistence data save finish");
}
prepareRequests.clear();
@@ -140,7 +137,7 @@ public enum PersistenceTimer {
}
if (debug) {
- logger.info("Batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
+ log.info("Batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
}
}
}