You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@skywalking.apache.org by wu...@apache.org on 2018/07/30 01:54:39 UTC
[incubator-skywalking] branch 6.0 updated: Feature/oap/remote (#1505)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch 6.0
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/6.0 by this push:
new efe5299 Feature/oap/remote (#1505)
efe5299 is described below
commit efe5299d03536e739c3d8022bf0deb56cbc46088
Author: 彭勇升 pengys <80...@qq.com>
AuthorDate: Mon Jul 30 09:54:36 2018 +0800
Feature/oap/remote (#1505)
* Sample operator code.
* Indicator aggregator framework.
* Provide some annotations for OAL.
* Remote module.
* Register service.
* Add apache license header.
* Ignore comments when loading definition files.
---
.../plugin/standalone/StandaloneManagerTest.java | 7 +-
.../ClusterModuleZookeeperProviderTestCase.java | 17 +--
oap-server/server-core/pom.xml | 50 +++++++
.../skywalking/oap/server/core/CoreModule.java | 12 ++
.../oap/server/core/CoreModuleProvider.java | 51 ++++---
.../server/core/analysis/DispatcherManager.java | 5 +-
.../oap/server/core/analysis/data/StreamData.java | 4 +-
.../core/analysis/endpoint/EndpointDispatcher.java | 19 ++-
...java => EndpointLatencyAvgAggregateWorker.java} | 16 ++-
.../endpoint/EndpointLatencyAvgIndicator.java | 27 +++-
...ava => EndpointLatencyAvgPersistentWorker.java} | 12 +-
...or.java => EndpointLatencyAvgRemoteWorker.java} | 17 ++-
.../core/analysis/indicator/AvgIndicator.java | 16 +--
.../server/core/analysis/indicator/Indicator.java | 8 +-
.../indicator/annotation/IndicatorType.java | 2 +
.../IndicatorDefineLoadException.java} | 12 +-
.../analysis/indicator/define/IndicatorMapper.java | 82 +++++++++++
.../AbstractAggregatorWorker.java} | 19 +--
.../AbstractPersistentWorker.java} | 16 +--
.../core/analysis/worker/AbstractRemoteWorker.java | 63 ++++++++
.../IndicatorType.java => worker/Worker.java} | 10 +-
.../define/WorkerDefineLoadException.java} | 12 +-
.../core/analysis/worker/define/WorkerMapper.java | 102 +++++++++++++
.../oap/server/core/cluster/RemoteInstance.java | 41 ++----
.../server/core/receiver/SourceReceiverImpl.java | 5 +-
.../remote/{Selector.java => Deserializable.java} | 6 +-
.../server/core/remote/RemoteSenderService.java | 60 ++++++++
.../server/core/remote/RemoteServiceHandler.java | 71 +++++++++
.../RemoteData.java => remote/Serializable.java} | 8 +-
.../core/remote/client/GRPCRemoteClient.java | 158 +++++++++++++++++++++
.../client/RemoteClient.java} | 14 +-
.../core/remote/client/RemoteClientManager.java | 126 ++++++++++++++++
.../core/remote/client/SelfRemoteClient.java} | 34 +++--
.../selector/ForeverFirstSelector.java} | 17 ++-
.../selector/HashCodeSelector.java} | 17 +--
.../selector/RemoteClientSelector.java} | 11 +-
.../selector/RollingSelector.java} | 32 ++---
.../core/remote/{ => selector}/Selector.java | 2 +-
.../Indicator.java => proto/RemoteService.proto} | 33 +++--
.../main/resources/META-INF/defines/indicator.def | 19 +++
.../src/main/resources/META-INF/defines/worker.def | 21 +++
.../indicator/define/IndicatorMapperTestCase.java} | 18 +--
.../indicator/define/TestAvgIndicator.java} | 18 +--
.../test/resources/META-INF/defines/indicator.def | 19 +++
.../src/test/resources/META-INF/defines/worker.def | 17 +++
.../server-core/src/test/resources/log4j2.xml | 31 ++++
.../oap/server/library/client/grpc/GRPCClient.java | 5 +-
.../oap/server/library/module/ModuleProvider.java | 3 +-
.../receiver/mesh/provider/MeshGRPCHandler.java | 2 -
49 files changed, 1117 insertions(+), 250 deletions(-)
diff --git a/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManagerTest.java b/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManagerTest.java
index 2e69130..55671bc 100644
--- a/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManagerTest.java
+++ b/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManagerTest.java
@@ -19,15 +19,14 @@
package org.apache.skywalking.oap.server.cluster.plugin.standalone;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.*;
public class StandaloneManagerTest {
@Test
public void test() {
StandaloneManager standaloneManager = new StandaloneManager();
- RemoteInstance remote1 = new RemoteInstance();
- RemoteInstance remote2 = new RemoteInstance();
+ RemoteInstance remote1 = new RemoteInstance("A", 100, true);
+ RemoteInstance remote2 = new RemoteInstance("B", 100, false);
standaloneManager.registerRemote(remote1);
Assert.assertEquals(remote1, standaloneManager.queryRemoteNodes().get(0));
diff --git a/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ClusterModuleZookeeperProviderTestCase.java b/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ClusterModuleZookeeperProviderTestCase.java
index bbffa9f..a318481 100644
--- a/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ClusterModuleZookeeperProviderTestCase.java
+++ b/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ClusterModuleZookeeperProviderTestCase.java
@@ -21,16 +21,9 @@ package org.apache.skywalking.oap.server.cluster.plugin.zookeeper;
import java.io.IOException;
import java.util.List;
import org.apache.curator.test.TestingServer;
-import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
-import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
-import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
-import org.apache.skywalking.oap.server.core.cluster.ServiceRegisterException;
-import org.apache.skywalking.oap.server.library.module.ModuleStartException;
-import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import org.apache.skywalking.oap.server.core.cluster.*;
+import org.apache.skywalking.oap.server.library.module.*;
+import org.junit.*;
/**
* @author peng-yongsheng
@@ -59,9 +52,7 @@ public class ClusterModuleZookeeperProviderTestCase {
ClusterRegister moduleRegister = provider.getService(ClusterRegister.class);
ClusterNodesQuery clusterNodesQuery = provider.getService(ClusterNodesQuery.class);
- RemoteInstance remoteInstance = new RemoteInstance();
- remoteInstance.setHost("ProviderAHost");
- remoteInstance.setPort(1000);
+ RemoteInstance remoteInstance = new RemoteInstance("ProviderAHost", 1000, true);
moduleRegister.registerRemote(remoteInstance);
diff --git a/oap-server/server-core/pom.xml b/oap-server/server-core/pom.xml
index 767e1c8..ca6ac7a 100644
--- a/oap-server/server-core/pom.xml
+++ b/oap-server/server-core/pom.xml
@@ -43,6 +43,11 @@
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
+ <artifactId>library-client</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.skywalking</groupId>
<artifactId>library-server</artifactId>
<version>${project.version}</version>
</dependency>
@@ -57,4 +62,49 @@
<version>${project.version}</version>
</dependency>
</dependencies>
+
+ <build>
+ <extensions>
+ <extension>
+ <groupId>kr.motd.maven</groupId>
+ <artifactId>os-maven-plugin</artifactId>
+ <version>1.4.1.Final</version>
+ </extension>
+ </extensions>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-resources-plugin</artifactId>
+ <version>2.4.3</version>
+ <configuration>
+ <encoding>${project.build.sourceEncoding}</encoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.xolstice.maven.plugins</groupId>
+ <artifactId>protobuf-maven-plugin</artifactId>
+ <version>0.5.0</version>
+ <configuration>
+ <!--
+ The version of protoc must match protobuf-java. If you don't depend on
+ protobuf-java directly, you will be transitively depending on the
+ protobuf-java version that grpc depends on.
+ -->
+ <protocArtifact>com.google.protobuf:protoc:3.3.0:exe:${os.detected.classifier}
+ </protocArtifact>
+ <pluginId>grpc-java</pluginId>
+ <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.4.0:exe:${os.detected.classifier}
+ </pluginArtifact>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ <goal>compile-custom</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
</project>
\ No newline at end of file
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
index 6f3de48..b03152f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
@@ -19,7 +19,11 @@
package org.apache.skywalking.oap.server.core;
import java.util.*;
+import org.apache.skywalking.oap.server.core.analysis.indicator.define.IndicatorMapper;
+import org.apache.skywalking.oap.server.core.analysis.worker.define.WorkerMapper;
import org.apache.skywalking.oap.server.core.receiver.SourceReceiver;
+import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
+import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
import org.apache.skywalking.oap.server.core.server.*;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
@@ -38,6 +42,7 @@ public class CoreModule extends ModuleDefine {
List<Class> classes = new ArrayList<>();
addServerInterface(classes);
addReceiverInterface(classes);
+ addInsideService(classes);
return classes.toArray(new Class[] {});
}
@@ -47,6 +52,13 @@ public class CoreModule extends ModuleDefine {
classes.add(JettyHandlerRegister.class);
}
+ private void addInsideService(List<Class> classes) {
+ classes.add(IndicatorMapper.class);
+ classes.add(WorkerMapper.class);
+ classes.add(RemoteClientManager.class);
+ classes.add(RemoteSenderService.class);
+ }
+
private void addReceiverInterface(List<Class> classes) {
classes.add(SourceReceiver.class);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index 1af3873..757c34a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -18,24 +18,18 @@
package org.apache.skywalking.oap.server.core;
-import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
-import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
-import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
-import org.apache.skywalking.oap.server.core.receiver.SourceReceiver;
-import org.apache.skywalking.oap.server.core.receiver.SourceReceiverImpl;
-import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
-import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegisterImpl;
-import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister;
-import org.apache.skywalking.oap.server.core.server.JettyHandlerRegisterImpl;
-import org.apache.skywalking.oap.server.library.module.ModuleConfig;
-import org.apache.skywalking.oap.server.library.module.ModuleProvider;
-import org.apache.skywalking.oap.server.library.module.ModuleStartException;
-import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
+import org.apache.skywalking.oap.server.core.analysis.indicator.define.*;
+import org.apache.skywalking.oap.server.core.analysis.worker.define.*;
+import org.apache.skywalking.oap.server.core.cluster.*;
+import org.apache.skywalking.oap.server.core.receiver.*;
+import org.apache.skywalking.oap.server.core.remote.*;
+import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
+import org.apache.skywalking.oap.server.core.server.*;
+import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.library.server.ServerException;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
import org.apache.skywalking.oap.server.library.server.jetty.JettyServer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
/**
* @author peng-yongsheng
@@ -47,17 +41,21 @@ public class CoreModuleProvider extends ModuleProvider {
private final CoreModuleConfig moduleConfig;
private GRPCServer grpcServer;
private JettyServer jettyServer;
+ private final IndicatorMapper indicatorMapper;
+ private final WorkerMapper workerMapper;
public CoreModuleProvider() {
super();
this.moduleConfig = new CoreModuleConfig();
+ this.indicatorMapper = new IndicatorMapper();
+ this.workerMapper = new WorkerMapper(getManager());
}
@Override public String name() {
return "default";
}
- @Override public Class module() {
+ @Override public Class<? extends ModuleDefine> module() {
return CoreModule.class;
}
@@ -75,11 +73,24 @@ public class CoreModuleProvider extends ModuleProvider {
this.registerServiceImplementation(GRPCHandlerRegister.class, new GRPCHandlerRegisterImpl(grpcServer));
this.registerServiceImplementation(JettyHandlerRegister.class, new JettyHandlerRegisterImpl(jettyServer));
- this.registerServiceImplementation(SourceReceiver.class, new SourceReceiverImpl());
+ this.registerServiceImplementation(SourceReceiver.class, new SourceReceiverImpl(getManager()));
+
+ this.registerServiceImplementation(IndicatorMapper.class, indicatorMapper);
+ this.registerServiceImplementation(WorkerMapper.class, workerMapper);
+
+ this.registerServiceImplementation(RemoteClientManager.class, new RemoteClientManager(getManager()));
+ this.registerServiceImplementation(RemoteSenderService.class, new RemoteSenderService(getManager()));
}
- @Override public void start() {
+ @Override public void start() throws ModuleStartException {
+ grpcServer.addHandler(new RemoteServiceHandler(getManager()));
+ try {
+ indicatorMapper.load();
+ workerMapper.load();
+ } catch (IndicatorDefineLoadException | WorkerDefineLoadException e) {
+ throw new ModuleStartException(e.getMessage(), e);
+ }
}
@Override public void notifyAfterCompleted() throws ModuleStartException {
@@ -90,9 +101,7 @@ public class CoreModuleProvider extends ModuleProvider {
throw new ModuleStartException(e.getMessage(), e);
}
- RemoteInstance gRPCServerInstance = new RemoteInstance();
- gRPCServerInstance.setHost(moduleConfig.getGRPCHost());
- gRPCServerInstance.setPort(moduleConfig.getGRPCPort());
+ RemoteInstance gRPCServerInstance = new RemoteInstance(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(), true);
this.getManager().find(ClusterModule.NAME).getService(ClusterRegister.class).registerRemote(gRPCServerInstance);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
index de8da4e..9317382 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.core.analysis;
import java.util.*;
import org.apache.skywalking.oap.server.core.analysis.endpoint.EndpointDispatcher;
import org.apache.skywalking.oap.server.core.receiver.Scope;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.slf4j.*;
/**
@@ -32,9 +33,9 @@ public class DispatcherManager {
private Map<Scope, SourceDispatcher> dispatcherMap;
- public DispatcherManager() {
+ public DispatcherManager(ModuleManager moduleManager) {
this.dispatcherMap = new HashMap<>();
- this.dispatcherMap.put(Scope.Endpoint, new EndpointDispatcher());
+ this.dispatcherMap.put(Scope.Endpoint, new EndpointDispatcher(moduleManager));
}
public SourceDispatcher getDispatcher(Scope scope) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/StreamData.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/StreamData.java
index 63f149f..57c75d4 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/StreamData.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/StreamData.java
@@ -18,10 +18,12 @@
package org.apache.skywalking.oap.server.core.analysis.data;
+import org.apache.skywalking.oap.server.core.remote.*;
+
/**
* @author peng-yongsheng
*/
-public abstract class StreamData implements QueueData {
+public abstract class StreamData implements QueueData, Serializable, Deserializable {
private EndOfBatchContext endOfBatchContext;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointDispatcher.java
index 152ac5e..bcb3a2c 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointDispatcher.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointDispatcher.java
@@ -18,18 +18,22 @@
package org.apache.skywalking.oap.server.core.analysis.endpoint;
+import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
+import org.apache.skywalking.oap.server.core.analysis.worker.define.WorkerMapper;
import org.apache.skywalking.oap.server.core.receiver.Endpoint;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
/**
* @author peng-yongsheng
*/
public class EndpointDispatcher implements SourceDispatcher<Endpoint> {
- private final EndpointLatencyAvgAggregator avgAggregator;
+ private final ModuleManager moduleManager;
+ private EndpointLatencyAvgAggregateWorker avgAggregator;
- public EndpointDispatcher() {
- this.avgAggregator = new EndpointLatencyAvgAggregator();
+ public EndpointDispatcher(ModuleManager moduleManager) {
+ this.moduleManager = moduleManager;
}
@Override public void dispatch(Endpoint source) {
@@ -37,7 +41,14 @@ public class EndpointDispatcher implements SourceDispatcher<Endpoint> {
}
private void avg(Endpoint source) {
- EndpointLatencyAvgIndicator indicator = new EndpointLatencyAvgIndicator(source.getTimeBucket(), source.getId());
+ if (avgAggregator == null) {
+ WorkerMapper workerMapper = moduleManager.find(CoreModule.NAME).getService(WorkerMapper.class);
+ avgAggregator = (EndpointLatencyAvgAggregateWorker)workerMapper.findInstanceByClass(EndpointLatencyAvgAggregateWorker.class);
+ }
+
+ EndpointLatencyAvgIndicator indicator = new EndpointLatencyAvgIndicator();
+ indicator.setId(source.getId());
+ indicator.setTimeBucket(source.getTimeBucket());
indicator.combine(source.getLatency(), 1);
avgAggregator.in(indicator);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregateWorker.java
similarity index 64%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregator.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregateWorker.java
index f409909..62d5134 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregateWorker.java
@@ -18,17 +18,25 @@
package org.apache.skywalking.oap.server.core.analysis.endpoint;
-import org.apache.skywalking.oap.server.core.analysis.AbstractAggregator;
+import org.apache.skywalking.oap.server.core.analysis.worker.AbstractAggregatorWorker;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
-public class EndpointLatencyAvgAggregator extends AbstractAggregator<EndpointLatencyAvgIndicator> {
+public class EndpointLatencyAvgAggregateWorker extends AbstractAggregatorWorker<EndpointLatencyAvgIndicator> {
- private static final Logger logger = LoggerFactory.getLogger(EndpointLatencyAvgAggregator.class);
+ private static final Logger logger = LoggerFactory.getLogger(EndpointLatencyAvgAggregateWorker.class);
- @Override protected void onNext(EndpointLatencyAvgIndicator data) {
+ private final EndpointLatencyAvgRemoteWorker remoter;
+
+ public EndpointLatencyAvgAggregateWorker(ModuleManager moduleManager) {
+ super(moduleManager);
+ this.remoter = new EndpointLatencyAvgRemoteWorker(moduleManager);
+ }
+ @Override protected void onNext(EndpointLatencyAvgIndicator data) {
+ remoter.in(data);
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java
index a200aeb..e339afa 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java
@@ -18,19 +18,16 @@
package org.apache.skywalking.oap.server.core.analysis.endpoint;
+import lombok.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.AvgIndicator;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
/**
* @author peng-yongsheng
*/
public class EndpointLatencyAvgIndicator extends AvgIndicator {
- private final int id;
-
- public EndpointLatencyAvgIndicator(long timeBucket, int id) {
- super(timeBucket);
- this.id = id;
- }
+ @Setter @Getter private int id;
@Override public int hashCode() {
int result = 17;
@@ -55,4 +52,22 @@ public class EndpointLatencyAvgIndicator extends AvgIndicator {
return true;
}
+
+ @Override public RemoteData.Builder serialize() {
+ RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
+ remoteBuilder.setDataIntegers(0, getId());
+ remoteBuilder.setDataIntegers(1, getCount());
+
+ remoteBuilder.setDataLongs(0, getTimeBucket());
+ remoteBuilder.setDataLongs(1, getSummation());
+ return remoteBuilder;
+ }
+
+ @Override public void deserialize(RemoteData remoteData) {
+ setId(remoteData.getDataIntegers(0));
+ setCount(remoteData.getDataIntegers(1));
+
+ setTimeBucket(remoteData.getDataLongs(0));
+ setSummation(remoteData.getDataLongs(1));
+ }
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgPersistentWorker.java
similarity index 70%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregator.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgPersistentWorker.java
index f409909..e3c5c23 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgPersistentWorker.java
@@ -18,17 +18,15 @@
package org.apache.skywalking.oap.server.core.analysis.endpoint;
-import org.apache.skywalking.oap.server.core.analysis.AbstractAggregator;
-import org.slf4j.*;
+import org.apache.skywalking.oap.server.core.analysis.worker.AbstractPersistentWorker;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
/**
* @author peng-yongsheng
*/
-public class EndpointLatencyAvgAggregator extends AbstractAggregator<EndpointLatencyAvgIndicator> {
-
- private static final Logger logger = LoggerFactory.getLogger(EndpointLatencyAvgAggregator.class);
-
- @Override protected void onNext(EndpointLatencyAvgIndicator data) {
+public class EndpointLatencyAvgPersistentWorker extends AbstractPersistentWorker<EndpointLatencyAvgIndicator> {
+ public EndpointLatencyAvgPersistentWorker(ModuleManager moduleManager) {
+ super(moduleManager);
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgRemoteWorker.java
similarity index 59%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregator.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgRemoteWorker.java
index f409909..47f7521 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgRemoteWorker.java
@@ -18,17 +18,24 @@
package org.apache.skywalking.oap.server.core.analysis.endpoint;
-import org.apache.skywalking.oap.server.core.analysis.AbstractAggregator;
-import org.slf4j.*;
+import org.apache.skywalking.oap.server.core.analysis.worker.AbstractRemoteWorker;
+import org.apache.skywalking.oap.server.core.remote.selector.Selector;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
/**
* @author peng-yongsheng
*/
-public class EndpointLatencyAvgAggregator extends AbstractAggregator<EndpointLatencyAvgIndicator> {
+public class EndpointLatencyAvgRemoteWorker extends AbstractRemoteWorker<EndpointLatencyAvgIndicator> {
- private static final Logger logger = LoggerFactory.getLogger(EndpointLatencyAvgAggregator.class);
+ public EndpointLatencyAvgRemoteWorker(ModuleManager moduleManager) {
+ super(moduleManager);
+ }
- @Override protected void onNext(EndpointLatencyAvgIndicator data) {
+ @Override public Selector selector() {
+ return Selector.HashCode;
+ }
+ @Override public Class nextWorkerClass() {
+ return EndpointLatencyAvgPersistentWorker.class;
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java
index 54ca9d6..da065f3 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java
@@ -18,28 +18,26 @@
package org.apache.skywalking.oap.server.core.analysis.indicator;
+import lombok.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.*;
+import org.apache.skywalking.oap.server.core.remote.selector.Selector;
/**
* @author peng-yongsheng
*/
-@IndicatorType
+@IndicatorType(selector = Selector.HashCode)
public abstract class AvgIndicator extends Indicator {
- private long summation;
- private int count;
-
- public AvgIndicator(long timeBucket) {
- super(timeBucket);
- }
+ @Getter @Setter private long summation;
+ @Getter @Setter private int count;
@Entrance
- public void combine(@SourceFrom long summation, @ConstOne int count) {
+ public final void combine(@SourceFrom long summation, @ConstOne int count) {
this.summation += summation;
this.count += count;
}
- @Override public void combine(Indicator indicator) {
+ @Override public final void combine(Indicator indicator) {
AvgIndicator avgIndicator = (AvgIndicator)indicator;
combine(avgIndicator.summation, avgIndicator.count);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
index 66bb57d..533379e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
@@ -18,7 +18,7 @@
package org.apache.skywalking.oap.server.core.analysis.indicator;
-import lombok.Getter;
+import lombok.*;
import org.apache.skywalking.oap.server.core.analysis.data.StreamData;
/**
@@ -26,11 +26,7 @@ import org.apache.skywalking.oap.server.core.analysis.data.StreamData;
*/
public abstract class Indicator extends StreamData {
- @Getter private final long timeBucket;
-
- public Indicator(long timeBucket) {
- this.timeBucket = timeBucket;
- }
+ @Getter @Setter private long timeBucket;
public abstract void combine(Indicator indicator);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
index 7e8b8e1..46f0345 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.core.analysis.indicator.annotation;
import java.lang.annotation.*;
+import org.apache.skywalking.oap.server.core.remote.selector.Selector;
/**
* @author peng-yongsheng
@@ -26,4 +27,5 @@ import java.lang.annotation.*;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.SOURCE)
public @interface IndicatorType {
+ Selector selector();
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorDefineLoadException.java
similarity index 83%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorDefineLoadException.java
index 7e8b8e1..ca0521a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorDefineLoadException.java
@@ -16,14 +16,14 @@
*
*/
-package org.apache.skywalking.oap.server.core.analysis.indicator.annotation;
-
-import java.lang.annotation.*;
+package org.apache.skywalking.oap.server.core.analysis.indicator.define;
/**
* @author peng-yongsheng
*/
-@Target(ElementType.TYPE)
-@Retention(RetentionPolicy.SOURCE)
-public @interface IndicatorType {
+public class IndicatorDefineLoadException extends Exception {
+
+ public IndicatorDefineLoadException(String message, Throwable cause) {
+ super(message, cause);
+ }
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapper.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapper.java
new file mode 100644
index 0000000..8513fa2
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapper.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.indicator.define;
+
+import java.io.*;
+import java.net.URL;
+import java.util.*;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.library.module.Service;
+import org.slf4j.*;
+
+/**
+ * Registry that maps indicator classes to numeric ids and back.
+ *
+ * <p>{@link #load()} reads every {@code META-INF/defines/indicator.def} resource visible to this
+ * class's class loader, treats each property name as a fully qualified class name, and assigns
+ * the classes sequential ids starting at 1 in the order the names were read.
+ *
+ * <p>NOTE(review): the names come from {@link Properties#propertyNames()}, whose iteration order
+ * is not specified, so the id given to a class may not be stable - confirm whether ids need to
+ * agree between processes.
+ *
+ * @author peng-yongsheng
+ */
+public class IndicatorMapper implements Service {
+
+    private static final Logger logger = LoggerFactory.getLogger(IndicatorMapper.class);
+
+    // Last id handed out; incremented before each registration, so the first id is 1.
+    private int id = 0;
+    private final Map<Class<Indicator>, Integer> classKeyMapping;
+    private final Map<Integer, Class<Indicator>> idKeyMapping;
+
+    public IndicatorMapper() {
+        this.classKeyMapping = new HashMap<>();
+        this.idKeyMapping = new HashMap<>();
+    }
+
+    /**
+     * Loads all indicator definition files and registers the classes they name.
+     *
+     * @throws IndicatorDefineLoadException if a definition file cannot be read or a listed class
+     * cannot be found; the original exception is kept as the cause.
+     */
+    @SuppressWarnings(value = "unchecked")
+    public void load() throws IndicatorDefineLoadException {
+        try {
+            List<String> indicatorClasses = new LinkedList<>();
+
+            Enumeration<URL> urlEnumeration = this.getClass().getClassLoader().getResources("META-INF/defines/indicator.def");
+            while (urlEnumeration.hasMoreElements()) {
+                URL definitionFileURL = urlEnumeration.nextElement();
+                logger.info("Load indicator definition file url: {}", definitionFileURL.getPath());
+
+                Properties properties = new Properties();
+                // try-with-resources: the stream opened on the definition file is closed even if load() fails.
+                try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(definitionFileURL.openStream()))) {
+                    properties.load(bufferedReader);
+                }
+
+                // Only the property names are used; each one is a fully qualified class name.
+                Enumeration<?> defineItem = properties.propertyNames();
+                while (defineItem.hasMoreElements()) {
+                    String fullNameClass = (String)defineItem.nextElement();
+                    indicatorClasses.add(fullNameClass);
+                }
+            }
+
+            for (String indicatorClassName : indicatorClasses) {
+                Class<Indicator> indicatorClass = (Class<Indicator>)Class.forName(indicatorClassName);
+                id++;
+                classKeyMapping.put(indicatorClass, id);
+                idKeyMapping.put(id, indicatorClass);
+            }
+        } catch (IOException | ClassNotFoundException e) {
+            throw new IndicatorDefineLoadException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Returns the id registered for the given indicator class.
+     *
+     * @param indicatorClass the class to look up.
+     * @return the id assigned by {@link #load()}.
+     * @throws NullPointerException if the class was never registered, because the missing map
+     * entry is unboxed to {@code int}.
+     */
+    public int findIdByClass(Class indicatorClass) {
+        return classKeyMapping.get(indicatorClass);
+    }
+
+    /**
+     * Returns the indicator class registered under the given id.
+     *
+     * @param id the id to look up.
+     * @return the registered class, or {@code null} if no class has this id.
+     */
+    public Class<Indicator> findClassById(int id) {
+        return idKeyMapping.get(id);
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/AbstractAggregator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractAggregatorWorker.java
similarity index 84%
rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/AbstractAggregator.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractAggregatorWorker.java
index f4e769a..65d68b4 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/AbstractAggregator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractAggregatorWorker.java
@@ -16,35 +16,36 @@
*
*/
-package org.apache.skywalking.oap.server.core.analysis;
+package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.*;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.analysis.data.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
-public abstract class AbstractAggregator<INPUT extends Indicator> {
+public abstract class AbstractAggregatorWorker<INPUT extends Indicator> extends Worker<INPUT> {
- private static final Logger logger = LoggerFactory.getLogger(AbstractAggregator.class);
+ private static final Logger logger = LoggerFactory.getLogger(AbstractAggregatorWorker.class);
private final DataCarrier<INPUT> dataCarrier;
private final MergeDataCache<INPUT> mergeDataCache;
private int messageNum;
- public AbstractAggregator() {
+ public AbstractAggregatorWorker(ModuleManager moduleManager) {
this.mergeDataCache = new MergeDataCache<>();
this.dataCarrier = new DataCarrier<>(1, 10000);
this.dataCarrier.consume(new AggregatorConsumer(this), 1);
}
- public void in(INPUT message) {
- message.setEndOfBatchContext(new EndOfBatchContext(false));
- dataCarrier.produce(message);
+ @Override public final void in(INPUT input) {
+ input.setEndOfBatchContext(new EndOfBatchContext(false));
+ dataCarrier.produce(input);
}
private void onWork(INPUT message) {
@@ -91,9 +92,9 @@ public abstract class AbstractAggregator<INPUT extends Indicator> {
private class AggregatorConsumer implements IConsumer<INPUT> {
- private final AbstractAggregator<INPUT> aggregator;
+ private final AbstractAggregatorWorker<INPUT> aggregator;
- private AggregatorConsumer(AbstractAggregator<INPUT> aggregator) {
+ private AggregatorConsumer(AbstractAggregatorWorker<INPUT> aggregator) {
this.aggregator = aggregator;
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/StreamData.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java
similarity index 67%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/StreamData.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java
index 63f149f..bcead67 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/StreamData.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java
@@ -16,20 +16,20 @@
*
*/
-package org.apache.skywalking.oap.server.core.analysis.data;
+package org.apache.skywalking.oap.server.core.analysis.worker;
+
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
/**
* @author peng-yongsheng
*/
-public abstract class StreamData implements QueueData {
-
- private EndOfBatchContext endOfBatchContext;
+public abstract class AbstractPersistentWorker<INPUT extends Indicator> extends Worker<INPUT> {
- @Override public final EndOfBatchContext getEndOfBatchContext() {
- return this.endOfBatchContext;
+ public AbstractPersistentWorker(ModuleManager moduleManager) {
}
- @Override public final void setEndOfBatchContext(EndOfBatchContext context) {
- this.endOfBatchContext = context;
+ @Override public final void in(INPUT input) {
+
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractRemoteWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractRemoteWorker.java
new file mode 100644
index 0000000..e3fb89d
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractRemoteWorker.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.worker;
+
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.analysis.worker.define.WorkerMapper;
+import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
+import org.apache.skywalking.oap.server.core.remote.selector.Selector;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.slf4j.*;
+
+/**
+ * Worker that forwards every input through the {@link RemoteSenderService} instead of processing
+ * it locally. Subclasses name the worker class that should receive the input and the
+ * {@link Selector} passed along with it.
+ *
+ * @author peng-yongsheng
+ */
+public abstract class AbstractRemoteWorker<INPUT extends Indicator> extends Worker<INPUT> {
+
+    private static final Logger logger = LoggerFactory.getLogger(AbstractRemoteWorker.class);
+
+    private final ModuleManager moduleManager;
+    // Both services are looked up on first use rather than in the constructor.
+    private RemoteSenderService remoteSender;
+    private WorkerMapper workerMapper;
+
+    public AbstractRemoteWorker(ModuleManager moduleManager) {
+        this.moduleManager = moduleManager;
+    }
+
+    /**
+     * Resolves the id of {@link #nextWorkerClass()} and hands the input, that id and
+     * {@link #selector()} to the remote sender. Anything thrown while resolving the id or
+     * sending is logged at error level and not rethrown.
+     *
+     * @param input the indicator to forward.
+     */
+    @Override public final void in(INPUT input) {
+        resolveServices();
+
+        try {
+            Class targetWorkerClass = nextWorkerClass();
+            int targetWorkerId = workerMapper.findIdByClass(targetWorkerClass);
+            remoteSender.send(targetWorkerId, input, selector());
+        } catch (Throwable t) {
+            logger.error(t.getMessage(), t);
+        }
+    }
+
+    /**
+     * Fetches the sender and the worker mapper from the core module if they are not cached yet.
+     */
+    private void resolveServices() {
+        if (remoteSender == null) {
+            remoteSender = moduleManager.find(CoreModule.NAME).getService(RemoteSenderService.class);
+        }
+        if (workerMapper == null) {
+            workerMapper = moduleManager.find(CoreModule.NAME).getService(WorkerMapper.class);
+        }
+    }
+
+    /**
+     * @return the class of the worker whose id is sent along with the input.
+     */
+    public abstract Class nextWorkerClass();
+
+    /**
+     * @return the selector passed to the remote sender for each input.
+     */
+    public abstract Selector selector();
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/Worker.java
similarity index 78%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/Worker.java
index 7e8b8e1..53010ee 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/Worker.java
@@ -16,14 +16,14 @@
*
*/
-package org.apache.skywalking.oap.server.core.analysis.indicator.annotation;
+package org.apache.skywalking.oap.server.core.analysis.worker;
-import java.lang.annotation.*;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
/**
* @author peng-yongsheng
*/
-@Target(ElementType.TYPE)
-@Retention(RetentionPolicy.SOURCE)
-public @interface IndicatorType {
+public abstract class Worker<INPUT extends Indicator> {
+
+ public abstract void in(INPUT input);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/define/WorkerDefineLoadException.java
similarity index 78%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/define/WorkerDefineLoadException.java
index 7e8b8e1..a24a0a7 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/define/WorkerDefineLoadException.java
@@ -16,14 +16,14 @@
*
*/
-package org.apache.skywalking.oap.server.core.analysis.indicator.annotation;
-
-import java.lang.annotation.*;
+package org.apache.skywalking.oap.server.core.analysis.worker.define;
/**
* @author peng-yongsheng
*/
-@Target(ElementType.TYPE)
-@Retention(RetentionPolicy.SOURCE)
-public @interface IndicatorType {
+public class WorkerDefineLoadException extends Exception {
+
+ public WorkerDefineLoadException(String message, Throwable cause) {
+ super(message, cause);
+ }
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/define/WorkerMapper.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/define/WorkerMapper.java
new file mode 100644
index 0000000..9732430
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/define/WorkerMapper.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.worker.define;
+
+import java.io.*;
+import java.lang.reflect.Constructor;
+import java.net.URL;
+import java.util.*;
+import org.apache.skywalking.oap.server.core.analysis.worker.Worker;
+import org.apache.skywalking.oap.server.library.module.*;
+import org.slf4j.*;
+
+/**
+ * Registry that maps worker classes to numeric ids and to one worker instance per class.
+ *
+ * <p>{@link #load()} reads every {@code META-INF/defines/worker.def} resource visible to this
+ * class's class loader, treats each property name as a fully qualified class name, assigns the
+ * classes sequential ids starting at 1, and creates one instance of each through its
+ * {@code (ModuleManager)} constructor.
+ *
+ * <p>NOTE(review): the names come from {@link Properties#propertyNames()}, whose iteration order
+ * is not specified, so the id given to a class may not be stable - confirm whether ids need to
+ * agree between processes.
+ *
+ * @author peng-yongsheng
+ */
+public class WorkerMapper implements Service {
+
+    private static final Logger logger = LoggerFactory.getLogger(WorkerMapper.class);
+
+    // Last id handed out; incremented before each registration, so the first id is 1.
+    private int id = 0;
+    private final ModuleManager moduleManager;
+    private final Map<Class<Worker>, Integer> classKeyMapping;
+    private final Map<Integer, Class<Worker>> idKeyMapping;
+    private final Map<Class<Worker>, Worker> classKeyInstanceMapping;
+    private final Map<Integer, Worker> idKeyInstanceMapping;
+
+    public WorkerMapper(ModuleManager moduleManager) {
+        this.moduleManager = moduleManager;
+        this.classKeyMapping = new HashMap<>();
+        this.idKeyMapping = new HashMap<>();
+        this.classKeyInstanceMapping = new HashMap<>();
+        this.idKeyInstanceMapping = new HashMap<>();
+    }
+
+    /**
+     * Loads all worker definition files, registers the classes they name and instantiates each.
+     *
+     * @throws WorkerDefineLoadException if reading a definition file, loading a class or creating
+     * a worker fails; the original exception is kept as the cause.
+     */
+    @SuppressWarnings(value = "unchecked")
+    public void load() throws WorkerDefineLoadException {
+        try {
+            List<String> workerClasses = new LinkedList<>();
+
+            Enumeration<URL> urlEnumeration = this.getClass().getClassLoader().getResources("META-INF/defines/worker.def");
+            while (urlEnumeration.hasMoreElements()) {
+                URL definitionFileURL = urlEnumeration.nextElement();
+                logger.info("Load worker definition file url: {}", definitionFileURL.getPath());
+
+                Properties properties = new Properties();
+                // try-with-resources: the stream opened on the definition file is closed even if load() fails.
+                try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(definitionFileURL.openStream()))) {
+                    properties.load(bufferedReader);
+                }
+
+                // Only the property names are used; each one is a fully qualified class name.
+                Enumeration<?> defineItem = properties.propertyNames();
+                while (defineItem.hasMoreElements()) {
+                    String fullNameClass = (String)defineItem.nextElement();
+                    workerClasses.add(fullNameClass);
+                }
+            }
+
+            for (String workerClassName : workerClasses) {
+                Class<Worker> workerClass = (Class<Worker>)Class.forName(workerClassName);
+                id++;
+                classKeyMapping.put(workerClass, id);
+                idKeyMapping.put(id, workerClass);
+
+                // Every listed worker must declare a constructor taking a ModuleManager.
+                Constructor<Worker> constructor = workerClass.getDeclaredConstructor(ModuleManager.class);
+                Worker worker = constructor.newInstance(moduleManager);
+                classKeyInstanceMapping.put(workerClass, worker);
+                idKeyInstanceMapping.put(id, worker);
+            }
+        } catch (Exception e) {
+            throw new WorkerDefineLoadException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Returns the id registered for the given worker class.
+     *
+     * @param workerClass the class to look up.
+     * @return the id assigned by {@link #load()}.
+     * @throws NullPointerException if the class was never registered, because the missing map
+     * entry is unboxed to {@code int}.
+     */
+    public int findIdByClass(Class workerClass) {
+        return classKeyMapping.get(workerClass);
+    }
+
+    /**
+     * @param id the id to look up.
+     * @return the worker class registered under the id, or {@code null} if there is none.
+     */
+    public Class<Worker> findClassById(int id) {
+        return idKeyMapping.get(id);
+    }
+
+    /**
+     * @param workerClass the class to look up.
+     * @return the instance created for the class, or {@code null} if it was never registered.
+     */
+    public Worker findInstanceByClass(Class workerClass) {
+        return classKeyInstanceMapping.get(workerClass);
+    }
+
+    /**
+     * @param id the id to look up.
+     * @return the worker instance registered under the id, or {@code null} if there is none.
+     */
+    public Worker findInstanceById(int id) {
+        return idKeyInstanceMapping.get(id);
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java
index 53c4ea0..1e85b32 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java
@@ -19,48 +19,29 @@
package org.apache.skywalking.oap.server.core.cluster;
import java.util.Objects;
+import lombok.*;
/**
* @author peng-yongsheng
*/
-public class RemoteInstance {
+public class RemoteInstance implements Comparable<RemoteInstance> {
- private String host;
- private int port;
- private boolean self = false;
+ @Getter private final String host;
+ @Getter private final int port;
+ @Getter @Setter private boolean isSelf = false;
- public RemoteInstance() {
-
- }
-
- public RemoteInstance(String host, int port, boolean self) {
+ public RemoteInstance(String host, int port, boolean isSelf) {
this.host = host;
this.port = port;
- this.self = self;
- }
-
- public String getHost() {
- return host;
- }
-
- public void setHost(String host) {
- this.host = host;
- }
-
- public int getPort() {
- return port;
- }
-
- public void setPort(int port) {
- this.port = port;
+ this.isSelf = isSelf;
}
- public boolean isSelf() {
- return self;
+    /**
+     * Orders instances by the lexicographic order of their {@link #toString()} form.
+     *
+     * @param o the instance to compare with.
+     * @return a negative, zero or positive value as defined by {@link String#compareTo(String)}.
+     */
+    @Override public int compareTo(RemoteInstance o) {
+        // Compare with the other instance; comparing this.toString() with itself always gave 0.
+        return toString().compareTo(o.toString());
}
- public void setSelf(boolean self) {
- this.self = self;
+ @Override public String toString() {
+ return host + String.valueOf(port);
}
@Override public boolean equals(Object o) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/receiver/SourceReceiverImpl.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/receiver/SourceReceiverImpl.java
index 4d04a31..a6cf211 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/receiver/SourceReceiverImpl.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/receiver/SourceReceiverImpl.java
@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.core.receiver;
import org.apache.skywalking.oap.server.core.analysis.DispatcherManager;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
/**
* @author peng-yongsheng
@@ -27,8 +28,8 @@ public class SourceReceiverImpl implements SourceReceiver {
private final DispatcherManager dispatcherManager;
- public SourceReceiverImpl() {
- this.dispatcherManager = new DispatcherManager();
+ public SourceReceiverImpl(ModuleManager moduleManager) {
+ this.dispatcherManager = new DispatcherManager(moduleManager);
}
@Override public void receive(Source source) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Selector.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Deserializable.java
similarity index 85%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Selector.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Deserializable.java
index b7ac6ed..b2b3a7e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Selector.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Deserializable.java
@@ -18,9 +18,11 @@
package org.apache.skywalking.oap.server.core.remote;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+
/**
* @author peng-yongsheng
*/
-public enum Selector {
- HashCode, Rolling, ForeverFirst
+public interface Deserializable {
+ void deserialize(RemoteData remoteData);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteSenderService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteSenderService.java
new file mode 100644
index 0000000..b2ce1c4
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteSenderService.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.remote;
+
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.remote.client.*;
+import org.apache.skywalking.oap.server.core.remote.selector.*;
+import org.apache.skywalking.oap.server.library.module.*;
+
+/**
+ * Service that picks one remote client with the requested {@link Selector} strategy and pushes
+ * an indicator to it, tagged with the id of the worker that should receive it.
+ *
+ * @author peng-yongsheng
+ */
+public class RemoteSenderService implements Service {
+
+    private final ModuleManager moduleManager;
+    private final HashCodeSelector hashCodeSelector;
+    private final ForeverFirstSelector foreverFirstSelector;
+    private final RollingSelector rollingSelector;
+
+    public RemoteSenderService(ModuleManager moduleManager) {
+        this.moduleManager = moduleManager;
+        this.hashCodeSelector = new HashCodeSelector();
+        this.foreverFirstSelector = new ForeverFirstSelector();
+        this.rollingSelector = new RollingSelector();
+    }
+
+    /**
+     * Pushes the indicator to exactly one remote client, chosen by the given selector.
+     *
+     * @param nextWorkId id of the worker the indicator is addressed to.
+     * @param indicator the indicator to push.
+     * @param selector which selection strategy picks the remote client.
+     */
+    public void send(int nextWorkId, Indicator indicator, Selector selector) {
+        RemoteClientManager clientManager = moduleManager.find(CoreModule.NAME).getService(RemoteClientManager.class);
+
+        RemoteClient remoteClient;
+        // Each case ends with break: without it HashCode fell through into Rolling and
+        // ForeverFirst, pushing the same indicator up to three times.
+        switch (selector) {
+            case HashCode:
+                remoteClient = hashCodeSelector.select(clientManager.getRemoteClient(), indicator);
+                remoteClient.push(nextWorkId, indicator);
+                break;
+            case Rolling:
+                remoteClient = rollingSelector.select(clientManager.getRemoteClient(), indicator);
+                remoteClient.push(nextWorkId, indicator);
+                break;
+            case ForeverFirst:
+                remoteClient = foreverFirstSelector.select(clientManager.getRemoteClient(), indicator);
+                remoteClient.push(nextWorkId, indicator);
+                break;
+        }
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
new file mode 100644
index 0000000..892e951
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.remote;
+
+import io.grpc.stub.StreamObserver;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.analysis.indicator.define.IndicatorMapper;
+import org.apache.skywalking.oap.server.core.analysis.worker.define.WorkerMapper;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
+import org.slf4j.*;
+
+/**
+ * gRPC handler for the remote service. For every incoming {@link RemoteMessage} it resolves the
+ * indicator class from the message's indicator id, creates an instance and lets it deserialize
+ * the message's remote data.
+ *
+ * @author peng-yongsheng
+ */
+public class RemoteServiceHandler extends RemoteServiceGrpc.RemoteServiceImplBase implements GRPCHandler {
+
+    private static final Logger logger = LoggerFactory.getLogger(RemoteServiceHandler.class);
+
+    private final IndicatorMapper indicatorMapper;
+    private final WorkerMapper workerMapper;
+
+    public RemoteServiceHandler(ModuleManager moduleManager) {
+        this.indicatorMapper = moduleManager.find(CoreModule.NAME).getService(IndicatorMapper.class);
+        this.workerMapper = moduleManager.find(CoreModule.NAME).getService(WorkerMapper.class);
+    }
+
+    /**
+     * Opens the client-streaming call.
+     *
+     * @param responseObserver receives a single {@link Empty} once the client completes its stream.
+     * @return the observer that consumes the client's messages.
+     */
+    @Override public StreamObserver<RemoteMessage> call(StreamObserver<Empty> responseObserver) {
+        return new StreamObserver<RemoteMessage>() {
+            @Override public void onNext(RemoteMessage message) {
+                int indicatorId = message.getIndicatorId();
+                // NOTE(review): nextWorkerId and workerMapper are read but never used here, and
+                // the deserialized indicator is discarded - confirm whether it should be
+                // forwarded to the worker with this id.
+                int nextWorkerId = message.getNextWorkerId();
+                RemoteData remoteData = message.getRemoteData();
+
+                Class<Indicator> indicatorClass = indicatorMapper.findClassById(indicatorId);
+                try {
+                    indicatorClass.newInstance().deserialize(remoteData);
+                } catch (InstantiationException | IllegalAccessException e) {
+                    // Pass the exception itself so the stack trace is not lost.
+                    logger.warn(e.getMessage(), e);
+                }
+            }
+
+            @Override public void onError(Throwable throwable) {
+                logger.error(throwable.getMessage(), throwable);
+            }
+
+            @Override public void onCompleted() {
+                responseObserver.onNext(Empty.newBuilder().build());
+                responseObserver.onCompleted();
+            }
+        };
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/RemoteData.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Serializable.java
similarity index 80%
rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/RemoteData.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Serializable.java
index fe4f1f5..1a7dbed 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/RemoteData.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Serializable.java
@@ -16,11 +16,13 @@
*
*/
-package org.apache.skywalking.oap.server.core.analysis.data;
+package org.apache.skywalking.oap.server.core.remote;
+
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
/**
* @author peng-yongsheng
*/
-public interface RemoteData {
- String selectKey();
+public interface Serializable {
+ RemoteData.Builder serialize();
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
new file mode 100644
index 0000000..75a5952
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.remote.client;
+
+import io.grpc.stub.StreamObserver;
+import java.util.List;
+import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
+import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
+import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.analysis.indicator.define.IndicatorMapper;
+import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
+import org.apache.skywalking.oap.server.library.client.grpc.GRPCClient;
+import org.slf4j.*;
+
+/**
+ * {@link RemoteClient} that forwards indicators to another node through the gRPC {@code RemoteService}.
+ *
+ * <p>{@link #push(int, Indicator)} only converts the indicator into a {@link RemoteMessage} and hands it to a
+ * {@link DataCarrier}; one consumer drains the carrier and streams every batch over a single gRPC call.
+ *
+ * @author peng-yongsheng
+ */
+public class GRPCRemoteClient implements RemoteClient, Comparable<GRPCRemoteClient> {
+
+    private static final Logger logger = LoggerFactory.getLogger(GRPCRemoteClient.class);
+
+    private final GRPCClient client;
+    private final DataCarrier<RemoteMessage> carrier;
+    private final IndicatorMapper indicatorMapper;
+
+    /**
+     * @param indicatorMapper used to translate an indicator class into the numeric id put on the wire
+     * @param remoteInstance the node this client talks to
+     * @param channelSize number of channels of the internal data carrier
+     * @param bufferSize buffer size of every carrier channel
+     */
+    public GRPCRemoteClient(IndicatorMapper indicatorMapper, RemoteInstance remoteInstance, int channelSize,
+        int bufferSize) {
+        this.indicatorMapper = indicatorMapper;
+        this.client = new GRPCClient(remoteInstance.getHost(), remoteInstance.getPort());
+        // GRPCClient builds its channel in initialize() only; without this call getChannel() hands back
+        // null and the first stub creation fails.
+        this.client.initialize();
+        this.carrier = new DataCarrier<>(channelSize, bufferSize);
+        this.carrier.setBufferStrategy(BufferStrategy.BLOCKING);
+        this.carrier.consume(new RemoteMessageConsumer(), 1);
+    }
+
+    /**
+     * Wraps the indicator into a {@link RemoteMessage} and queues it for the consumer.
+     *
+     * @param nextWorkerId id of the worker that must receive the indicator on the remote node
+     * @param indicator the data to transfer
+     */
+    @Override public void push(int nextWorkerId, Indicator indicator) {
+        int indicatorId = indicatorMapper.findIdByClass(indicator.getClass());
+        RemoteMessage.Builder builder = RemoteMessage.newBuilder();
+        builder.setNextWorkerId(nextWorkerId);
+        builder.setIndicatorId(indicatorId);
+        builder.setRemoteData(indicator.serialize());
+
+        this.carrier.produce(builder.build());
+    }
+
+    /**
+     * Carrier consumer: opens one request stream per batch, writes every message, then completes the stream.
+     */
+    class RemoteMessageConsumer implements IConsumer<RemoteMessage> {
+        @Override public void init() {
+        }
+
+        @Override public void consume(List<RemoteMessage> remoteMessages) {
+            StreamObserver<RemoteMessage> streamObserver = createStreamObserver();
+            try {
+                for (RemoteMessage remoteMessage : remoteMessages) {
+                    streamObserver.onNext(remoteMessage);
+                }
+            } catch (RuntimeException e) {
+                // Terminate the half-written stream instead of leaving the call open, then rethrow so the
+                // failure is still visible to the caller of consume().
+                streamObserver.onError(e);
+                throw e;
+            }
+            streamObserver.onCompleted();
+        }
+
+        @Override public void onError(List<RemoteMessage> remoteMessages, Throwable t) {
+            logger.error(t.getMessage(), t);
+        }
+
+        @Override public void onExit() {
+        }
+    }
+
+    /**
+     * Opens a new client-streaming call on the remote service.
+     *
+     * @return the observer used to write request messages
+     */
+    private StreamObserver<RemoteMessage> createStreamObserver() {
+        RemoteServiceGrpc.RemoteServiceStub stub = RemoteServiceGrpc.newStub(client.getChannel());
+
+        // NOTE(review): the status is flipped when the server completes the call, but nothing in this class
+        // calls wait4Finish on it, so consume() does not wait for the acknowledgement. Confirm whether that
+        // is intended.
+        StreamStatus status = new StreamStatus(false);
+        return stub.call(new StreamObserver<Empty>() {
+            @Override public void onNext(Empty empty) {
+            }
+
+            @Override public void onError(Throwable throwable) {
+                logger.error(throwable.getMessage(), throwable);
+            }
+
+            @Override public void onCompleted() {
+                status.finished();
+            }
+        });
+    }
+
+    /**
+     * Completion flag of one streaming call, with a polling wait.
+     */
+    class StreamStatus {
+
+        private final Logger logger = LoggerFactory.getLogger(StreamStatus.class);
+
+        private volatile boolean status;
+
+        StreamStatus(boolean status) {
+            this.status = status;
+        }
+
+        public boolean isFinish() {
+            return status;
+        }
+
+        void finished() {
+            this.status = true;
+        }
+
+        /**
+         * Polls every 5 milliseconds until the call is finished, the timeout is exceeded, or the waiting
+         * thread is interrupted.
+         *
+         * @param maxTimeout max wait time, milliseconds.
+         */
+        public void wait4Finish(long maxTimeout) {
+            long time = 0;
+            while (!status) {
+                if (time > maxTimeout) {
+                    break;
+                }
+                if (!try2Sleep(5)) {
+                    // Interrupted: stop waiting, the interrupt flag has been restored for the caller.
+                    break;
+                }
+                time += 5;
+            }
+        }
+
+        /**
+         * Try to sleep. An {@link InterruptedException} is logged and the interrupt flag is restored.
+         *
+         * @param millis the length of time to sleep in milliseconds
+         * @return {@code true} when the sleep ended normally, {@code false} when it was interrupted
+         */
+        private boolean try2Sleep(long millis) {
+            try {
+                Thread.sleep(millis);
+                return true;
+            } catch (InterruptedException e) {
+                logger.error(e.getMessage(), e);
+                Thread.currentThread().interrupt();
+                return false;
+            }
+        }
+    }
+
+    /**
+     * Orders clients by the string form of the underlying {@link GRPCClient}.
+     */
+    @Override public int compareTo(GRPCRemoteClient o) {
+        return this.client.toString().compareTo(o.client.toString());
+    }
+
+    @Override public String getHost() {
+        return client.getHost();
+    }
+
+    @Override public int getPort() {
+        return client.getPort();
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClient.java
similarity index 76%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClient.java
index 7e8b8e1..98d23f1 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClient.java
@@ -16,14 +16,18 @@
*
*/
-package org.apache.skywalking.oap.server.core.analysis.indicator.annotation;
+package org.apache.skywalking.oap.server.core.remote.client;
-import java.lang.annotation.*;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
/**
* @author peng-yongsheng
*/
-@Target(ElementType.TYPE)
-@Retention(RetentionPolicy.SOURCE)
-public @interface IndicatorType {
+public interface RemoteClient {
+
+ /**
+ * @return host of the node this client delivers to
+ */
+ String getHost();
+
+ /**
+ * @return port of the node this client delivers to
+ */
+ int getPort();
+
+ /**
+ * Delivers an indicator to the worker identified by {@code nextWorkerId} on the target node.
+ *
+ * @param nextWorkerId id of the receiving worker
+ * @param indicator the data to deliver
+ */
+ void push(int nextWorkerId, Indicator indicator);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
new file mode 100644
index 0000000..4d2049b
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.remote.client;
+
+import java.util.*;
+import java.util.concurrent.*;
+import org.apache.skywalking.oap.server.core.analysis.indicator.define.IndicatorMapper;
+import org.apache.skywalking.oap.server.core.cluster.*;
+import org.apache.skywalking.oap.server.library.module.*;
+import org.slf4j.*;
+
+/**
+ * Keeps a list of {@link RemoteClient}s in step with the nodes reported by {@link ClusterNodesQuery}.
+ *
+ * <p>Two backing lists are used. A rebuild fills the list that is not currently published and then swaps
+ * the published reference. Clients whose address is still present are carried over, new addresses get a
+ * new client.
+ *
+ * @author peng-yongsheng
+ */
+public class RemoteClientManager implements Service {
+
+    private static final Logger logger = LoggerFactory.getLogger(RemoteClientManager.class);
+
+    private final ModuleManager moduleManager;
+    private IndicatorMapper indicatorMapper;
+    private ClusterNodesQuery clusterNodesQuery;
+    private final List<RemoteClient> clientsA;
+    private final List<RemoteClient> clientsB;
+    // Written by the refresh thread, read through getRemoteClient(); volatile publishes the swap.
+    private volatile List<RemoteClient> usingClients;
+
+    public RemoteClientManager(ModuleManager moduleManager) {
+        this.moduleManager = moduleManager;
+        this.clientsA = new LinkedList<>();
+        this.clientsB = new LinkedList<>();
+        this.usingClients = clientsA;
+    }
+
+    /**
+     * Resolves the required services and schedules {@link #refresh()} every 2 seconds after a 1 second delay.
+     */
+    public void start() {
+        this.clusterNodesQuery = moduleManager.find(ClusterModule.NAME).getService(ClusterNodesQuery.class);
+        // NOTE(review): the mapper is looked up in the cluster module as well; confirm that module really
+        // provides IndicatorMapper.
+        this.indicatorMapper = moduleManager.find(ClusterModule.NAME).getService(IndicatorMapper.class);
+        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::refresh, 1, 2, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Queries the cluster and rebuilds the client list when the set of addresses has changed.
+     */
+    private void refresh() {
+        try {
+            List<RemoteInstance> instanceList = clusterNodesQuery.queryRemoteNodes();
+            Collections.sort(instanceList);
+
+            if (!compare(instanceList)) {
+                buildNewClients(instanceList);
+            }
+        } catch (Exception e) {
+            // An exception escaping a scheduleAtFixedRate task suppresses every later execution, which
+            // would freeze the client list for good. Log it and try again on the next tick.
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * @return the currently published client list
+     */
+    public List<RemoteClient> getRemoteClient() {
+        return usingClients;
+    }
+
+    /**
+     * @return the backing list that is not published at the moment
+     */
+    private List<RemoteClient> getFreeClients() {
+        // Identity check on purpose: List.equals compares content, so two empty (or equal) buffers
+        // would make the published list look like the free one.
+        if (usingClients == clientsA) {
+            return clientsB;
+        } else {
+            return clientsA;
+        }
+    }
+
+    /**
+     * Publishes the other backing list.
+     */
+    private void switchCurrentClients() {
+        if (usingClients == clientsA) {
+            usingClients = clientsB;
+        } else {
+            usingClients = clientsA;
+        }
+    }
+
+    /**
+     * Fills the free list with one client per instance, reusing published clients by address, then
+     * publishes it.
+     *
+     * @param remoteInstances the sorted instances reported by the cluster
+     */
+    private void buildNewClients(List<RemoteInstance> remoteInstances) {
+        List<RemoteClient> freeClients = getFreeClients();
+        freeClients.clear();
+
+        Map<String, RemoteClient> currentClientsMap = new HashMap<>();
+        for (RemoteClient remoteClient : usingClients) {
+            currentClientsMap.put(address(remoteClient.getHost(), remoteClient.getPort()), remoteClient);
+        }
+
+        for (RemoteInstance remoteInstance : remoteInstances) {
+            String address = address(remoteInstance.getHost(), remoteInstance.getPort());
+            RemoteClient client = currentClientsMap.get(address);
+            if (client == null) {
+                if (remoteInstance.isSelf()) {
+                    client = new SelfRemoteClient(moduleManager, remoteInstance.getHost(), remoteInstance.getPort());
+                } else {
+                    client = new GRPCRemoteClient(indicatorMapper, remoteInstance, 1, 3000);
+                }
+            }
+            freeClients.add(client);
+        }
+
+        switchCurrentClients();
+    }
+
+    /**
+     * @param remoteInstances the sorted instances reported by the cluster
+     * @return {@code true} when the published clients match the instances address by address, in order
+     */
+    private boolean compare(List<RemoteInstance> remoteInstances) {
+        List<RemoteClient> currentClients = usingClients;
+        if (currentClients.size() != remoteInstances.size()) {
+            return false;
+        }
+
+        // Walk both lists in parallel; indexed get() on a LinkedList is linear per call.
+        Iterator<RemoteClient> clientIterator = currentClients.iterator();
+        for (RemoteInstance remoteInstance : remoteInstances) {
+            RemoteClient client = clientIterator.next();
+            String clientAddress = address(client.getHost(), client.getPort());
+            if (!clientAddress.equals(address(remoteInstance.getHost(), remoteInstance.getPort()))) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Builds the lookup key of an endpoint. The separator keeps distinct endpoints apart, e.g. host
+     * {@code 10.0.0.1} port {@code 2345} versus host {@code 10.0.0.12} port {@code 345}.
+     */
+    private String address(String host, int port) {
+        return host + ":" + port;
+    }
+}
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java
similarity index 51%
copy from oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java
index 2416be6..109ff77 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java
@@ -16,40 +16,38 @@
*
*/
-package org.apache.skywalking.oap.server.library.client.grpc;
+package org.apache.skywalking.oap.server.core.remote.client;
-import io.grpc.*;
-import org.apache.skywalking.oap.server.library.client.Client;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.analysis.worker.define.WorkerMapper;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
/**
* @author peng-yongsheng
*/
-public class GRPCClient implements Client {
+public class SelfRemoteClient implements RemoteClient {
+ private final ModuleManager moduleManager;
private final String host;
-
private final int port;
- private ManagedChannel channel;
-
- public GRPCClient(String host, int port) {
+ public SelfRemoteClient(ModuleManager moduleManager, String host, int port) {
+ this.moduleManager = moduleManager;
this.host = host;
this.port = port;
}
- @Override public void initialize() {
- channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext(true).build();
- }
-
- @Override public void shutdown() {
- channel.shutdownNow();
+ @Override public String getHost() {
+ return host;
}
- public ManagedChannel getChannel() {
- return channel;
+ @Override public int getPort() {
+ return port;
}
- @Override public String toString() {
- return host + ":" + port;
+ /**
+ * Delivers the indicator inside this process: looks up the {@link WorkerMapper} service of the core module
+ * and passes the indicator to the worker registered under {@code nextWorkerId}.
+ */
+ @Override public void push(int nextWorkerId, Indicator indicator) {
+ moduleManager.find(CoreModule.NAME)
+ .getService(WorkerMapper.class)
+ .findInstanceById(nextWorkerId)
+ .in(indicator);
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/ForeverFirstSelector.java
similarity index 62%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregator.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/ForeverFirstSelector.java
index f409909..e28f203 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/ForeverFirstSelector.java
@@ -16,19 +16,24 @@
*
*/
-package org.apache.skywalking.oap.server.core.analysis.endpoint;
+package org.apache.skywalking.oap.server.core.remote.selector;
-import org.apache.skywalking.oap.server.core.analysis.AbstractAggregator;
+import java.util.List;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.remote.client.RemoteClient;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
-public class EndpointLatencyAvgAggregator extends AbstractAggregator<EndpointLatencyAvgIndicator> {
+public class ForeverFirstSelector implements RemoteClientSelector {
- private static final Logger logger = LoggerFactory.getLogger(EndpointLatencyAvgAggregator.class);
-
- @Override protected void onNext(EndpointLatencyAvgIndicator data) {
+ private static final Logger logger = LoggerFactory.getLogger(ForeverFirstSelector.class);
+ /**
+ * Always answers with the first entry of {@code clients}; the indicator is not consulted.
+ */
+ @Override public RemoteClient select(List<RemoteClient> clients, Indicator indicator) {
+ final boolean debugEnabled = logger.isDebugEnabled();
+ if (debugEnabled) {
+ logger.debug("clients size: {}", clients.size());
+ }
+ final RemoteClient first = clients.get(0);
+ return first;
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/HashCodeSelector.java
similarity index 61%
rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregator.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/HashCodeSelector.java
index f409909..3d256b5 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/HashCodeSelector.java
@@ -16,19 +16,20 @@
*
*/
-package org.apache.skywalking.oap.server.core.analysis.endpoint;
+package org.apache.skywalking.oap.server.core.remote.selector;
-import org.apache.skywalking.oap.server.core.analysis.AbstractAggregator;
-import org.slf4j.*;
+import java.util.List;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.remote.client.RemoteClient;
/**
* @author peng-yongsheng
*/
-public class EndpointLatencyAvgAggregator extends AbstractAggregator<EndpointLatencyAvgIndicator> {
-
- private static final Logger logger = LoggerFactory.getLogger(EndpointLatencyAvgAggregator.class);
-
- @Override protected void onNext(EndpointLatencyAvgIndicator data) {
+public class HashCodeSelector implements RemoteClientSelector {
+ /**
+ * Picks the client at index {@code |hashCode| mod size}, so indicators with the same hash code always
+ * go to the same position of the list.
+ *
+ * @param clients candidate clients
+ * @param indicator the indicator whose hash code decides the position
+ * @return the selected client
+ */
+ @Override public RemoteClient select(List<RemoteClient> clients, Indicator indicator) {
+ int size = clients.size();
+ // Take the remainder before the absolute value: Math.abs(Integer.MIN_VALUE) is still negative and
+ // would give a negative index. For every other hash code the result is the same as before.
+ int selectIndex = Math.abs(indicator.hashCode() % size);
+ return clients.get(selectIndex);
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/RemoteClientSelector.java
similarity index 70%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/RemoteClientSelector.java
index 7e8b8e1..438cbad 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/RemoteClientSelector.java
@@ -16,14 +16,15 @@
*
*/
-package org.apache.skywalking.oap.server.core.analysis.indicator.annotation;
+package org.apache.skywalking.oap.server.core.remote.selector;
-import java.lang.annotation.*;
+import java.util.List;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.remote.client.RemoteClient;
/**
* @author peng-yongsheng
*/
-@Target(ElementType.TYPE)
-@Retention(RetentionPolicy.SOURCE)
-public @interface IndicatorType {
+public interface RemoteClientSelector {
+ /**
+ * Chooses one client out of {@code clients} for the given indicator.
+ *
+ * @param clients candidate clients
+ * @param indicator the indicator about to be sent; implementations may or may not consult it
+ * @return the chosen client
+ */
+ RemoteClient select(List<RemoteClient> clients, Indicator indicator);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/RollingSelector.java
similarity index 59%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/RollingSelector.java
index 54ca9d6..c74e8c3 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/RollingSelector.java
@@ -16,31 +16,27 @@
*
*/
-package org.apache.skywalking.oap.server.core.analysis.indicator;
+package org.apache.skywalking.oap.server.core.remote.selector;
-import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.*;
+import java.util.List;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.remote.client.RemoteClient;
/**
* @author peng-yongsheng
*/
-@IndicatorType
-public abstract class AvgIndicator extends Indicator {
+public class RollingSelector implements RemoteClientSelector {
- private long summation;
- private int count;
+ private int index = 0;
- public AvgIndicator(long timeBucket) {
- super(timeBucket);
- }
-
- @Entrance
- public void combine(@SourceFrom long summation, @ConstOne int count) {
- this.summation += summation;
- this.count += count;
- }
+ /**
+ * Picks clients in rotation: a counter is advanced on every call and reduced modulo the list size.
+ * The indicator is not consulted.
+ * NOTE(review): the counter is a plain int field updated without synchronization; confirm that a
+ * selector instance is only used from one thread.
+ */
+ @Override public RemoteClient select(List<RemoteClient> clients, Indicator indicator) {
+ int size = clients.size();
+ index++;
+ int selectIndex = Math.abs(index) % size;
- @Override public void combine(Indicator indicator) {
- AvgIndicator avgIndicator = (AvgIndicator)indicator;
- combine(avgIndicator.summation, avgIndicator.count);
+ // Start over before the next increment could overflow into negative values.
+ if (index == Integer.MAX_VALUE) {
+ index = 0;
+ }
+ return clients.get(selectIndex);
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Selector.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/Selector.java
similarity index 93%
rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Selector.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/Selector.java
index b7ac6ed..bd51648 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Selector.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/Selector.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.skywalking.oap.server.core.remote;
+package org.apache.skywalking.oap.server.core.remote.selector;
/**
* @author peng-yongsheng
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java b/oap-server/server-core/src/main/proto/RemoteService.proto
similarity index 61%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
copy to oap-server/server-core/src/main/proto/RemoteService.proto
index 66bb57d..410b0c3 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
+++ b/oap-server/server-core/src/main/proto/RemoteService.proto
@@ -16,21 +16,28 @@
*
*/
-package org.apache.skywalking.oap.server.core.analysis.indicator;
+syntax = "proto3";
-import lombok.Getter;
-import org.apache.skywalking.oap.server.core.analysis.data.StreamData;
+option java_multiple_files = true;
+option java_package = "org.apache.skywalking.oap.server.core.remote.grpc.proto";
-/**
- * @author peng-yongsheng
- */
-public abstract class Indicator extends StreamData {
-
- @Getter private final long timeBucket;
-
- public Indicator(long timeBucket) {
- this.timeBucket = timeBucket;
+service RemoteService {
+ rpc call (stream RemoteMessage) returns (Empty) {
}
+}
- public abstract void combine(Indicator indicator);
+message RemoteMessage {
+ int32 nextWorkerId = 1;
+ int32 indicatorId = 2;
+ RemoteData remoteData = 3;
}
+
+message RemoteData {
+ repeated string dataStrings = 1;
+ repeated int64 dataLongs = 2;
+ repeated double dataDoubles = 3;
+ repeated int32 dataIntegers = 4;
+}
+
+message Empty {
+}
\ No newline at end of file
diff --git a/oap-server/server-core/src/main/resources/META-INF/defines/indicator.def b/oap-server/server-core/src/main/resources/META-INF/defines/indicator.def
new file mode 100644
index 0000000..ce6b6dd
--- /dev/null
+++ b/oap-server/server-core/src/main/resources/META-INF/defines/indicator.def
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+org.apache.skywalking.oap.server.core.analysis.endpoint.EndpointLatencyAvgIndicator
\ No newline at end of file
diff --git a/oap-server/server-core/src/main/resources/META-INF/defines/worker.def b/oap-server/server-core/src/main/resources/META-INF/defines/worker.def
new file mode 100644
index 0000000..5afc8db
--- /dev/null
+++ b/oap-server/server-core/src/main/resources/META-INF/defines/worker.def
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+org.apache.skywalking.oap.server.core.analysis.endpoint.EndpointLatencyAvgAggregateWorker
+org.apache.skywalking.oap.server.core.analysis.endpoint.EndpointLatencyAvgRemoteWorker
+org.apache.skywalking.oap.server.core.analysis.endpoint.EndpointLatencyAvgPersistentWorker
\ No newline at end of file
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapperTestCase.java
similarity index 71%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
copy to oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapperTestCase.java
index 66bb57d..4b58eb5 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapperTestCase.java
@@ -16,21 +16,21 @@
*
*/
-package org.apache.skywalking.oap.server.core.analysis.indicator;
+package org.apache.skywalking.oap.server.core.analysis.indicator.define;
-import lombok.Getter;
-import org.apache.skywalking.oap.server.core.analysis.data.StreamData;
+import org.junit.*;
/**
* @author peng-yongsheng
*/
-public abstract class Indicator extends StreamData {
+public class IndicatorMapperTestCase {
- @Getter private final long timeBucket;
+ @Test
+ public void test() throws IndicatorDefineLoadException {
+ IndicatorMapper mapper = new IndicatorMapper();
+ mapper.load();
- public Indicator(long timeBucket) {
- this.timeBucket = timeBucket;
+ Assert.assertEquals(1, mapper.findIdByClass(TestAvgIndicator.class));
+ Assert.assertEquals(TestAvgIndicator.class, mapper.findClassById(1));
}
-
- public abstract void combine(Indicator indicator);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/TestAvgIndicator.java
similarity index 68%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
copy to oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/TestAvgIndicator.java
index 66bb57d..17d1189 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/TestAvgIndicator.java
@@ -16,21 +16,23 @@
*
*/
-package org.apache.skywalking.oap.server.core.analysis.indicator;
+package org.apache.skywalking.oap.server.core.analysis.indicator.define;
-import lombok.Getter;
-import org.apache.skywalking.oap.server.core.analysis.data.StreamData;
+import lombok.*;
+import org.apache.skywalking.oap.server.core.analysis.indicator.AvgIndicator;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
/**
* @author peng-yongsheng
*/
-public abstract class Indicator extends StreamData {
+public class TestAvgIndicator extends AvgIndicator {
- @Getter private final long timeBucket;
+ @Setter @Getter private int id;
- public Indicator(long timeBucket) {
- this.timeBucket = timeBucket;
+ @Override public RemoteData.Builder serialize() {
+ return null;
}
- public abstract void combine(Indicator indicator);
+ @Override public void deserialize(RemoteData remoteData) {
+ }
}
diff --git a/oap-server/server-core/src/test/resources/META-INF/defines/indicator.def b/oap-server/server-core/src/test/resources/META-INF/defines/indicator.def
new file mode 100644
index 0000000..97491ff
--- /dev/null
+++ b/oap-server/server-core/src/test/resources/META-INF/defines/indicator.def
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+org.apache.skywalking.oap.server.core.analysis.indicator.define.TestAvgIndicator
\ No newline at end of file
diff --git a/oap-server/server-core/src/test/resources/META-INF/defines/worker.def b/oap-server/server-core/src/test/resources/META-INF/defines/worker.def
new file mode 100644
index 0000000..33ebbb1
--- /dev/null
+++ b/oap-server/server-core/src/test/resources/META-INF/defines/worker.def
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
\ No newline at end of file
diff --git a/oap-server/server-core/src/test/resources/log4j2.xml b/oap-server/server-core/src/test/resources/log4j2.xml
new file mode 100644
index 0000000..6eb5b3f
--- /dev/null
+++ b/oap-server/server-core/src/test/resources/log4j2.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~
+ -->
+
+<Configuration status="DEBUG">
+ <Appenders>
+ <Console name="Console" target="SYSTEM_OUT">
+ <PatternLayout charset="UTF-8" pattern="%d - %c -%-4r [%t] %-5p %x - %m%n"/>
+ </Console>
+ </Appenders>
+ <Loggers>
+ <Root level="DEBUG">
+ <AppenderRef ref="Console"/>
+ </Root>
+ </Loggers>
+</Configuration>
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java
index 2416be6..188fae3 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java
@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.library.client.grpc;
import io.grpc.*;
+import lombok.Getter;
import org.apache.skywalking.oap.server.library.client.Client;
/**
@@ -26,9 +27,9 @@ import org.apache.skywalking.oap.server.library.client.Client;
*/
public class GRPCClient implements Client {
- private final String host;
+ @Getter private final String host;
- private final int port;
+ @Getter private final int port;
private ManagedChannel channel;
diff --git a/oap-server/server-library/library-module/src/main/java/org/apache/skywalking/oap/server/library/module/ModuleProvider.java b/oap-server/server-library/library-module/src/main/java/org/apache/skywalking/oap/server/library/module/ModuleProvider.java
index 7596fb6..9873303 100644
--- a/oap-server/server-library/library-module/src/main/java/org/apache/skywalking/oap/server/library/module/ModuleProvider.java
+++ b/oap-server/server-library/library-module/src/main/java/org/apache/skywalking/oap/server/library/module/ModuleProvider.java
@@ -18,8 +18,7 @@
package org.apache.skywalking.oap.server.library.module;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
/**
* The <code>ModuleProvider</code> is an implementation of a {@link ModuleDefine}.
diff --git a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/mesh-receiver-provider/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/provider/MeshGRPCHandler.java b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/mesh-receiver-provider/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/provider/MeshGRPCHandler.java
index e9764ce..8893821 100644
--- a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/mesh-receiver-provider/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/provider/MeshGRPCHandler.java
+++ b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/mesh-receiver-provider/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/provider/MeshGRPCHandler.java
@@ -35,8 +35,6 @@ public class MeshGRPCHandler extends ServiceMeshMetricServiceGrpc.ServiceMeshMet
if (logger.isDebugEnabled()) {
logger.debug("Received mesh metric: {}", metric);
}
-
-
}
@Override public void onError(Throwable throwable) {