You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by pe...@apache.org on 2019/03/30 07:37:24 UTC
[incubator-skywalking] branch master updated: Divide two static
classes (#2411)
This is an automated email from the ASF dual-hosted git repository.
pengys pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 1fd5c57 Divide two static classes (#2411)
1fd5c57 is described below
commit 1fd5c5790a25875f0e10c72fd767eaf38bbd2c99
Author: 彭勇升 pengys <80...@qq.com>
AuthorDate: Sat Mar 30 15:37:19 2019 +0800
Divide two static classes (#2411)
* 1. Remove static worker instances holder.
2. Remove static worker instance id generator.
* Fixed test case failure.
---
.../skywalking/oap/server/core/CoreModule.java | 4 +++
.../oap/server/core/CoreModuleProvider.java | 5 ++++
.../oap/server/core/alarm/AlarmEntrance.java | 12 ++++-----
.../core/analysis/worker/AlarmNotifyWorker.java | 13 ++++------
.../server/core/analysis/worker/ExportWorker.java | 12 ++++-----
.../analysis/worker/IndicatorAggregateWorker.java | 8 +++---
.../analysis/worker/IndicatorPersistentWorker.java | 6 ++---
.../core/analysis/worker/IndicatorProcess.java | 29 +++++++---------------
.../analysis/worker/IndicatorRemoteWorker.java | 11 ++++----
.../core/analysis/worker/IndicatorTransWorker.java | 13 +++++-----
.../core/analysis/worker/PersistenceWorker.java | 8 +++---
.../analysis/worker/RecordPersistentWorker.java | 6 ++---
.../server/core/analysis/worker/RecordProcess.java | 6 +----
.../server/core/analysis/worker/TopNProcess.java | 5 +---
.../server/core/analysis/worker/TopNWorker.java | 11 ++++----
.../core/register/worker/InventoryProcess.java | 10 +++-----
.../register/worker/RegisterDistinctWorker.java | 5 ++--
.../register/worker/RegisterPersistentWorker.java | 8 +++---
.../core/register/worker/RegisterRemoteWorker.java | 8 +++---
.../server/core/remote/RemoteServiceHandler.java | 13 ++++++++--
.../core/remote/client/SelfRemoteClient.java | 8 +++---
.../oap/server/core/worker/AbstractWorker.java | 9 +++++--
...IdGenerator.java => IWorkerInstanceGetter.java} | 11 +++-----
...erInstances.java => IWorkerInstanceSetter.java} | 15 +++--------
...rInstances.java => WorkerInstancesService.java} | 19 +++++++++-----
.../core/remote/RemoteServiceHandlerTestCase.java | 8 +++---
.../remote/client/GRPCRemoteClientRealClient.java | 5 ++--
.../remote/client/GRPCRemoteClientTestCase.java | 14 +++++++----
.../SegmentStandardizationWorker.java | 12 ++++-----
29 files changed, 145 insertions(+), 149 deletions(-)
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
index feaa10f..c53a48c 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
@@ -29,6 +29,7 @@ import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
import org.apache.skywalking.oap.server.core.server.*;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.core.storage.model.*;
+import org.apache.skywalking.oap.server.core.worker.*;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
/**
@@ -48,6 +49,9 @@ public class CoreModule extends ModuleDefine {
classes.add(DownsamplingConfigService.class);
classes.add(IComponentLibraryCatalogService.class);
+ classes.add(IWorkerInstanceGetter.class);
+ classes.add(IWorkerInstanceSetter.class);
+
addServerInterface(classes);
addReceiverInterface(classes);
addInsideService(classes);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index f8a3d95..e584cd5 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -40,6 +40,7 @@ import org.apache.skywalking.oap.server.core.storage.PersistenceTimer;
import org.apache.skywalking.oap.server.core.storage.annotation.StorageAnnotationListener;
import org.apache.skywalking.oap.server.core.storage.model.*;
import org.apache.skywalking.oap.server.core.storage.ttl.DataTTLKeeperTimer;
+import org.apache.skywalking.oap.server.core.worker.*;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.library.server.ServerException;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
@@ -120,6 +121,10 @@ public class CoreModuleProvider extends ModuleProvider {
this.registerServiceImplementation(StreamDataClassGetter.class, streamDataAnnotationContainer);
+ WorkerInstancesService instancesService = new WorkerInstancesService();
+ this.registerServiceImplementation(IWorkerInstanceGetter.class, instancesService);
+ this.registerServiceImplementation(IWorkerInstanceSetter.class, instancesService);
+
this.registerServiceImplementation(RemoteSenderService.class, new RemoteSenderService(getManager()));
this.registerServiceImplementation(IModelGetter.class, storageAnnotationListener);
this.registerServiceImplementation(IModelOverride.class, storageAnnotationListener);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmEntrance.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmEntrance.java
index 219753a..902fff6 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmEntrance.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmEntrance.java
@@ -19,21 +19,21 @@
package org.apache.skywalking.oap.server.core.alarm;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
/**
* @author wusheng
*/
public class AlarmEntrance {
- private ModuleManager moduleManager;
+ private ModuleDefineHolder moduleDefineHolder;
private IndicatorNotify indicatorNotify;
- public AlarmEntrance(ModuleManager moduleManager) {
- this.moduleManager = moduleManager;
+ public AlarmEntrance(ModuleDefineHolder moduleDefineHolder) {
+ this.moduleDefineHolder = moduleDefineHolder;
}
public void forward(Indicator indicator) {
- if (!moduleManager.has(AlarmModule.NAME)) {
+ if (!moduleDefineHolder.has(AlarmModule.NAME)) {
return;
}
@@ -44,7 +44,7 @@ public class AlarmEntrance {
private void init() {
if (indicatorNotify == null) {
- indicatorNotify = moduleManager.find(AlarmModule.NAME).provider().getService(IndicatorNotify.class);
+ indicatorNotify = moduleDefineHolder.find(AlarmModule.NAME).provider().getService(IndicatorNotify.class);
}
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AlarmNotifyWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AlarmNotifyWorker.java
index d325648..3812355 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AlarmNotifyWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AlarmNotifyWorker.java
@@ -18,11 +18,10 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
-import org.apache.skywalking.oap.server.core.alarm.*;
+import org.apache.skywalking.oap.server.core.alarm.AlarmEntrance;
import org.apache.skywalking.oap.server.core.analysis.indicator.*;
-import org.apache.skywalking.oap.server.core.analysis.indicator.WithMetadata;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
/**
* Alarm notify worker, do a simple route to alarm core after the aggregation persistence.
@@ -30,13 +29,11 @@ import org.apache.skywalking.oap.server.library.module.ModuleManager;
* @author wusheng
*/
public class AlarmNotifyWorker extends AbstractWorker<Indicator> {
- private ModuleManager moduleManager;
private AlarmEntrance entrance;
- public AlarmNotifyWorker(int workerId, ModuleManager moduleManager) {
- super(workerId);
- this.moduleManager = moduleManager;
- this.entrance = new AlarmEntrance(moduleManager);
+ public AlarmNotifyWorker(ModuleDefineHolder moduleDefineHolder) {
+ super(moduleDefineHolder);
+ this.entrance = new AlarmEntrance(moduleDefineHolder);
}
@Override public void in(Indicator indicator) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportWorker.java
index 21ab3ab..940323a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportWorker.java
@@ -21,25 +21,23 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
import org.apache.skywalking.oap.server.core.analysis.indicator.*;
import org.apache.skywalking.oap.server.core.exporter.*;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
/**
* @author wusheng
*/
public class ExportWorker extends AbstractWorker<Indicator> {
- private ModuleManager moduleManager;
private MetricValuesExportService exportService;
- public ExportWorker(int workerId, ModuleManager moduleManager) {
- super(workerId);
- this.moduleManager = moduleManager;
+ public ExportWorker(ModuleDefineHolder moduleDefineHolder) {
+ super(moduleDefineHolder);
}
@Override public void in(Indicator indicator) {
- if (exportService != null || moduleManager.has(ExporterModule.NAME)) {
+ if (exportService != null || getModuleDefineHolder().has(ExporterModule.NAME)) {
if (indicator instanceof WithMetadata) {
if (exportService == null) {
- exportService = moduleManager.find(ExporterModule.NAME).provider().getService(MetricValuesExportService.class);
+ exportService = getModuleDefineHolder().find(ExporterModule.NAME).provider().getService(MetricValuesExportService.class);
}
exportService.export(((WithMetadata)indicator).getMeta(), indicator);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
index 4204658..66380fd 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
@@ -25,7 +25,7 @@ import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.data.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*;
import org.slf4j.*;
@@ -45,9 +45,9 @@ public class IndicatorAggregateWorker extends AbstractWorker<Indicator> {
private final long l2AggregationSendCycle;
private long lastSendTimestamp;
- IndicatorAggregateWorker(ModuleManager moduleManager, int workerId, AbstractWorker<Indicator> nextWorker,
+ IndicatorAggregateWorker(ModuleDefineHolder moduleDefineHolder, AbstractWorker<Indicator> nextWorker,
String modelName) {
- super(workerId);
+ super(moduleDefineHolder);
this.modelName = modelName;
this.nextWorker = nextWorker;
this.mergeDataCache = new MergeDataCache<>();
@@ -62,7 +62,7 @@ public class IndicatorAggregateWorker extends AbstractWorker<Indicator> {
}
this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new AggregatorConsumer(this));
- MetricCreator metricCreator = moduleManager.find(TelemetryModule.NAME).provider().getService(MetricCreator.class);
+ MetricCreator metricCreator = moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricCreator.class);
aggregationCounter = metricCreator.createCounter("indicator_aggregation", "The number of rows in aggregation",
new MetricTag.Keys("metricName", "level", "dimensionality"), new MetricTag.Values(modelName, "1", "min"));
lastSendTimestamp = System.currentTimeMillis();
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
index 13a5193..416c5b6 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
@@ -26,7 +26,7 @@ import org.apache.skywalking.oap.server.core.analysis.data.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.storage.IIndicatorDAO;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.slf4j.*;
import static java.util.Objects.nonNull;
@@ -45,10 +45,10 @@ public class IndicatorPersistentWorker extends PersistenceWorker<Indicator, Merg
private final AbstractWorker<Indicator> nextExportWorker;
private final DataCarrier<Indicator> dataCarrier;
- IndicatorPersistentWorker(int workerId, String modelName, int batchSize, ModuleManager moduleManager,
+ IndicatorPersistentWorker(ModuleDefineHolder moduleDefineHolder, String modelName, int batchSize,
IIndicatorDAO indicatorDAO, AbstractWorker<Indicator> nextAlarmWorker,
AbstractWorker<Indicator> nextExportWorker) {
- super(moduleManager, workerId, batchSize);
+ super(moduleDefineHolder, batchSize);
this.modelName = modelName;
this.mergeDataCache = new MergeDataCache<>();
this.indicatorDAO = indicatorDAO;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorProcess.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorProcess.java
index 6049394..de9f6be 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorProcess.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorProcess.java
@@ -25,7 +25,6 @@ import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntityAnnotationUtils;
-import org.apache.skywalking.oap.server.core.worker.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
/**
@@ -66,29 +65,20 @@ public enum IndicatorProcess {
IndicatorPersistentWorker dayPersistentWorker = worker(moduleManager, indicatorDAO, modelName + Const.ID_SPLIT + Downsampling.Day.getName());
IndicatorPersistentWorker monthPersistentWorker = worker(moduleManager, indicatorDAO, modelName + Const.ID_SPLIT + Downsampling.Month.getName());
- IndicatorTransWorker transWorker = new IndicatorTransWorker(moduleManager, modelName, WorkerIdGenerator.INSTANCES.generate(), minutePersistentWorker, hourPersistentWorker, dayPersistentWorker, monthPersistentWorker);
- WorkerInstances.INSTANCES.put(transWorker.getWorkerId(), transWorker);
-
- IndicatorRemoteWorker remoteWorker = new IndicatorRemoteWorker(WorkerIdGenerator.INSTANCES.generate(), moduleManager, transWorker, modelName);
- WorkerInstances.INSTANCES.put(remoteWorker.getWorkerId(), remoteWorker);
-
- IndicatorAggregateWorker aggregateWorker = new IndicatorAggregateWorker(moduleManager, WorkerIdGenerator.INSTANCES.generate(), remoteWorker, modelName);
- WorkerInstances.INSTANCES.put(aggregateWorker.getWorkerId(), aggregateWorker);
+ IndicatorTransWorker transWorker = new IndicatorTransWorker(moduleManager, modelName, minutePersistentWorker, hourPersistentWorker, dayPersistentWorker, monthPersistentWorker);
+ IndicatorRemoteWorker remoteWorker = new IndicatorRemoteWorker(moduleManager, transWorker, modelName);
+ IndicatorAggregateWorker aggregateWorker = new IndicatorAggregateWorker(moduleManager, remoteWorker, modelName);
entryWorkers.put(indicatorClass, aggregateWorker);
}
private IndicatorPersistentWorker minutePersistentWorker(ModuleManager moduleManager,
IIndicatorDAO indicatorDAO, String modelName) {
- AlarmNotifyWorker alarmNotifyWorker = new AlarmNotifyWorker(WorkerIdGenerator.INSTANCES.generate(), moduleManager);
- WorkerInstances.INSTANCES.put(alarmNotifyWorker.getWorkerId(), alarmNotifyWorker);
-
- ExportWorker exportWorker = new ExportWorker(WorkerIdGenerator.INSTANCES.generate(), moduleManager);
- WorkerInstances.INSTANCES.put(exportWorker.getWorkerId(), exportWorker);
+ AlarmNotifyWorker alarmNotifyWorker = new AlarmNotifyWorker(moduleManager);
+ ExportWorker exportWorker = new ExportWorker(moduleManager);
- IndicatorPersistentWorker minutePersistentWorker = new IndicatorPersistentWorker(WorkerIdGenerator.INSTANCES.generate(), modelName,
- 1000, moduleManager, indicatorDAO, alarmNotifyWorker, exportWorker);
- WorkerInstances.INSTANCES.put(minutePersistentWorker.getWorkerId(), minutePersistentWorker);
+ IndicatorPersistentWorker minutePersistentWorker = new IndicatorPersistentWorker(moduleManager, modelName,
+ 1000, indicatorDAO, alarmNotifyWorker, exportWorker);
persistentWorkers.add(minutePersistentWorker);
return minutePersistentWorker;
@@ -96,9 +86,8 @@ public enum IndicatorProcess {
private IndicatorPersistentWorker worker(ModuleManager moduleManager,
IIndicatorDAO indicatorDAO, String modelName) {
- IndicatorPersistentWorker persistentWorker = new IndicatorPersistentWorker(WorkerIdGenerator.INSTANCES.generate(), modelName,
- 1000, moduleManager, indicatorDAO, null, null);
- WorkerInstances.INSTANCES.put(persistentWorker.getWorkerId(), persistentWorker);
+ IndicatorPersistentWorker persistentWorker = new IndicatorPersistentWorker(moduleManager, modelName,
+ 1000, indicatorDAO, null, null);
persistentWorkers.add(persistentWorker);
return persistentWorker;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorRemoteWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorRemoteWorker.java
index 320c2de..7f63100 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorRemoteWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorRemoteWorker.java
@@ -23,9 +23,8 @@ import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
import org.apache.skywalking.oap.server.core.remote.selector.Selector;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
+import org.slf4j.*;
/**
* @author peng-yongsheng
@@ -38,10 +37,10 @@ public class IndicatorRemoteWorker extends AbstractWorker<Indicator> {
private final RemoteSenderService remoteSender;
private final String modelName;
- IndicatorRemoteWorker(int workerId, ModuleManager moduleManager, AbstractWorker<Indicator> nextWorker,
+ IndicatorRemoteWorker(ModuleDefineHolder moduleDefineHolder, AbstractWorker<Indicator> nextWorker,
String modelName) {
- super(workerId);
- this.remoteSender = moduleManager.find(CoreModule.NAME).provider().getService(RemoteSenderService.class);
+ super(moduleDefineHolder);
+ this.remoteSender = moduleDefineHolder.find(CoreModule.NAME).provider().getService(RemoteSenderService.class);
this.nextWorker = nextWorker;
this.modelName = modelName;
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorTransWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorTransWorker.java
index 526254d..1c6bc42 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorTransWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorTransWorker.java
@@ -21,7 +21,7 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.Objects;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*;
import org.slf4j.*;
@@ -43,20 +43,18 @@ public class IndicatorTransWorker extends AbstractWorker<Indicator> {
private CounterMetric aggregationDayCounter;
private CounterMetric aggregationMonthCounter;
- public IndicatorTransWorker(ModuleManager moduleManager,
- String modelName,
- int workerId,
+ public IndicatorTransWorker(ModuleDefineHolder moduleDefineHolder, String modelName,
IndicatorPersistentWorker minutePersistenceWorker,
IndicatorPersistentWorker hourPersistenceWorker,
IndicatorPersistentWorker dayPersistenceWorker,
IndicatorPersistentWorker monthPersistenceWorker) {
- super(workerId);
+ super(moduleDefineHolder);
this.minutePersistenceWorker = minutePersistenceWorker;
this.hourPersistenceWorker = hourPersistenceWorker;
this.dayPersistenceWorker = dayPersistenceWorker;
this.monthPersistenceWorker = monthPersistenceWorker;
- MetricCreator metricCreator = moduleManager.find(TelemetryModule.NAME).provider().getService(MetricCreator.class);
+ MetricCreator metricCreator = moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricCreator.class);
aggregationMinCounter = metricCreator.createCounter("indicator_aggregation", "The number of rows in aggregation",
new MetricTag.Keys("metricName", "level", "dimensionality"), new MetricTag.Values(modelName, "2", "min"));
aggregationHourCounter = metricCreator.createCounter("indicator_aggregation", "The number of rows in aggregation",
@@ -80,7 +78,8 @@ public class IndicatorTransWorker extends AbstractWorker<Indicator> {
aggregationHourCounter.inc();
monthPersistenceWorker.in(indicator.toMonth());
}
- /**
+
+ /*
* Minute persistent must be at the end of all time dimensionalities
* Because #toHour, #toDay, #toMonth include clone inside, which could avoid concurrency situation.
*/
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java
index a35f2a8..7e6a4d3 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java
@@ -22,7 +22,7 @@ import java.util.*;
import org.apache.skywalking.oap.server.core.analysis.data.Window;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.slf4j.*;
/**
@@ -35,10 +35,10 @@ public abstract class PersistenceWorker<INPUT extends StorageData, CACHE extends
private final int batchSize;
private final IBatchDAO batchDAO;
- PersistenceWorker(ModuleManager moduleManager, int workerId, int batchSize) {
- super(workerId);
+ PersistenceWorker(ModuleDefineHolder moduleDefineHolder, int batchSize) {
+ super(moduleDefineHolder);
this.batchSize = batchSize;
- this.batchDAO = moduleManager.find(StorageModule.NAME).provider().getService(IBatchDAO.class);
+ this.batchDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(IBatchDAO.class);
}
void onWork(INPUT input) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java
index a6b972e..7dfb7f4 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java
@@ -25,7 +25,7 @@ import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.data.NonMergeDataCache;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.slf4j.*;
/**
@@ -40,9 +40,9 @@ public class RecordPersistentWorker extends PersistenceWorker<Record, NonMergeDa
private final IRecordDAO recordDAO;
private final DataCarrier<Record> dataCarrier;
- RecordPersistentWorker(int workerId, String modelName, int batchSize, ModuleManager moduleManager,
+ RecordPersistentWorker(ModuleDefineHolder moduleDefineHolder, String modelName, int batchSize,
IRecordDAO recordDAO) {
- super(moduleManager, workerId, batchSize);
+ super(moduleDefineHolder, batchSize);
this.modelName = modelName;
this.nonMergeDataCache = new NonMergeDataCache<>();
this.recordDAO = recordDAO;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordProcess.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordProcess.java
index 11ff284..1018f53 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordProcess.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordProcess.java
@@ -25,7 +25,6 @@ import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntityAnnotationUtils;
-import org.apache.skywalking.oap.server.core.worker.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
/**
@@ -60,12 +59,9 @@ public enum RecordProcess {
recordDAO = storageDAO.newRecordDao(builderClass.newInstance());
} catch (InstantiationException | IllegalAccessException e) {
throw new UnexpectedException("Create " + builderClass.getSimpleName() + " record DAO failure.", e);
-
}
- RecordPersistentWorker persistentWorker = new RecordPersistentWorker(WorkerIdGenerator.INSTANCES.generate(), modelName,
- 1000, moduleManager, recordDAO);
- WorkerInstances.INSTANCES.put(persistentWorker.getWorkerId(), persistentWorker);
+ RecordPersistentWorker persistentWorker = new RecordPersistentWorker(moduleManager, modelName, 1000, recordDAO);
persistentWorkers.add(persistentWorker);
workers.put(recordClass, persistentWorker);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNProcess.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNProcess.java
index 4ea3dc7..55334cf 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNProcess.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNProcess.java
@@ -27,7 +27,6 @@ import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntityAnnotationUtils;
-import org.apache.skywalking.oap.server.core.worker.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
/**
@@ -59,9 +58,7 @@ public enum TopNProcess {
throw new UnexpectedException("Create " + builderClass.getSimpleName() + " top n record DAO failure.", e);
}
- TopNWorker persistentWorker = new TopNWorker(WorkerIdGenerator.INSTANCES.generate(), modelName, moduleManager,
- 50, recordDAO);
- WorkerInstances.INSTANCES.put(persistentWorker.getWorkerId(), persistentWorker);
+ TopNWorker persistentWorker = new TopNWorker(moduleManager, modelName, 50, recordDAO);
persistentWorkers.add(persistentWorker);
workers.put(topNClass, persistentWorker);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
index 8bf5dd6..fd6ab02 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
@@ -24,7 +24,7 @@ import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.analysis.data.LimitedSizeDataCache;
import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.slf4j.*;
/**
@@ -33,7 +33,9 @@ import org.slf4j.*;
* @author wusheng
*/
public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<TopN>> {
+
private static final Logger logger = LoggerFactory.getLogger(TopNWorker.class);
+
private final LimitedSizeDataCache<TopN> limitedSizeDataCache;
private final IRecordDAO recordDAO;
private final String modelName;
@@ -41,10 +43,9 @@ public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<Top
private long reportCycle;
private volatile long lastReportTimestamp;
- public TopNWorker(int workerId, String modelName, ModuleManager moduleManager,
- int topNSize,
- IRecordDAO recordDAO) {
- super(moduleManager, workerId, -1);
+ public TopNWorker(ModuleDefineHolder moduleDefineHolder, String modelName,
+ int topNSize, IRecordDAO recordDAO) {
+ super(moduleDefineHolder, -1);
this.limitedSizeDataCache = new LimitedSizeDataCache<>(topNSize);
this.recordDAO = recordDAO;
this.modelName = modelName;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryProcess.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryProcess.java
index ce0e97a..c5cd568 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryProcess.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryProcess.java
@@ -23,7 +23,6 @@ import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntityAnnotationUtils;
-import org.apache.skywalking.oap.server.core.worker.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
/**
@@ -52,14 +51,11 @@ public enum InventoryProcess {
throw new UnexpectedException("Create " + builderClass.getSimpleName() + " register DAO failure.", e);
}
- RegisterPersistentWorker persistentWorker = new RegisterPersistentWorker(WorkerIdGenerator.INSTANCES.generate(), modelName, moduleManager, registerDAO, scopeId);
- WorkerInstances.INSTANCES.put(persistentWorker.getWorkerId(), persistentWorker);
+ RegisterPersistentWorker persistentWorker = new RegisterPersistentWorker(moduleManager, modelName, registerDAO, scopeId);
- RegisterRemoteWorker remoteWorker = new RegisterRemoteWorker(WorkerIdGenerator.INSTANCES.generate(), moduleManager, persistentWorker);
- WorkerInstances.INSTANCES.put(remoteWorker.getWorkerId(), remoteWorker);
+ RegisterRemoteWorker remoteWorker = new RegisterRemoteWorker(moduleManager, persistentWorker);
- RegisterDistinctWorker distinctWorker = new RegisterDistinctWorker(WorkerIdGenerator.INSTANCES.generate(), remoteWorker);
- WorkerInstances.INSTANCES.put(distinctWorker.getWorkerId(), distinctWorker);
+ RegisterDistinctWorker distinctWorker = new RegisterDistinctWorker(moduleManager, remoteWorker);
entryWorkers.put(inventoryClass, distinctWorker);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
index 158f01f..b077b4f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
@@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.slf4j.*;
/**
@@ -39,8 +40,8 @@ public class RegisterDistinctWorker extends AbstractWorker<RegisterSource> {
private final Map<RegisterSource, RegisterSource> sources;
private int messageNum;
- RegisterDistinctWorker(int workerId, AbstractWorker<RegisterSource> nextWorker) {
- super(workerId);
+ RegisterDistinctWorker(ModuleDefineHolder moduleDefineHolder, AbstractWorker<RegisterSource> nextWorker) {
+ super(moduleDefineHolder);
this.nextWorker = nextWorker;
this.sources = new HashMap<>();
this.dataCarrier = new DataCarrier<>(1, 1000);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
index e42c880..ed2b9ea 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
@@ -27,7 +27,7 @@ import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.slf4j.*;
/**
@@ -44,13 +44,13 @@ public class RegisterPersistentWorker extends AbstractWorker<RegisterSource> {
private final IRegisterDAO registerDAO;
private final DataCarrier<RegisterSource> dataCarrier;
- RegisterPersistentWorker(int workerId, String modelName, ModuleManager moduleManager,
+ RegisterPersistentWorker(ModuleDefineHolder moduleDefineHolder, String modelName,
IRegisterDAO registerDAO, int scopeId) {
- super(workerId);
+ super(moduleDefineHolder);
this.modelName = modelName;
this.sources = new HashMap<>();
this.registerDAO = registerDAO;
- this.registerLockDAO = moduleManager.find(StorageModule.NAME).provider().getService(IRegisterLockDAO.class);
+ this.registerLockDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(IRegisterLockDAO.class);
this.scopeId = scopeId;
this.dataCarrier = new DataCarrier<>("IndicatorPersistentWorker." + modelName, 1, 1000);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterRemoteWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterRemoteWorker.java
index 1bf3e41..4d4cc0f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterRemoteWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterRemoteWorker.java
@@ -23,7 +23,7 @@ import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
import org.apache.skywalking.oap.server.core.remote.selector.Selector;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.slf4j.*;
/**
@@ -36,9 +36,9 @@ public class RegisterRemoteWorker extends AbstractWorker<RegisterSource> {
private final AbstractWorker<RegisterSource> nextWorker;
private final RemoteSenderService remoteSender;
- RegisterRemoteWorker(int workerId, ModuleManager moduleManager, AbstractWorker<RegisterSource> nextWorker) {
- super(workerId);
- this.remoteSender = moduleManager.find(CoreModule.NAME).provider().getService(RemoteSenderService.class);
+ RegisterRemoteWorker(ModuleDefineHolder moduleDefineHolder, AbstractWorker<RegisterSource> nextWorker) {
+ super(moduleDefineHolder);
+ this.remoteSender = moduleDefineHolder.find(CoreModule.NAME).provider().getService(RemoteSenderService.class);
this.nextWorker = nextWorker;
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
index 89d301c..dffa5f8 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
@@ -24,7 +24,7 @@ import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
-import org.apache.skywalking.oap.server.core.worker.WorkerInstances;
+import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
@@ -44,6 +44,7 @@ public class RemoteServiceHandler extends RemoteServiceGrpc.RemoteServiceImplBas
private final ModuleDefineHolder moduleDefineHolder;
private StreamDataClassGetter streamDataClassGetter;
+ private IWorkerInstanceGetter workerInstanceGetter;
private CounterMetric remoteInCounter;
private CounterMetric remoteInErrorCounter;
private HistogramMetric remoteInHistogram;
@@ -71,6 +72,14 @@ public class RemoteServiceHandler extends RemoteServiceGrpc.RemoteServiceImplBas
}
}
+ if (Objects.isNull(workerInstanceGetter)) {
+ synchronized (RemoteServiceHandler.class) {
+ if (Objects.isNull(workerInstanceGetter)) {
+ workerInstanceGetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IWorkerInstanceGetter.class);
+ }
+ }
+ }
+
return new StreamObserver<RemoteMessage>() {
@Override public void onNext(RemoteMessage message) {
remoteInCounter.inc();
@@ -84,7 +93,7 @@ public class RemoteServiceHandler extends RemoteServiceGrpc.RemoteServiceImplBas
try {
StreamData streamData = streamDataClass.newInstance();
streamData.deserialize(remoteData);
- WorkerInstances.INSTANCES.get(nextWorkerId).in(streamData);
+ workerInstanceGetter.get(nextWorkerId).in(streamData);
} catch (Throwable t) {
remoteInErrorCounter.inc();
logger.error(t.getMessage(), t);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java
index ad253ca..abb7aae 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java
@@ -18,9 +18,9 @@
package org.apache.skywalking.oap.server.core.remote.client;
-import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
-import org.apache.skywalking.oap.server.core.worker.WorkerInstances;
+import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*;
@@ -32,9 +32,11 @@ public class SelfRemoteClient implements RemoteClient {
private final Address address;
private CounterMetric remoteOutCounter;
+ private final IWorkerInstanceGetter workerInstanceGetter;
public SelfRemoteClient(ModuleDefineHolder moduleDefineHolder, Address address) {
this.address = address;
+ workerInstanceGetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IWorkerInstanceGetter.class);
remoteOutCounter = moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricCreator.class)
.createCounter("remote_out_count", "The number(client side) of inside remote inside aggregate rpc.",
new MetricTag.Keys("dest", "self"), new MetricTag.Values(address.toString(), "Y"));
@@ -52,7 +54,7 @@ public class SelfRemoteClient implements RemoteClient {
}
@Override public void push(int nextWorkerId, StreamData streamData) {
- WorkerInstances.INSTANCES.get(nextWorkerId).in(streamData);
+ workerInstanceGetter.get(nextWorkerId).in(streamData);
}
@Override public int compareTo(RemoteClient o) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
index c079a11..c68b690 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
@@ -19,6 +19,8 @@
package org.apache.skywalking.oap.server.core.worker;
import lombok.Getter;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
/**
* @author peng-yongsheng
@@ -26,9 +28,12 @@ import lombok.Getter;
public abstract class AbstractWorker<INPUT> {
@Getter private final int workerId;
+ @Getter private final ModuleDefineHolder moduleDefineHolder;
- public AbstractWorker(int workerId) {
- this.workerId = workerId;
+ public AbstractWorker(ModuleDefineHolder moduleDefineHolder) {
+ this.moduleDefineHolder = moduleDefineHolder;
+ IWorkerInstanceSetter workerInstanceSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IWorkerInstanceSetter.class);
+ this.workerId = workerInstanceSetter.put(this);
}
public abstract void in(INPUT input);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerIdGenerator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceGetter.java
similarity index 84%
rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerIdGenerator.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceGetter.java
index fc7f5af..9b54184 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerIdGenerator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceGetter.java
@@ -18,15 +18,12 @@
package org.apache.skywalking.oap.server.core.worker;
+import org.apache.skywalking.oap.server.library.module.Service;
+
/**
* @author peng-yongsheng
*/
-public enum WorkerIdGenerator {
- INSTANCES;
-
- private int workerId = 0;
+public interface IWorkerInstanceGetter extends Service {
- public synchronized int generate() {
- return workerId++;
- }
+ AbstractWorker get(int workerId);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerInstances.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceSetter.java
similarity index 72%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerInstances.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceSetter.java
index a80dbd5..eef279f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerInstances.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceSetter.java
@@ -18,21 +18,12 @@
package org.apache.skywalking.oap.server.core.worker;
-import java.util.*;
+import org.apache.skywalking.oap.server.library.module.Service;
/**
* @author peng-yongsheng
*/
-public enum WorkerInstances {
- INSTANCES;
+public interface IWorkerInstanceSetter extends Service {
- private Map<Integer, AbstractWorker> instances = new HashMap<>();
-
- public void put(int workerId, AbstractWorker instance) {
- instances.put(workerId, instance);
- }
-
- public AbstractWorker get(int workerId) {
- return instances.get(workerId);
- }
+ int put(AbstractWorker instance);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerInstances.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerInstancesService.java
similarity index 65%
rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerInstances.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerInstancesService.java
index a80dbd5..51d671a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerInstances.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerInstancesService.java
@@ -19,20 +19,27 @@
package org.apache.skywalking.oap.server.core.worker;
import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* @author peng-yongsheng
*/
-public enum WorkerInstances {
- INSTANCES;
+public class WorkerInstancesService implements IWorkerInstanceSetter, IWorkerInstanceGetter {
- private Map<Integer, AbstractWorker> instances = new HashMap<>();
+ private final AtomicInteger generator = new AtomicInteger(1);
+ private final Map<Integer, AbstractWorker> instances;
- public void put(int workerId, AbstractWorker instance) {
- instances.put(workerId, instance);
+ public WorkerInstancesService() {
+ this.instances = new HashMap<>();
}
- public AbstractWorker get(int workerId) {
+ @Override public AbstractWorker get(int workerId) {
return instances.get(workerId);
}
+
+ @Override public int put(AbstractWorker instance) {
+ int workerId = generator.getAndIncrement();
+ instances.put(workerId, instance);
+ return workerId;
+ }
}
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerTestCase.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerTestCase.java
index 5b79686..f284369 100644
--- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerTestCase.java
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerTestCase.java
@@ -26,7 +26,7 @@ import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
-import org.apache.skywalking.oap.server.core.worker.*;
+import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*;
@@ -58,8 +58,6 @@ public class RemoteServiceHandlerTestCase {
moduleDefine.provider().registerServiceImplementation(StreamDataClassGetter.class, classGetter);
- WorkerInstances.INSTANCES.put(testWorkerId, new TestWorker());
-
String serverName = InProcessServerBuilder.generateName();
MetricCreator metricCreator = mock(MetricCreator.class);
when(metricCreator.createCounter(any(), any(), any(), any())).thenReturn(new CounterMetric() {
@@ -148,8 +146,8 @@ public class RemoteServiceHandlerTestCase {
static class TestWorker extends AbstractWorker {
- public TestWorker() {
- super(1);
+ public TestWorker(ModuleDefineHolder moduleDefineHolder) {
+ super(moduleDefineHolder);
}
@Override public void in(Object o) {
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java
index b704d7b..fd2a804 100644
--- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java
@@ -23,6 +23,7 @@ import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGe
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*;
import org.apache.skywalking.oap.server.testing.module.*;
@@ -96,8 +97,8 @@ public class GRPCRemoteClientRealClient {
static class TestWorker extends AbstractWorker {
- public TestWorker(int workerId) {
- super(workerId);
+ public TestWorker(ModuleDefineHolder moduleDefineHolder) {
+ super(moduleDefineHolder);
}
@Override public void in(Object o) {
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java
index 550c45d..b28c644 100644
--- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java
@@ -26,6 +26,7 @@ import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGe
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.worker.*;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*;
import org.apache.skywalking.oap.server.testing.module.*;
@@ -52,8 +53,11 @@ public class GRPCRemoteClientTestCase {
classGetter = mock(StreamDataClassGetter.class);
moduleDefine.provider().registerServiceImplementation(StreamDataClassGetter.class, classGetter);
- TestWorker worker = new TestWorker(nextWorkerId);
- WorkerInstances.INSTANCES.put(nextWorkerId, worker);
+ WorkerInstancesService workerInstancesService = new WorkerInstancesService();
+ moduleDefine.provider().registerServiceImplementation(IWorkerInstanceGetter.class, workerInstancesService);
+ moduleDefine.provider().registerServiceImplementation(IWorkerInstanceSetter.class, workerInstancesService);
+
+ TestWorker worker = new TestWorker(moduleManager);
}
@Test
@@ -89,7 +93,7 @@ public class GRPCRemoteClientTestCase {
remoteClient.push(nextWorkerId, new TestStreamData());
}
- TimeUnit.SECONDS.sleep(1);
+ TimeUnit.SECONDS.sleep(2);
}
public static class TestStreamData extends StreamData {
@@ -113,8 +117,8 @@ public class GRPCRemoteClientTestCase {
class TestWorker extends AbstractWorker {
- public TestWorker(int workerId) {
- super(workerId);
+ public TestWorker(ModuleDefineHolder moduleDefineHolder) {
+ super(moduleDefineHolder);
}
@Override public void in(Object o) {
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java
index 46a9117..15672c5 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java
@@ -25,7 +25,7 @@ import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.apm.network.language.agent.UpstreamSegment;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.buffer.BufferStream;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParse;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*;
@@ -41,10 +41,10 @@ public class SegmentStandardizationWorker extends AbstractWorker<SegmentStandard
private final DataCarrier<SegmentStandardization> dataCarrier;
private CounterMetric traceBufferFileIn;
- public SegmentStandardizationWorker(ModuleManager moduleManager, SegmentParse.Producer segmentParseCreator,
- String path,
- int offsetFileMaxSize, int dataFileMaxSize, boolean cleanWhenRestart, boolean isV6) throws IOException {
- super(Integer.MAX_VALUE);
+ public SegmentStandardizationWorker(ModuleDefineHolder moduleDefineHolder,
+ SegmentParse.Producer segmentParseCreator, String path, int offsetFileMaxSize,
+ int dataFileMaxSize, boolean cleanWhenRestart, boolean isV6) throws IOException {
+ super(moduleDefineHolder);
BufferStream.Builder<UpstreamSegment> builder = new BufferStream.Builder<>(path);
builder.cleanWhenRestart(cleanWhenRestart);
@@ -59,7 +59,7 @@ public class SegmentStandardizationWorker extends AbstractWorker<SegmentStandard
dataCarrier = new DataCarrier<>("SegmentStandardizationWorker", 1, 1024);
dataCarrier.consume(new Consumer(stream), 1, 200);
- MetricCreator metricCreator = moduleManager.find(TelemetryModule.NAME).provider().getService(MetricCreator.class);
+ MetricCreator metricCreator = moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricCreator.class);
String metricNamePrefix = isV6 ? "v6_" : "v5_";
traceBufferFileIn = metricCreator.createCounter(metricNamePrefix + "trace_buffer_file_in", "The number of trace segment into the buffer file",
MetricTag.EMPTY_KEY, MetricTag.EMPTY_VALUE);