You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/06/14 03:31:55 UTC
[skywalking] branch master updated: Fix no stream register. (#2873)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 0103d8a Fix no stream register. (#2873)
0103d8a is described below
commit 0103d8ad2892f888cb55a01a7efd39a44a22726f
Author: 吴晟 Wu Sheng <wu...@foxmail.com>
AuthorDate: Fri Jun 14 11:31:51 2019 +0800
Fix no stream register. (#2873)
* Fix no stream register.
* Make sure list order.
* Refactor codes.
* Fix wrong revert.
---
.../oap/server/core/CoreModuleProvider.java | 1 +
.../analysis/worker/MetricsStreamProcessor.java | 4 ++++
.../register/worker/InventoryStreamProcessor.java | 4 ++++
.../core/remote/define/StreamDataMapping.java | 27 ++++++++++++++++++----
4 files changed, 31 insertions(+), 5 deletions(-)
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index 9430020..01ddd9f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -160,6 +160,7 @@ public class CoreModuleProvider extends ModuleProvider {
annotationScan.scan(() -> {
});
+ streamDataMapping.init();
} catch (IOException | IllegalAccessException | InstantiationException e) {
throw new ModuleStartException(e.getMessage(), e);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
index e250506..20d09bf 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
@@ -24,6 +24,7 @@ import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.analysis.*;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
+import org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingSetter;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
import org.apache.skywalking.oap.server.core.storage.model.*;
@@ -66,6 +67,9 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
DownsamplingConfigService configService = moduleDefineHolder.find(CoreModule.NAME).provider().getService(DownsamplingConfigService.class);
+ StreamDataMappingSetter streamDataMappingSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataMappingSetter.class);
+ streamDataMappingSetter.putIfAbsent(metricsClass);
+
MetricsPersistentWorker hourPersistentWorker = null;
MetricsPersistentWorker dayPersistentWorker = null;
MetricsPersistentWorker monthPersistentWorker = null;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java
index 0e4b3dd..596ab81 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java
@@ -22,6 +22,7 @@ import java.util.*;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.analysis.*;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
+import org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingSetter;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
import org.apache.skywalking.oap.server.core.storage.model.*;
@@ -57,6 +58,9 @@ public class InventoryStreamProcessor implements StreamProcessor<RegisterSource>
IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
Model model = modelSetter.putIfAbsent(inventoryClass, stream.scopeId(), new Storage(stream.name(), false, false, Downsampling.None));
+ StreamDataMappingSetter streamDataMappingSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataMappingSetter.class);
+ streamDataMappingSetter.putIfAbsent(inventoryClass);
+
RegisterPersistentWorker persistentWorker = new RegisterPersistentWorker(moduleDefineHolder, model.getName(), registerDAO, stream.scopeId());
RegisterRemoteWorker remoteWorker = new RegisterRemoteWorker(moduleDefineHolder, persistentWorker);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMapping.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMapping.java
index 31eb34a..00ef25b 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMapping.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMapping.java
@@ -25,12 +25,12 @@ import org.apache.skywalking.oap.server.core.remote.data.StreamData;
* @author peng-yongsheng
*/
public class StreamDataMapping implements StreamDataMappingGetter, StreamDataMappingSetter {
-
- private int id = 0;
+ private List<Class<? extends StreamData>> streamClassList;
private final Map<Class<? extends StreamData>, Integer> classMap;
private final Map<Integer, Class<? extends StreamData>> idMap;
public StreamDataMapping() {
+ streamClassList = new ArrayList<>();
this.classMap = new HashMap<>();
this.idMap = new HashMap<>();
}
@@ -40,9 +40,26 @@ public class StreamDataMapping implements StreamDataMappingGetter, StreamDataMap
return;
}
- id++;
- classMap.put(streamDataClass, id);
- idMap.put(id, streamDataClass);
+ streamClassList.add(streamDataClass);
+ }
+
+ public void init() {
+ /**
+ * The stream protocol uses this list order to assign the IDs
+ * that are used in cross-node communication, so the order must be deterministic.
+ */
+ Collections.sort(streamClassList, new Comparator<Class>() {
+ @Override public int compare(Class streamClass1, Class streamClass2) {
+ return streamClass1.getName().compareTo(streamClass2.getName());
+ }
+ });
+
+ for (int i = 0; i < streamClassList.size(); i++) {
+ Class<? extends StreamData> streamClass = streamClassList.get(i);
+ int streamId = i + 1;
+ classMap.put(streamClass, streamId);
+ idMap.put(streamId, streamClass);
+ }
}
@Override public int findIdByClass(Class<? extends StreamData> streamDataClass) {