You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/06/14 03:31:55 UTC

[skywalking] branch master updated: Fix no stream register. (#2873)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 0103d8a  Fix no stream register. (#2873)
0103d8a is described below

commit 0103d8ad2892f888cb55a01a7efd39a44a22726f
Author: 吴晟 Wu Sheng <wu...@foxmail.com>
AuthorDate: Fri Jun 14 11:31:51 2019 +0800

    Fix no stream register. (#2873)
    
    * Fix no stream register.
    
    * Make sure list order.
    
    * Refactor codes.
    
    * Fix wrong revert.
---
 .../oap/server/core/CoreModuleProvider.java        |  1 +
 .../analysis/worker/MetricsStreamProcessor.java    |  4 ++++
 .../register/worker/InventoryStreamProcessor.java  |  4 ++++
 .../core/remote/define/StreamDataMapping.java      | 27 ++++++++++++++++++----
 4 files changed, 31 insertions(+), 5 deletions(-)

diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index 9430020..01ddd9f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -160,6 +160,7 @@ public class CoreModuleProvider extends ModuleProvider {
 
             annotationScan.scan(() -> {
             });
+            streamDataMapping.init();
         } catch (IOException | IllegalAccessException | InstantiationException e) {
             throw new ModuleStartException(e.getMessage(), e);
         }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
index e250506..20d09bf 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
@@ -24,6 +24,7 @@ import org.apache.skywalking.oap.server.core.*;
 import org.apache.skywalking.oap.server.core.analysis.*;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
+import org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingSetter;
 import org.apache.skywalking.oap.server.core.storage.*;
 import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
 import org.apache.skywalking.oap.server.core.storage.model.*;
@@ -66,6 +67,9 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
         IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
         DownsamplingConfigService configService = moduleDefineHolder.find(CoreModule.NAME).provider().getService(DownsamplingConfigService.class);
 
+        StreamDataMappingSetter streamDataMappingSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataMappingSetter.class);
+        streamDataMappingSetter.putIfAbsent(metricsClass);
+
         MetricsPersistentWorker hourPersistentWorker = null;
         MetricsPersistentWorker dayPersistentWorker = null;
         MetricsPersistentWorker monthPersistentWorker = null;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java
index 0e4b3dd..596ab81 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java
@@ -22,6 +22,7 @@ import java.util.*;
 import org.apache.skywalking.oap.server.core.*;
 import org.apache.skywalking.oap.server.core.analysis.*;
 import org.apache.skywalking.oap.server.core.register.RegisterSource;
+import org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingSetter;
 import org.apache.skywalking.oap.server.core.storage.*;
 import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
 import org.apache.skywalking.oap.server.core.storage.model.*;
@@ -57,6 +58,9 @@ public class InventoryStreamProcessor implements StreamProcessor<RegisterSource>
         IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
         Model model = modelSetter.putIfAbsent(inventoryClass, stream.scopeId(), new Storage(stream.name(), false, false, Downsampling.None));
 
+        StreamDataMappingSetter streamDataMappingSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataMappingSetter.class);
+        streamDataMappingSetter.putIfAbsent(inventoryClass);
+
         RegisterPersistentWorker persistentWorker = new RegisterPersistentWorker(moduleDefineHolder, model.getName(), registerDAO, stream.scopeId());
 
         RegisterRemoteWorker remoteWorker = new RegisterRemoteWorker(moduleDefineHolder, persistentWorker);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMapping.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMapping.java
index 31eb34a..00ef25b 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMapping.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMapping.java
@@ -25,12 +25,12 @@ import org.apache.skywalking.oap.server.core.remote.data.StreamData;
  * @author peng-yongsheng
  */
 public class StreamDataMapping implements StreamDataMappingGetter, StreamDataMappingSetter {
-
-    private int id = 0;
+    private List<Class<? extends StreamData>> streamClassList;
     private final Map<Class<? extends StreamData>, Integer> classMap;
     private final Map<Integer, Class<? extends StreamData>> idMap;
 
     public StreamDataMapping() {
+        streamClassList = new ArrayList<>();
         this.classMap = new HashMap<>();
         this.idMap = new HashMap<>();
     }
@@ -40,9 +40,26 @@ public class StreamDataMapping implements StreamDataMappingGetter, StreamDataMap
             return;
         }
 
-        id++;
-        classMap.put(streamDataClass, id);
-        idMap.put(id, streamDataClass);
+        streamClassList.add(streamDataClass);
+    }
+
+    public void init() {
+        /**
+         * The stream protocol use this list order to assign the ID,
+         * which is used in across node communication. This order must be certain.
+         */
+        Collections.sort(streamClassList, new Comparator<Class>() {
+            @Override public int compare(Class streamClass1, Class streamClass2) {
+                return streamClass1.getName().compareTo(streamClass2.getName());
+            }
+        });
+
+        for (int i = 0; i < streamClassList.size(); i++) {
+            Class<? extends StreamData> streamClass = streamClassList.get(i);
+            int streamId = i + 1;
+            classMap.put(streamClass, streamId);
+            idMap.put(streamId, streamClass);
+        }
     }
 
     @Override public int findIdByClass(Class<? extends StreamData> streamDataClass) {