You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2020/04/29 15:33:26 UTC

[skywalking] 01/01: Optimize aggregation in L1 and L2.

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch aggregate-optimization
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 771871c2e8baef6afd526e725bae82b3c730dbf9
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Wed Apr 29 23:32:23 2020 +0800

    Optimize aggregation in L1 and L2.
---
 .../core/analysis/data/ReadWriteSafeCache.java     | 19 +++++++
 .../analysis/worker/MetricsAggregateWorker.java    | 64 +++++++++-------------
 .../analysis/worker/MetricsPersistentWorker.java   | 16 +-----
 .../core/analysis/worker/PersistenceWorker.java    |  9 +--
 .../server/core/analysis/worker/TopNWorker.java    | 19 ++-----
 .../oap/server/core/remote/data/StreamData.java    | 14 -----
 .../oap/server/core/storage/PersistenceTimer.java  | 29 +++++-----
 7 files changed, 66 insertions(+), 104 deletions(-)

diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/ReadWriteSafeCache.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/ReadWriteSafeCache.java
index 33a4e01..e73bbfb 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/ReadWriteSafeCache.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/ReadWriteSafeCache.java
@@ -50,6 +50,11 @@ public class ReadWriteSafeCache<T> {
         lock = new ReentrantLock();
     }
 
+    /**
+     * Write the data into the {@link #writeBufferPointer} buffer.
+     *
+     * @param data to enqueue.
+     */
     public void write(T data) {
         lock.lock();
         try {
@@ -59,6 +64,20 @@ public class ReadWriteSafeCache<T> {
         }
     }
 
+    /**
+     * Write every element of the list into the {@link #writeBufferPointer} buffer under a single lock acquisition.
+     *
+     * @param data to enqueue.
+     */
+    public void write(List<T> data) {
+        lock.lock();
+        try {
+            data.forEach(writeBufferPointer::accept);
+        } finally {
+            lock.unlock();
+        }
+    }
+
     public List<T> read() {
         lock.lock();
         try {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
index 3d5105f..04f6601 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
@@ -18,7 +18,6 @@
 
 package org.apache.skywalking.oap.server.core.analysis.worker;
 
-import java.util.Iterator;
 import java.util.List;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
@@ -27,7 +26,6 @@ import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactor
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
 import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.analysis.data.MergableBufferedData;
-import org.apache.skywalking.oap.server.core.analysis.data.ReadWriteSafeCache;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
@@ -46,14 +44,14 @@ import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
 public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
     private AbstractWorker<Metrics> nextWorker;
     private final DataCarrier<Metrics> dataCarrier;
-    private final ReadWriteSafeCache<Metrics> mergeDataCache;
+    private final MergableBufferedData<Metrics> mergeDataCache;
     private CounterMetrics aggregationCounter;
 
     MetricsAggregateWorker(ModuleDefineHolder moduleDefineHolder, AbstractWorker<Metrics> nextWorker,
                            String modelName) {
         super(moduleDefineHolder);
         this.nextWorker = nextWorker;
-        this.mergeDataCache = new ReadWriteSafeCache<>(new MergableBufferedData(), new MergableBufferedData());
+        this.mergeDataCache = new MergableBufferedData();
         String name = "METRICS_L1_AGGREGATION";
         this.dataCarrier = new DataCarrier<>("MetricsAggregateWorker." + modelName, name, 2, 10000);
 
@@ -64,7 +62,7 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
         } catch (Exception e) {
             throw new UnexpectedException(e.getMessage(), e);
         }
-        this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new AggregatorConsumer(this));
+        this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new AggregatorConsumer());
 
         MetricsCreator metricsCreator = moduleDefineHolder.find(TelemetryModule.NAME)
                                                           .provider()
@@ -75,54 +73,44 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
         );
     }
 
+    /**
+     * MetricsAggregateWorker#in only enqueues the metrics into the data carrier; aggregation is done by the consumer.
+     */
     @Override
     public final void in(Metrics metrics) {
-        metrics.resetEndOfBatch();
         dataCarrier.produce(metrics);
     }
 
