You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/06/14 01:26:00 UTC

[skywalking] 01/01: Fix no stream register.

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch oap-cluster-fix
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 06d831e3b3e1fa48c86c7e185c2d6e885ec5baf8
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Fri Jun 14 09:25:51 2019 +0800

    Fix no stream register.
---
 .../oap/server/core/analysis/worker/MetricsStreamProcessor.java       | 4 ++++
 .../oap/server/core/register/worker/InventoryStreamProcessor.java     | 4 ++++
 2 files changed, 8 insertions(+)

diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
index e250506..20d09bf 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
@@ -24,6 +24,7 @@ import org.apache.skywalking.oap.server.core.*;
 import org.apache.skywalking.oap.server.core.analysis.*;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
+import org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingSetter;
 import org.apache.skywalking.oap.server.core.storage.*;
 import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
 import org.apache.skywalking.oap.server.core.storage.model.*;
@@ -66,6 +67,9 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
         IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
         DownsamplingConfigService configService = moduleDefineHolder.find(CoreModule.NAME).provider().getService(DownsamplingConfigService.class);
 
+        StreamDataMappingSetter streamDataMappingSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataMappingSetter.class);
+        streamDataMappingSetter.putIfAbsent(metricsClass);
+
         MetricsPersistentWorker hourPersistentWorker = null;
         MetricsPersistentWorker dayPersistentWorker = null;
         MetricsPersistentWorker monthPersistentWorker = null;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java
index 0e4b3dd..596ab81 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java
@@ -22,6 +22,7 @@ import java.util.*;
 import org.apache.skywalking.oap.server.core.*;
 import org.apache.skywalking.oap.server.core.analysis.*;
 import org.apache.skywalking.oap.server.core.register.RegisterSource;
+import org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingSetter;
 import org.apache.skywalking.oap.server.core.storage.*;
 import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
 import org.apache.skywalking.oap.server.core.storage.model.*;
@@ -57,6 +58,9 @@ public class InventoryStreamProcessor implements StreamProcessor<RegisterSource>
         IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
         Model model = modelSetter.putIfAbsent(inventoryClass, stream.scopeId(), new Storage(stream.name(), false, false, Downsampling.None));
 
+        StreamDataMappingSetter streamDataMappingSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataMappingSetter.class);
+        streamDataMappingSetter.putIfAbsent(inventoryClass);
+
         RegisterPersistentWorker persistentWorker = new RegisterPersistentWorker(moduleDefineHolder, model.getName(), registerDAO, stream.scopeId());
 
         RegisterRemoteWorker remoteWorker = new RegisterRemoteWorker(moduleDefineHolder, persistentWorker);