You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/06/08 14:23:15 UTC

[skywalking] branch exporter created (now 4c66773)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a change to branch exporter
in repository https://gitbox.apache.org/repos/asf/skywalking.git.


      at 4c66773  Make exporter interface better.

This branch includes the following new commits:

     new 4c66773  Make exporter interface better.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[skywalking] 01/01: Make exporter interface better.

Posted by wu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch exporter
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 4c6677369255a33795fc612cac6d120bfc902e64
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Sat Jun 8 22:23:03 2019 +0800

    Make exporter interface better.
---
 .../exporter/provider/grpc/GRPCExporter.java       | 14 +++++++---
 .../exporter/provider/grpc/GRPCExporterTest.java   | 12 ++++++--
 .../server/core/analysis/worker/ExportWorker.java  | 14 ++++------
 .../analysis/worker/MetricsPersistentWorker.java   | 13 +++++++--
 ...icValuesExportService.java => ExportEvent.java} | 32 ++++++++++++++++++----
 .../core/exporter/MetricValuesExportService.java   |  8 ++++--
 6 files changed, 69 insertions(+), 24 deletions(-)

diff --git a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java
index 4fdd05c..895dfd4 100644
--- a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java
+++ b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java
@@ -26,7 +26,7 @@ import lombok.*;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
 import org.apache.skywalking.oap.server.core.analysis.metrics.*;
-import org.apache.skywalking.oap.server.core.exporter.MetricValuesExportService;
+import org.apache.skywalking.oap.server.core.exporter.*;
 import org.apache.skywalking.oap.server.exporter.grpc.*;
 import org.apache.skywalking.oap.server.exporter.provider.MetricFormatter;
 import org.apache.skywalking.oap.server.library.client.grpc.GRPCClient;
@@ -56,9 +56,15 @@ public class GRPCExporter extends MetricFormatter implements MetricValuesExportS
         subscriptionSet = new HashSet<>();
     }
 