-    private void onWork(Metrics metrics) {
-        aggregationCounter.inc();
-        mergeDataCache.write(metrics);
-
-        if (metrics.isEndOfBatch()) {
-            mergeDataCache.read().forEach(
-                data -> {
-                    if (log.isDebugEnabled()) {
-                        log.debug(data.toString());
-                    }
-                    nextWorker.in(data);
+    /**
+     * Consume a batch dequeued from the data carrier. According to {@link IConsumer#consume(List)}, this is a serial
+     * operation for each worker instance.
+     *
+     * @param metricsList from the queue.
+     */
+    private void onWork(List<Metrics> metricsList) {
+        metricsList.forEach(metrics -> {
+            aggregationCounter.inc();
+            mergeDataCache.accept(metrics);
+        });
+
+        mergeDataCache.read().forEach(
+            data -> {
+                if (log.isDebugEnabled()) {
+                    log.debug(data.toString());
                 }
-            );
-        }
+                nextWorker.in(data);
+            }
+        );
     }
 
     private class AggregatorConsumer implements IConsumer<Metrics> {
-
-        private final MetricsAggregateWorker aggregator;
-
-        private AggregatorConsumer(MetricsAggregateWorker aggregator) {
-            this.aggregator = aggregator;
-        }
-
         @Override
         public void init() {
-
         }
 
         @Override
         public void consume(List<Metrics> data) {
-            Iterator<Metrics> inputIterator = data.iterator();
-
-            int i = 0;
-            while (inputIterator.hasNext()) {
-                Metrics metrics = inputIterator.next();
-                i++;
-                if (i == data.size()) {
-                    metrics.asEndOfBatch();
-                }
-                aggregator.onWork(metrics);
-            }
+            MetricsAggregateWorker.this.onWork(data);
         }
 
         @Override
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
index cab7dd3..a3cf58d 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
@@ -83,7 +83,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
         }
 
         this.dataCarrier = new DataCarrier<>("MetricsPersistentWorker." + model.getName(), name, 1, 2000);
-        this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new PersistentConsumer(this));
+        this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new PersistentConsumer());
     }
 
     /**
@@ -97,11 +97,6 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
         );
     }
 
-    @Override
-    void onWork(Metrics metrics) {
-        cacheData(metrics);
-    }
-
     /**
      * Accept all metrics data and push them into the queue for serial processing
      */
