Posted to notifications@skywalking.apache.org by wu...@apache.org on 2020/04/29 15:33:25 UTC

[skywalking] branch aggregate-optimization created (now 771871c)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a change to branch aggregate-optimization
in repository https://gitbox.apache.org/repos/asf/skywalking.git.


      at 771871c  Optimize aggregation in L1 and L2.

This branch includes the following new commits:

     new 771871c  Optimize aggregation in L1 and L2.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[skywalking] 01/01: Optimize aggregation in L1 and L2.

Posted by wu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch aggregate-optimization
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 771871c2e8baef6afd526e725bae82b3c730dbf9
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Wed Apr 29 23:32:23 2020 +0800

    Optimize aggregation in L1 and L2.
---
 .../core/analysis/data/ReadWriteSafeCache.java     | 19 +++++++
 .../analysis/worker/MetricsAggregateWorker.java    | 64 +++++++++-------------
 .../analysis/worker/MetricsPersistentWorker.java   | 16 +-----
 .../core/analysis/worker/PersistenceWorker.java    |  9 +--
 .../server/core/analysis/worker/TopNWorker.java    | 19 ++-----
 .../oap/server/core/remote/data/StreamData.java    | 14 -----
 .../oap/server/core/storage/PersistenceTimer.java  | 29 +++++-----
 7 files changed, 66 insertions(+), 104 deletions(-)

diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/ReadWriteSafeCache.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/ReadWriteSafeCache.java
index 33a4e01..e73bbfb 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/ReadWriteSafeCache.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/ReadWriteSafeCache.java
@@ -50,6 +50,11 @@ public class ReadWriteSafeCache<T> {
         lock = new ReentrantLock();
     }
 
+    /**
+     * Write the data into the {@link #writeBufferPointer} buffer.
+     *
+     * @param data to enqueue.
+     */
     public void write(T data) {
         lock.lock();
         try {
@@ -59,6 +64,20 @@ public class ReadWriteSafeCache<T> {
         }
     }
 
+    /**
+     * Write the collection of data into the {@link #writeBufferPointer} buffer.
+     *
+     * @param data to enqueue.
+     */
+    public void write(List<T> data) {
+        lock.lock();
+        try {
+            data.forEach(writeBufferPointer::accept);
+        } finally {
+            lock.unlock();
+        }
+    }
+
     public List<T> read() {
         lock.lock();
         try {
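
A note on the new API above: the write(List<T>) overload lets a caller push a whole dequeued batch under a single lock acquisition instead of locking once per element. A minimal usage sketch, assuming read() swaps the two buffers and returns the drained content as the two-buffer constructor suggests; the surrounding class and method names are illustrative, not code from this commit:

    import java.util.List;
    import org.apache.skywalking.oap.server.core.analysis.data.MergableBufferedData;
    import org.apache.skywalking.oap.server.core.analysis.data.ReadWriteSafeCache;
    import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;

    class BulkWriteSketch {
        // Two buffers back the cache; reads and writes alternate between them.
        private final ReadWriteSafeCache<Metrics> cache =
            new ReadWriteSafeCache<>(new MergableBufferedData(), new MergableBufferedData());

        void onBatch(List<Metrics> batch) {
            // One lock/unlock for the whole list, instead of one per element.
            cache.write(batch);
        }

        List<Metrics> drain() {
            // Assumed behavior: swap buffers and return the merged content of the read side.
            return cache.read();
        }
    }
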
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
index 3d5105f..04f6601 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
@@ -18,7 +18,6 @@
 
 package org.apache.skywalking.oap.server.core.analysis.worker;
 
-import java.util.Iterator;
 import java.util.List;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
@@ -27,7 +26,6 @@ import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactor
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
 import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.analysis.data.MergableBufferedData;
-import org.apache.skywalking.oap.server.core.analysis.data.ReadWriteSafeCache;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
@@ -46,14 +44,14 @@ import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
 public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
     private AbstractWorker<Metrics> nextWorker;
     private final DataCarrier<Metrics> dataCarrier;
-    private final ReadWriteSafeCache<Metrics> mergeDataCache;
+    private final MergableBufferedData<Metrics> mergeDataCache;
     private CounterMetrics aggregationCounter;
 
     MetricsAggregateWorker(ModuleDefineHolder moduleDefineHolder, AbstractWorker<Metrics> nextWorker,
                            String modelName) {
         super(moduleDefineHolder);
         this.nextWorker = nextWorker;
-        this.mergeDataCache = new ReadWriteSafeCache<>(new MergableBufferedData(), new MergableBufferedData());
+        this.mergeDataCache = new MergableBufferedData();
         String name = "METRICS_L1_AGGREGATION";
         this.dataCarrier = new DataCarrier<>("MetricsAggregateWorker." + modelName, name, 2, 10000);
 
@@ -64,7 +62,7 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
         } catch (Exception e) {
             throw new UnexpectedException(e.getMessage(), e);
         }
-        this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new AggregatorConsumer(this));
+        this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new AggregatorConsumer());
 
         MetricsCreator metricsCreator = moduleDefineHolder.find(TelemetryModule.NAME)
                                                           .provider()
@@ -75,54 +73,44 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
         );
     }
 
+    /**
+     * MetricsAggregateWorker#in operation only enqueues the metrics into the queue.
+     */
     @Override
     public final void in(Metrics metrics) {
-        metrics.resetEndOfBatch();
         dataCarrier.produce(metrics);
     }
 
-    private void onWork(Metrics metrics) {
-        aggregationCounter.inc();
-        mergeDataCache.write(metrics);
-
-        if (metrics.isEndOfBatch()) {
-            mergeDataCache.read().forEach(
-                data -> {
-                    if (log.isDebugEnabled()) {
-                        log.debug(data.toString());
-                    }
-                    nextWorker.in(data);
+    /**
+     * Dequeue consuming. According to {@link IConsumer#consume(List)}, this is a serial operation for every worker
+     * instance.
+     *
+     * @param metricsList from the queue.
+     */
+    private void onWork(List<Metrics> metricsList) {
+        metricsList.forEach(metrics -> {
+            aggregationCounter.inc();
+            mergeDataCache.accept(metrics);
+        });
+
+        mergeDataCache.read().forEach(
+            data -> {
+                if (log.isDebugEnabled()) {
+                    log.debug(data.toString());
                 }
-            );
-        }
+                nextWorker.in(data);
+            }
+        );
     }
 
     private class AggregatorConsumer implements IConsumer<Metrics> {
-
-        private final MetricsAggregateWorker aggregator;
-
-        private AggregatorConsumer(MetricsAggregateWorker aggregator) {
-            this.aggregator = aggregator;
-        }
-
         @Override
         public void init() {
-
         }
 
         @Override
         public void consume(List<Metrics> data) {
-            Iterator<Metrics> inputIterator = data.iterator();
-
-            int i = 0;
-            while (inputIterator.hasNext()) {
-                Metrics metrics = inputIterator.next();
-                i++;
-                if (i == data.size()) {
-                    metrics.asEndOfBatch();
-                }
-                aggregator.onWork(metrics);
-            }
+            MetricsAggregateWorker.this.onWork(data);
         }
 
         @Override
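
The consumer now hands the entire dequeued list to onWork, which merges every element into the local MergableBufferedData and then flushes the merged result to the next worker, replacing the per-element endOfBatch bookkeeping removed above. The merge relies on the buffer keeping one entry per metrics id; the class below only illustrates that assumed semantic (the id() and combine() calls are assumed merge hooks, not the actual MergableBufferedData source):

    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;

    // Illustrative merge buffer: duplicates within one consumed batch collapse into a
    // single row, so the next worker only receives already-aggregated metrics.
    class MergeBufferSketch {
        private final Map<String, Metrics> buffer = new HashMap<>();

        void accept(Metrics metrics) {
            Metrics existing = buffer.get(metrics.id());
            if (existing == null) {
                buffer.put(metrics.id(), metrics);
            } else {
                existing.combine(metrics);   // assumed merge hook on Metrics
            }
        }

        Collection<Metrics> read() {
            return buffer.values();
        }
    }
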
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
index cab7dd3..a3cf58d 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
@@ -83,7 +83,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
         }
 
         this.dataCarrier = new DataCarrier<>("MetricsPersistentWorker." + model.getName(), name, 1, 2000);
-        this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new PersistentConsumer(this));
+        this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new PersistentConsumer());
     }
 
     /**
@@ -97,11 +97,6 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
         );
     }
 
-    @Override
-    void onWork(Metrics metrics) {
-        cacheData(metrics);
-    }
-
     /**
      * Accept all metrics data and push them into the queue for serial processing
      */
@@ -235,13 +230,6 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
      * ID is declared through {@link Object#hashCode()} and {@link Object#equals(Object)} as usual.
      */
     private class PersistentConsumer implements IConsumer<Metrics> {
-
-        private final MetricsPersistentWorker persistent;
-
-        private PersistentConsumer(MetricsPersistentWorker persistent) {
-            this.persistent = persistent;
-        }
-
         @Override
         public void init() {
 
@@ -249,7 +237,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
 
         @Override
         public void consume(List<Metrics> data) {
-            data.forEach(persistent::onWork);
+            MetricsPersistentWorker.this.onWork(data);
         }
 
         @Override
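
Both consumers drop the hand-written field that pointed back at the owning worker: a non-static inner class already holds that reference implicitly, so Outer.this is enough. A generic Java sketch of the pattern (not SkyWalking code):

    import java.util.List;

    class Worker {
        void onWork(List<String> data) {
            // handle the whole consumed batch
        }

        // Non-static inner class: the compiler captures the enclosing Worker instance,
        // so Worker.this replaces the former constructor-injected field.
        class Consumer {
            void consume(List<String> data) {
                Worker.this.onWork(data);
            }
        }
    }
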
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java
index 536a6d9..415371b 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java
@@ -48,14 +48,7 @@ public abstract class PersistenceWorker<INPUT extends StorageData> extends Abstr
     /**
      * Accept the input, and push the data into the cache.
      */
-    void onWork(INPUT input) {
-        cacheData(input);
-    }
-
-    /**
-     * Cache data based on different strategies. See the implementations for more details.
-     */
-    public void cacheData(INPUT input) {
+    void onWork(List<INPUT> input) {
         cache.write(input);
     }
 
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
index 3c150c7..9093a2f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
 
 import java.util.Collection;
 import java.util.List;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
 import org.apache.skywalking.oap.server.core.analysis.data.LimitedSizeBufferedData;
@@ -29,16 +30,12 @@ import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Top N worker is a persistence worker. Cache and order the data, flush in longer period.
  */
+@Slf4j
 public class TopNWorker extends PersistenceWorker<TopN> {
-
-    private static final Logger logger = LoggerFactory.getLogger(TopNWorker.class);
-
     private final IRecordDAO recordDAO;
     private final Model model;
     private final DataCarrier<TopN> dataCarrier;
@@ -80,7 +77,7 @@ public class TopNWorker extends PersistenceWorker<TopN> {
             try {
                 prepareRequests.add(recordDAO.prepareBatchInsert(model, record));
             } catch (Throwable t) {
-                logger.error(t.getMessage(), t);
+                log.error(t.getMessage(), t);
             }
         });
     }
@@ -98,24 +95,18 @@ public class TopNWorker extends PersistenceWorker<TopN> {
     }
 
     private class TopNConsumer implements IConsumer<TopN> {
-
         @Override
         public void init() {
-
         }
 
         @Override
         public void consume(List<TopN> data) {
-            /*
-             * TopN is not following the batch size trigger mode.
-             * No need to implement this method, the memory size is limited always.
-             */
-            data.forEach(TopNWorker.this::onWork);
+            TopNWorker.this.onWork(data);
         }
 
         @Override
         public void onError(List<TopN> data, Throwable t) {
-            logger.error(t.getMessage(), t);
+            log.error(t.getMessage(), t);
         }
 
         @Override
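
TopNWorker (and PersistenceTimer below) switch from hand-declared SLF4J loggers to Lombok's @Slf4j, which generates an equivalent static "log" field at compile time. A small equivalence sketch, plain Lombok behavior rather than code from this commit:

    import lombok.extern.slf4j.Slf4j;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    @Slf4j
    class WithLombok {
        void run() {
            log.info("persistence timer start");   // uses the generated "log" field
        }
    }

    class WithoutLombok {
        // What @Slf4j generates, written out by hand.
        private static final Logger log = LoggerFactory.getLogger(WithoutLombok.class);

        void run() {
            log.info("persistence timer start");
        }
    }
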
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/data/StreamData.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/data/StreamData.java
index 2c5b0b0..3c1c902 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/data/StreamData.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/data/StreamData.java
@@ -22,19 +22,5 @@ import org.apache.skywalking.oap.server.core.remote.Deserializable;
 import org.apache.skywalking.oap.server.core.remote.Serializable;
 
 public abstract class StreamData implements Serializable, Deserializable {
-    private boolean endOfBatch = false;
-
-    public void resetEndOfBatch() {
-        this.endOfBatch = false;
-    }
-
-    public void asEndOfBatch() {
-        this.endOfBatch = true;
-    }
-
-    public boolean isEndOfBatch() {
-        return this.endOfBatch;
-    }
-
     public abstract int remoteHashCode();
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
index 7fada1b..f35459c 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
 import org.apache.skywalking.oap.server.core.CoreModuleConfig;
 import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
@@ -35,14 +36,10 @@ import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
 import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
 import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
 import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
+@Slf4j
 public enum PersistenceTimer {
     INSTANCE;
-
-    private static final Logger logger = LoggerFactory.getLogger(PersistenceTimer.class);
-
     private Boolean isStarted = false;
     private final Boolean debug;
     private CounterMetrics errorCounter;
@@ -56,7 +53,7 @@ public enum PersistenceTimer {
     }
 
     public void start(ModuleManager moduleManager, CoreModuleConfig moduleConfig) {
-        logger.info("persistence timer start");
+        log.info("persistence timer start");
         IBatchDAO batchDAO = moduleManager.find(StorageModule.NAME).provider().getService(IBatchDAO.class);
 
         MetricsCreator metricsCreator = moduleManager.find(TelemetryModule.NAME)
@@ -78,7 +75,7 @@ public enum PersistenceTimer {
         if (!isStarted) {
             Executors.newSingleThreadScheduledExecutor()
                      .scheduleWithFixedDelay(
-                         new RunnableWithExceptionProtection(() -> extractDataAndSave(batchDAO), t -> logger
+                         new RunnableWithExceptionProtection(() -> extractDataAndSave(batchDAO), t -> log
                              .error("Extract data and save failure.", t)), 5, moduleConfig.getPersistentPeriod(),
                          TimeUnit.SECONDS
                      );
@@ -88,8 +85,8 @@ public enum PersistenceTimer {
     }
 
     private void extractDataAndSave(IBatchDAO batchDAO) {
-        if (logger.isDebugEnabled()) {
-            logger.debug("Extract data and save");
+        if (log.isDebugEnabled()) {
+            log.debug("Extract data and save");
         }
 
         long startTime = System.currentTimeMillis();
@@ -103,8 +100,8 @@ public enum PersistenceTimer {
                 persistenceWorkers.addAll(MetricsStreamProcessor.getInstance().getPersistentWorkers());
 
                 persistenceWorkers.forEach(worker -> {
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("extract {} worker data and save", worker.getClass().getName());
+                    if (log.isDebugEnabled()) {
+                        log.debug("extract {} worker data and save", worker.getClass().getName());
                     }
 
                     worker.buildBatchRequests(prepareRequests);
@@ -113,7 +110,7 @@ public enum PersistenceTimer {
                 });
 
                 if (debug) {
-                    logger.info("build batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
+                    log.info("build batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
                 }
             } finally {
                 timer.finish();
@@ -129,10 +126,10 @@ public enum PersistenceTimer {
             }
         } catch (Throwable e) {
             errorCounter.inc();
-            logger.error(e.getMessage(), e);
+            log.error(e.getMessage(), e);
         } finally {
-            if (logger.isDebugEnabled()) {
-                logger.debug("Persistence data save finish");
+            if (log.isDebugEnabled()) {
+                log.debug("Persistence data save finish");
             }
 
             prepareRequests.clear();
@@ -140,7 +137,7 @@ public enum PersistenceTimer {
         }
 
         if (debug) {
-            logger.info("Batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
+            log.info("Batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
         }
     }
 }