-    @Override public void export(MetricsMetaInfo meta, Metrics metrics) {
-        if (subscriptionSet.size() == 0 || subscriptionSet.contains(meta.getMetricsName())) {
-            exportBuffer.produce(new ExportData(meta, metrics));
+    @Override public void export(ExportEvent event) {
+        if (ExportEvent.EventType.TOTAL.equals(event.getType())) {
+            Metrics metrics = event.getMetrics();
+            if (metrics instanceof WithMetadata) {
+                MetricsMetaInfo meta = ((WithMetadata)metrics).getMeta();
+                if (subscriptionSet.size() == 0 || subscriptionSet.contains(meta.getMetricsName())) {
+                    exportBuffer.produce(new ExportData(meta, metrics));
+                }
+            }
         }
     }
 
diff --git a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java
index 0b347d4..5c0d50d 100644
--- a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java
+++ b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java
@@ -19,7 +19,8 @@
 package org.apache.skywalking.oap.server.exporter.provider.grpc;
 
 import io.grpc.testing.GrpcServerRule;
-import org.apache.skywalking.oap.server.core.analysis.metrics.MetricsMetaInfo;
+import org.apache.skywalking.oap.server.core.analysis.metrics.*;
+import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
 import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
 import org.apache.skywalking.oap.server.exporter.grpc.MetricExportServiceGrpc;
 import org.junit.Before;
@@ -58,7 +59,14 @@ public class GRPCExporterTest {
 
     @Test
     public void export() {
-        exporter.export(metaInfo, new MockMetrics());
+        ExportEvent event = new ExportEvent(new MockExporterMetrics(), ExportEvent.EventType.TOTAL);
+        exporter.export(event);
+    }
+
+    public static class MockExporterMetrics extends MockMetrics implements WithMetadata {
+        @Override public MetricsMetaInfo getMeta() {
+            return new MetricsMetaInfo("mock-metrics", DefaultScopeDefine.ALL);
+        }
     }
 
     @Test
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportWorker.java
index 6f1edc0..09fa143 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportWorker.java
@@ -18,7 +18,6 @@
 
 package org.apache.skywalking.oap.server.core.analysis.worker;
 
-import org.apache.skywalking.oap.server.core.analysis.metrics.*;
 import org.apache.skywalking.oap.server.core.exporter.*;
 import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
@@ -26,21 +25,20 @@ import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
 /**
  * @author wusheng
  */
-public class ExportWorker extends AbstractWorker<Metrics> {
+public class ExportWorker extends AbstractWorker<ExportEvent> {
     private MetricValuesExportService exportService;
 
     public ExportWorker(ModuleDefineHolder moduleDefineHolder) {
         super(moduleDefineHolder);
     }
 
-    @Override public void in(Metrics metrics) {
+    @Override public void in(ExportEvent event) {
         if (exportService != null || getModuleDefineHolder().has(ExporterModule.NAME)) {
-            if (metrics instanceof WithMetadata) {
-                if (exportService == null) {
-                    exportService = getModuleDefineHolder().find(ExporterModule.NAME).provider().getService(MetricValuesExportService.class);
-                }
-                exportService.export(((WithMetadata)metrics).getMeta(), metrics);
+            if (exportService == null) {
+                exportService = getModuleDefineHolder().find(ExporterModule.NAME).provider().getService(MetricValuesExportService.class);
             }
+            exportService.export(event);
         }
     }
+
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
index 513f3ae..b4b4fd1 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
@@ -24,6 +24,7 @@ import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
 import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.analysis.data.*;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
 import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
@@ -43,12 +44,12 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
     private final MergeDataCache<Metrics> mergeDataCache;
     private final IMetricsDAO metricsDAO;
     private final AbstractWorker<Metrics> nextAlarmWorker;
-    private final AbstractWorker<Metrics> nextExportWorker;
+    private final AbstractWorker<ExportEvent> nextExportWorker;
     private final DataCarrier<Metrics> dataCarrier;
 
     MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, int batchSize,
         IMetricsDAO metricsDAO, AbstractWorker<Metrics> nextAlarmWorker,
-        AbstractWorker<Metrics> nextExportWorker) {
+        AbstractWorker<ExportEvent> nextExportWorker) {
         super(moduleDefineHolder, batchSize);
         this.model = model;
         this.mergeDataCache = new MergeDataCache<>();
@@ -100,6 +101,11 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
     @Override public List<Object> prepareBatch(MergeDataCache<Metrics> cache) {
         List<Object> batchCollection = new LinkedList<>();
         cache.getLast().collection().forEach(data -> {
+            if (Objects.nonNull(nextExportWorker)) {
+                ExportEvent event = new ExportEvent(data, ExportEvent.EventType.INCREMENT);
+                nextExportWorker.in(event);
+            }
+
             Metrics dbData = null;
             try {
                 dbData = metricsDAO.get(model, data);
@@ -120,7 +126,8 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
                     nextAlarmWorker.in(data);
                 }
                 if (Objects.nonNull(nextExportWorker)) {
-                    nextExportWorker.in(data);
+                    ExportEvent event = new ExportEvent(data, ExportEvent.EventType.TOTAL);
+                    nextExportWorker.in(event);
                 }
             } catch (Throwable t) {
                 logger.error(t.getMessage(), t);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/MetricValuesExportService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExportEvent.java
similarity index 53%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/MetricValuesExportService.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExportEvent.java
index 8e66f88..cce4e0c 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/MetricValuesExportService.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExportEvent.java
@@ -18,14 +18,36 @@
 
 package org.apache.skywalking.oap.server.core.exporter;
 
-import org.apache.skywalking.oap.server.core.analysis.metrics.*;
-import org.apache.skywalking.oap.server.library.module.Service;
+import lombok.Getter;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 
 /**
- * Export the metrics value from metrics through this service, if provider exists.
+ * The event processed by {@link MetricValuesExportService} implementations.
+ * {@link #metrics} should not be changed in any case.
  *
  * @author wusheng
  */
-public interface MetricValuesExportService extends Service {
-    void export(MetricsMetaInfo meta, Metrics metrics);
+@Getter
+public class ExportEvent {
+    /**
+     * The fields of this metrics instance should not be changed in any case.
+     */
+    private Metrics metrics;
+    private EventType type;
+
+    public ExportEvent(Metrics metrics, EventType type) {
+        this.metrics = metrics;
+        this.type = type;
+    }
+
+    public enum EventType {
+        /**
+         * The metrics aggregated in this bulk, not including the existing persistent data.
+         */
+        INCREMENT,
+        /**
+         * Final result of the metrics at this moment.
+         */
+        TOTAL
+    }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/MetricValuesExportService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/MetricValuesExportService.java
index 8e66f88..18779f3 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/MetricValuesExportService.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/MetricValuesExportService.java
@@ -18,7 +18,6 @@
 
 package org.apache.skywalking.oap.server.core.exporter;
 
-import org.apache.skywalking.oap.server.core.analysis.metrics.*;
 import org.apache.skywalking.oap.server.library.module.Service;
 
 /**
@@ -27,5 +26,10 @@ import org.apache.skywalking.oap.server.library.module.Service;
  * @author wusheng
  */
 public interface MetricValuesExportService extends Service {
-    void export(MetricsMetaInfo meta, Metrics metrics);
+    /**
+     * This method exports in sync mode, so its performance affects the persistence result. Queue mode is highly recommended.
+     *
+     * @param event its value is only accurate while this method is being invoked. Don't cache it.
+     */
+    void export(ExportEvent event);
 }