@@ -235,13 +230,6 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
      * ID is declared through {@link Object#hashCode()} and {@link Object#equals(Object)} as usual.
      */
     private class PersistentConsumer implements IConsumer<Metrics> {
-
-        private final MetricsPersistentWorker persistent;
-
-        private PersistentConsumer(MetricsPersistentWorker persistent) {
-            this.persistent = persistent;
-        }
-
         @Override
         public void init() {
 
@@ -249,7 +237,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
 
         @Override
         public void consume(List<Metrics> data) {
-            data.forEach(persistent::onWork);
+            MetricsPersistentWorker.this.onWork(data);
         }
 
         @Override
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java
index 536a6d9..415371b 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java
@@ -48,14 +48,7 @@ public abstract class PersistenceWorker<INPUT extends StorageData> extends Abstr
     /**
      * Accept the input, and push the data into the cache.
      */
-    void onWork(INPUT input) {
-        cacheData(input);
-    }
-
-    /**
-     * Cache data based on different strategies. See the implementations for more details.
-     */
-    public void cacheData(INPUT input) {
+    void onWork(List<INPUT> input) {
         cache.write(input);
     }
 
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
index 3c150c7..9093a2f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
 
 import java.util.Collection;
 import java.util.List;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
 import org.apache.skywalking.oap.server.core.analysis.data.LimitedSizeBufferedData;
@@ -29,16 +30,12 @@ import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Top N worker is a persistence worker. Cache and order the data, flush in longer period.
  */
+@Slf4j
 public class TopNWorker extends PersistenceWorker<TopN> {
-
-    private static final Logger logger = LoggerFactory.getLogger(TopNWorker.class);
-
     private final IRecordDAO recordDAO;
     private final Model model;
     private final DataCarrier<TopN> dataCarrier;
@@ -80,7 +77,7 @@ public class TopNWorker extends PersistenceWorker<TopN> {
             try {
                 prepareRequests.add(recordDAO.prepareBatchInsert(model, record));
             } catch (Throwable t) {
-                logger.error(t.getMessage(), t);
+                log.error(t.getMessage(), t);
             }
         });
     }
@@ -98,24 +95,18 @@ public class TopNWorker extends PersistenceWorker<TopN> {
     }
 
     private class TopNConsumer implements IConsumer<TopN> {
-
         @Override
         public void init() {
-
         }
 
         @Override
         public void consume(List<TopN> data) {
-            /*
-             * TopN is not following the batch size trigger mode.
-             * No need to implement this method, the memory size is limited always.
-             */
-            data.forEach(TopNWorker.this::onWork);
+            TopNWorker.this.onWork(data);
         }
 
         @Override
         public void onError(List<TopN> data, Throwable t) {
-            logger.error(t.getMessage(), t);
+            log.error(t.getMessage(), t);
         }
 
         @Override
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/data/StreamData.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/data/StreamData.java
index 2c5b0b0..3c1c902 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/data/StreamData.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/data/StreamData.java
@@ -22,19 +22,5 @@ import org.apache.skywalking.oap.server.core.remote.Deserializable;
 import org.apache.skywalking.oap.server.core.remote.Serializable;
 
 public abstract class StreamData implements Serializable, Deserializable {
-    private boolean endOfBatch = false;
-
-    public void resetEndOfBatch() {
-        this.endOfBatch = false;
-    }
-
-    public void asEndOfBatch() {
-        this.endOfBatch = true;
-    }
-
-    public boolean isEndOfBatch() {
-        return this.endOfBatch;
-    }
-
     public abstract int remoteHashCode();
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
index 7fada1b..f35459c 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
 import org.apache.skywalking.oap.server.core.CoreModuleConfig;
 import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
@@ -35,14 +36,10 @@ import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
 import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
 import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
 import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
+@Slf4j
 public enum PersistenceTimer {
     INSTANCE;
-
-    private static final Logger logger = LoggerFactory.getLogger(PersistenceTimer.class);
-
     private Boolean isStarted = false;
     private final Boolean debug;
     private CounterMetrics errorCounter;
@@ -56,7 +53,7 @@ public enum PersistenceTimer {
     }
 
     public void start(ModuleManager moduleManager, CoreModuleConfig moduleConfig) {
-        logger.info("persistence timer start");
+        log.info("persistence timer start");
         IBatchDAO batchDAO = moduleManager.find(StorageModule.NAME).provider().getService(IBatchDAO.class);
 
         MetricsCreator metricsCreator = moduleManager.find(TelemetryModule.NAME)
@@ -78,7 +75,7 @@ public enum PersistenceTimer {
         if (!isStarted) {
             Executors.newSingleThreadScheduledExecutor()
                      .scheduleWithFixedDelay(
-                         new RunnableWithExceptionProtection(() -> extractDataAndSave(batchDAO), t -> logger
+                         new RunnableWithExceptionProtection(() -> extractDataAndSave(batchDAO), t -> log
                              .error("Extract data and save failure.", t)), 5, moduleConfig.getPersistentPeriod(),
                          TimeUnit.SECONDS
                      );
@@ -88,8 +85,8 @@ public enum PersistenceTimer {
     }
 
     private void extractDataAndSave(IBatchDAO batchDAO) {
-        if (logger.isDebugEnabled()) {
-            logger.debug("Extract data and save");
+        if (log.isDebugEnabled()) {
+            log.debug("Extract data and save");
         }
 
         long startTime = System.currentTimeMillis();
@@ -103,8 +100,8 @@ public enum PersistenceTimer {
                 persistenceWorkers.addAll(MetricsStreamProcessor.getInstance().getPersistentWorkers());
 
                 persistenceWorkers.forEach(worker -> {
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("extract {} worker data and save", worker.getClass().getName());
+                    if (log.isDebugEnabled()) {
+                        log.debug("extract {} worker data and save", worker.getClass().getName());
                     }
 
                     worker.buildBatchRequests(prepareRequests);
@@ -113,7 +110,7 @@ public enum PersistenceTimer {
                 });
 
                 if (debug) {
-                    logger.info("build batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
+                    log.info("build batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
                 }
             } finally {
                 timer.finish();
@@ -129,10 +126,10 @@ public enum PersistenceTimer {
             }
         } catch (Throwable e) {
             errorCounter.inc();
-            logger.error(e.getMessage(), e);
+            log.error(e.getMessage(), e);
         } finally {
-            if (logger.isDebugEnabled()) {
-                logger.debug("Persistence data save finish");
+            if (log.isDebugEnabled()) {
+                log.debug("Persistence data save finish");
             }
 
             prepareRequests.clear();
@@ -140,7 +137,7 @@ public enum PersistenceTimer {
         }
 
         if (debug) {
-            logger.info("Batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
+            log.info("Batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
         }
     }
 }