You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@skywalking.apache.org by wu...@apache.org on 2018/07/30 01:54:39 UTC

[incubator-skywalking] branch 6.0 updated: Feature/oap/remote (#1505)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch 6.0
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/6.0 by this push:
     new efe5299  Feature/oap/remote (#1505)
efe5299 is described below

commit efe5299d03536e739c3d8022bf0deb56cbc46088
Author: 彭勇升 pengys <80...@qq.com>
AuthorDate: Mon Jul 30 09:54:36 2018 +0800

    Feature/oap/remote (#1505)
    
    * Sample operator code.
    
    * Indicator aggregator framework.
    
    * Provide some annotations for OAL.
    
    * Remote module.
    
    * Register service.
    
    * Add apache license header.
    
    * Ignore comments when loading definition files.
---
 .../plugin/standalone/StandaloneManagerTest.java   |   7 +-
 .../ClusterModuleZookeeperProviderTestCase.java    |  17 +--
 oap-server/server-core/pom.xml                     |  50 +++++++
 .../skywalking/oap/server/core/CoreModule.java     |  12 ++
 .../oap/server/core/CoreModuleProvider.java        |  51 ++++---
 .../server/core/analysis/DispatcherManager.java    |   5 +-
 .../oap/server/core/analysis/data/StreamData.java  |   4 +-
 .../core/analysis/endpoint/EndpointDispatcher.java |  19 ++-
 ...java => EndpointLatencyAvgAggregateWorker.java} |  16 ++-
 .../endpoint/EndpointLatencyAvgIndicator.java      |  27 +++-
 ...ava => EndpointLatencyAvgPersistentWorker.java} |  12 +-
 ...or.java => EndpointLatencyAvgRemoteWorker.java} |  17 ++-
 .../core/analysis/indicator/AvgIndicator.java      |  16 +--
 .../server/core/analysis/indicator/Indicator.java  |   8 +-
 .../indicator/annotation/IndicatorType.java        |   2 +
 .../IndicatorDefineLoadException.java}             |  12 +-
 .../analysis/indicator/define/IndicatorMapper.java |  82 +++++++++++
 .../AbstractAggregatorWorker.java}                 |  19 +--
 .../AbstractPersistentWorker.java}                 |  16 +--
 .../core/analysis/worker/AbstractRemoteWorker.java |  63 ++++++++
 .../IndicatorType.java => worker/Worker.java}      |  10 +-
 .../define/WorkerDefineLoadException.java}         |  12 +-
 .../core/analysis/worker/define/WorkerMapper.java  | 102 +++++++++++++
 .../oap/server/core/cluster/RemoteInstance.java    |  41 ++----
 .../server/core/receiver/SourceReceiverImpl.java   |   5 +-
 .../remote/{Selector.java => Deserializable.java}  |   6 +-
 .../server/core/remote/RemoteSenderService.java    |  60 ++++++++
 .../server/core/remote/RemoteServiceHandler.java   |  71 +++++++++
 .../RemoteData.java => remote/Serializable.java}   |   8 +-
 .../core/remote/client/GRPCRemoteClient.java       | 158 +++++++++++++++++++++
 .../client/RemoteClient.java}                      |  14 +-
 .../core/remote/client/RemoteClientManager.java    | 126 ++++++++++++++++
 .../core/remote/client/SelfRemoteClient.java}      |  34 +++--
 .../selector/ForeverFirstSelector.java}            |  17 ++-
 .../selector/HashCodeSelector.java}                |  17 +--
 .../selector/RemoteClientSelector.java}            |  11 +-
 .../selector/RollingSelector.java}                 |  32 ++---
 .../core/remote/{ => selector}/Selector.java       |   2 +-
 .../Indicator.java => proto/RemoteService.proto}   |  33 +++--
 .../main/resources/META-INF/defines/indicator.def  |  19 +++
 .../src/main/resources/META-INF/defines/worker.def |  21 +++
 .../indicator/define/IndicatorMapperTestCase.java} |  18 +--
 .../indicator/define/TestAvgIndicator.java}        |  18 +--
 .../test/resources/META-INF/defines/indicator.def  |  19 +++
 .../src/test/resources/META-INF/defines/worker.def |  17 +++
 .../server-core/src/test/resources/log4j2.xml      |  31 ++++
 .../oap/server/library/client/grpc/GRPCClient.java |   5 +-
 .../oap/server/library/module/ModuleProvider.java  |   3 +-
 .../receiver/mesh/provider/MeshGRPCHandler.java    |   2 -
 49 files changed, 1117 insertions(+), 250 deletions(-)

diff --git a/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManagerTest.java b/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManagerTest.java
index 2e69130..55671bc 100644
--- a/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManagerTest.java
+++ b/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManagerTest.java
@@ -19,15 +19,14 @@
 package org.apache.skywalking.oap.server.cluster.plugin.standalone;
 
 import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.*;
 
 public class StandaloneManagerTest {
     @Test
     public void test() {
         StandaloneManager standaloneManager = new StandaloneManager();
-        RemoteInstance remote1 = new RemoteInstance();
-        RemoteInstance remote2 = new RemoteInstance();
+        RemoteInstance remote1 = new RemoteInstance("A", 100, true);
+        RemoteInstance remote2 = new RemoteInstance("B", 100, false);
 
         standaloneManager.registerRemote(remote1);
         Assert.assertEquals(remote1, standaloneManager.queryRemoteNodes().get(0));
diff --git a/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ClusterModuleZookeeperProviderTestCase.java b/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ClusterModuleZookeeperProviderTestCase.java
index bbffa9f..a318481 100644
--- a/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ClusterModuleZookeeperProviderTestCase.java
+++ b/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ClusterModuleZookeeperProviderTestCase.java
@@ -21,16 +21,9 @@ package org.apache.skywalking.oap.server.cluster.plugin.zookeeper;
 import java.io.IOException;
 import java.util.List;
 import org.apache.curator.test.TestingServer;
-import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
-import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
-import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
-import org.apache.skywalking.oap.server.core.cluster.ServiceRegisterException;
-import org.apache.skywalking.oap.server.library.module.ModuleStartException;
-import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import org.apache.skywalking.oap.server.core.cluster.*;
+import org.apache.skywalking.oap.server.library.module.*;
+import org.junit.*;
 
 /**
  * @author peng-yongsheng
@@ -59,9 +52,7 @@ public class ClusterModuleZookeeperProviderTestCase {
         ClusterRegister moduleRegister = provider.getService(ClusterRegister.class);
         ClusterNodesQuery clusterNodesQuery = provider.getService(ClusterNodesQuery.class);
 
-        RemoteInstance remoteInstance = new RemoteInstance();
-        remoteInstance.setHost("ProviderAHost");
-        remoteInstance.setPort(1000);
+        RemoteInstance remoteInstance = new RemoteInstance("ProviderAHost", 1000, true);
 
         moduleRegister.registerRemote(remoteInstance);
 
diff --git a/oap-server/server-core/pom.xml b/oap-server/server-core/pom.xml
index 767e1c8..ca6ac7a 100644
--- a/oap-server/server-core/pom.xml
+++ b/oap-server/server-core/pom.xml
@@ -43,6 +43,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.skywalking</groupId>
+            <artifactId>library-client</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.skywalking</groupId>
             <artifactId>library-server</artifactId>
             <version>${project.version}</version>
         </dependency>
@@ -57,4 +62,49 @@
             <version>${project.version}</version>
         </dependency>
     </dependencies>
+
+    <build>
+        <extensions>
+            <extension>
+                <groupId>kr.motd.maven</groupId>
+                <artifactId>os-maven-plugin</artifactId>
+                <version>1.4.1.Final</version>
+            </extension>
+        </extensions>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-resources-plugin</artifactId>
+                <version>2.4.3</version>
+                <configuration>
+                    <encoding>${project.build.sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.xolstice.maven.plugins</groupId>
+                <artifactId>protobuf-maven-plugin</artifactId>
+                <version>0.5.0</version>
+                <configuration>
+                    <!--
+                      The version of protoc must match protobuf-java. If you don't depend on
+                      protobuf-java directly, you will be transitively depending on the
+                      protobuf-java version that grpc depends on.
+                    -->
+                    <protocArtifact>com.google.protobuf:protoc:3.3.0:exe:${os.detected.classifier}
+                    </protocArtifact>
+                    <pluginId>grpc-java</pluginId>
+                    <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.4.0:exe:${os.detected.classifier}
+                    </pluginArtifact>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>compile</goal>
+                            <goal>compile-custom</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 </project>
\ No newline at end of file
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
index 6f3de48..b03152f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
@@ -19,7 +19,11 @@
 package org.apache.skywalking.oap.server.core;
 
 import java.util.*;
+import org.apache.skywalking.oap.server.core.analysis.indicator.define.IndicatorMapper;
+import org.apache.skywalking.oap.server.core.analysis.worker.define.WorkerMapper;
 import org.apache.skywalking.oap.server.core.receiver.SourceReceiver;
+import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
+import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
 import org.apache.skywalking.oap.server.core.server.*;
 import org.apache.skywalking.oap.server.library.module.ModuleDefine;
 
@@ -38,6 +42,7 @@ public class CoreModule extends ModuleDefine {
         List<Class> classes = new ArrayList<>();
         addServerInterface(classes);
         addReceiverInterface(classes);
+        addInsideService(classes);
 
         return classes.toArray(new Class[] {});
     }
@@ -47,6 +52,13 @@ public class CoreModule extends ModuleDefine {
         classes.add(JettyHandlerRegister.class);
     }
 
+    private void addInsideService(List<Class> classes) {
+        classes.add(IndicatorMapper.class);
+        classes.add(WorkerMapper.class);
+        classes.add(RemoteClientManager.class);
+        classes.add(RemoteSenderService.class);
+    }
+
     private void addReceiverInterface(List<Class> classes) {
         classes.add(SourceReceiver.class);
     }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index 1af3873..757c34a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -18,24 +18,18 @@
 
 package org.apache.skywalking.oap.server.core;
 
-import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
-import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
-import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
-import org.apache.skywalking.oap.server.core.receiver.SourceReceiver;
-import org.apache.skywalking.oap.server.core.receiver.SourceReceiverImpl;
-import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
-import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegisterImpl;
-import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister;
-import org.apache.skywalking.oap.server.core.server.JettyHandlerRegisterImpl;
-import org.apache.skywalking.oap.server.library.module.ModuleConfig;
-import org.apache.skywalking.oap.server.library.module.ModuleProvider;
-import org.apache.skywalking.oap.server.library.module.ModuleStartException;
-import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
+import org.apache.skywalking.oap.server.core.analysis.indicator.define.*;
+import org.apache.skywalking.oap.server.core.analysis.worker.define.*;
+import org.apache.skywalking.oap.server.core.cluster.*;
+import org.apache.skywalking.oap.server.core.receiver.*;
+import org.apache.skywalking.oap.server.core.remote.*;
+import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
+import org.apache.skywalking.oap.server.core.server.*;
+import org.apache.skywalking.oap.server.library.module.*;
 import org.apache.skywalking.oap.server.library.server.ServerException;
 import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
 import org.apache.skywalking.oap.server.library.server.jetty.JettyServer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
 
 /**
  * @author peng-yongsheng
@@ -47,17 +41,21 @@ public class CoreModuleProvider extends ModuleProvider {
     private final CoreModuleConfig moduleConfig;
     private GRPCServer grpcServer;
     private JettyServer jettyServer;
+    private final IndicatorMapper indicatorMapper;
+    private final WorkerMapper workerMapper;
 
     public CoreModuleProvider() {
         super();
         this.moduleConfig = new CoreModuleConfig();
+        this.indicatorMapper = new IndicatorMapper();
+        this.workerMapper = new WorkerMapper(getManager());
     }
 
     @Override public String name() {
         return "default";
     }
 
-    @Override public Class module() {
+    @Override public Class<? extends ModuleDefine> module() {
         return CoreModule.class;
     }
 
@@ -75,11 +73,24 @@ public class CoreModuleProvider extends ModuleProvider {
         this.registerServiceImplementation(GRPCHandlerRegister.class, new GRPCHandlerRegisterImpl(grpcServer));
         this.registerServiceImplementation(JettyHandlerRegister.class, new JettyHandlerRegisterImpl(jettyServer));
 
-        this.registerServiceImplementation(SourceReceiver.class, new SourceReceiverImpl());
+        this.registerServiceImplementation(SourceReceiver.class, new SourceReceiverImpl(getManager()));
+
+        this.registerServiceImplementation(IndicatorMapper.class, indicatorMapper);
+        this.registerServiceImplementation(WorkerMapper.class, workerMapper);
+
+        this.registerServiceImplementation(RemoteClientManager.class, new RemoteClientManager(getManager()));
+        this.registerServiceImplementation(RemoteSenderService.class, new RemoteSenderService(getManager()));
     }
 
-    @Override public void start() {
+    @Override public void start() throws ModuleStartException {
+        grpcServer.addHandler(new RemoteServiceHandler(getManager()));
 
+        try {
+            indicatorMapper.load();
+            workerMapper.load();
+        } catch (IndicatorDefineLoadException | WorkerDefineLoadException e) {
+            throw new ModuleStartException(e.getMessage(), e);
+        }
     }
 
     @Override public void notifyAfterCompleted() throws ModuleStartException {
@@ -90,9 +101,7 @@ public class CoreModuleProvider extends ModuleProvider {
             throw new ModuleStartException(e.getMessage(), e);
         }
 
-        RemoteInstance gRPCServerInstance = new RemoteInstance();
-        gRPCServerInstance.setHost(moduleConfig.getGRPCHost());
-        gRPCServerInstance.setPort(moduleConfig.getGRPCPort());
+        RemoteInstance gRPCServerInstance = new RemoteInstance(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(), true);
         this.getManager().find(ClusterModule.NAME).getService(ClusterRegister.class).registerRemote(gRPCServerInstance);
     }
 
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
index de8da4e..9317382 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.core.analysis;
 import java.util.*;
 import org.apache.skywalking.oap.server.core.analysis.endpoint.EndpointDispatcher;
 import org.apache.skywalking.oap.server.core.receiver.Scope;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.slf4j.*;
 
 /**
@@ -32,9 +33,9 @@ public class DispatcherManager {
 
     private Map<Scope, SourceDispatcher> dispatcherMap;
 
-    public DispatcherManager() {
+    public DispatcherManager(ModuleManager moduleManager) {
         this.dispatcherMap = new HashMap<>();
-        this.dispatcherMap.put(Scope.Endpoint, new EndpointDispatcher());
+        this.dispatcherMap.put(Scope.Endpoint, new EndpointDispatcher(moduleManager));
     }
 
     public SourceDispatcher getDispatcher(Scope scope) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/StreamData.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/StreamData.java
index 63f149f..57c75d4 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/StreamData.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/StreamData.java
@@ -18,10 +18,12 @@
 
 package org.apache.skywalking.oap.server.core.analysis.data;
 
+import org.apache.skywalking.oap.server.core.remote.*;
+
 /**
  * @author peng-yongsheng
  */
-public abstract class StreamData implements QueueData {
+public abstract class StreamData implements QueueData, Serializable, Deserializable {
 
     private EndOfBatchContext endOfBatchContext;
 
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointDispatcher.java
index 152ac5e..bcb3a2c 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointDispatcher.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointDispatcher.java
@@ -18,18 +18,22 @@
 
 package org.apache.skywalking.oap.server.core.analysis.endpoint;
 
+import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
+import org.apache.skywalking.oap.server.core.analysis.worker.define.WorkerMapper;
 import org.apache.skywalking.oap.server.core.receiver.Endpoint;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
 
 /**
  * @author peng-yongsheng
  */
 public class EndpointDispatcher implements SourceDispatcher<Endpoint> {
 
-    private final EndpointLatencyAvgAggregator avgAggregator;
+    private final ModuleManager moduleManager;
+    private EndpointLatencyAvgAggregateWorker avgAggregator;
 
-    public EndpointDispatcher() {
-        this.avgAggregator = new EndpointLatencyAvgAggregator();
+    public EndpointDispatcher(ModuleManager moduleManager) {
+        this.moduleManager = moduleManager;
     }
 
     @Override public void dispatch(Endpoint source) {
@@ -37,7 +41,14 @@ public class EndpointDispatcher implements SourceDispatcher<Endpoint> {
     }
 
     private void avg(Endpoint source) {
-        EndpointLatencyAvgIndicator indicator = new EndpointLatencyAvgIndicator(source.getTimeBucket(), source.getId());
+        if (avgAggregator == null) {
+            WorkerMapper workerMapper = moduleManager.find(CoreModule.NAME).getService(WorkerMapper.class);
+            avgAggregator = (EndpointLatencyAvgAggregateWorker)workerMapper.findInstanceByClass(EndpointLatencyAvgAggregateWorker.class);
+        }
+
+        EndpointLatencyAvgIndicator indicator = new EndpointLatencyAvgIndicator();
+        indicator.setId(source.getId());
+        indicator.setTimeBucket(source.getTimeBucket());
         indicator.combine(source.getLatency(), 1);
         avgAggregator.in(indicator);
     }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregateWorker.java
similarity index 64%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregator.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregateWorker.java
index f409909..62d5134 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregateWorker.java
@@ -18,17 +18,25 @@
 
 package org.apache.skywalking.oap.server.core.analysis.endpoint;
 
-import org.apache.skywalking.oap.server.core.analysis.AbstractAggregator;
+import org.apache.skywalking.oap.server.core.analysis.worker.AbstractAggregatorWorker;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.slf4j.*;
 
 /**
  * @author peng-yongsheng
  */
-public class EndpointLatencyAvgAggregator extends AbstractAggregator<EndpointLatencyAvgIndicator> {
+public class EndpointLatencyAvgAggregateWorker extends AbstractAggregatorWorker<EndpointLatencyAvgIndicator> {
 
-    private static final Logger logger = LoggerFactory.getLogger(EndpointLatencyAvgAggregator.class);
+    private static final Logger logger = LoggerFactory.getLogger(EndpointLatencyAvgAggregateWorker.class);
 
-    @Override protected void onNext(EndpointLatencyAvgIndicator data) {
+    private final EndpointLatencyAvgRemoteWorker remoter;
+
+    public EndpointLatencyAvgAggregateWorker(ModuleManager moduleManager) {
+        super(moduleManager);
+        this.remoter = new EndpointLatencyAvgRemoteWorker(moduleManager);
+    }
 
+    @Override protected void onNext(EndpointLatencyAvgIndicator data) {
+        remoter.in(data);
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java
index a200aeb..e339afa 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java
@@ -18,19 +18,16 @@
 
 package org.apache.skywalking.oap.server.core.analysis.endpoint;
 
+import lombok.*;
 import org.apache.skywalking.oap.server.core.analysis.indicator.AvgIndicator;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
 
 /**
  * @author peng-yongsheng
  */
 public class EndpointLatencyAvgIndicator extends AvgIndicator {
 
-    private final int id;
-
-    public EndpointLatencyAvgIndicator(long timeBucket, int id) {
-        super(timeBucket);
-        this.id = id;
-    }
+    @Setter @Getter private int id;
 
     @Override public int hashCode() {
         int result = 17;
@@ -55,4 +52,22 @@ public class EndpointLatencyAvgIndicator extends AvgIndicator {
 
         return true;
     }
+
+    @Override public RemoteData.Builder serialize() {
+        RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
+        remoteBuilder.setDataIntegers(0, getId());
+        remoteBuilder.setDataIntegers(1, getCount());
+
+        remoteBuilder.setDataLongs(0, getTimeBucket());
+        remoteBuilder.setDataLongs(1, getSummation());
+        return remoteBuilder;
+    }
+
+    @Override public void deserialize(RemoteData remoteData) {
+        setId(remoteData.getDataIntegers(0));
+        setCount(remoteData.getDataIntegers(1));
+
+        setTimeBucket(remoteData.getDataLongs(0));
+        setSummation(remoteData.getDataLongs(1));
+    }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgPersistentWorker.java
similarity index 70%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregator.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgPersistentWorker.java
index f409909..e3c5c23 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgPersistentWorker.java
@@ -18,17 +18,15 @@
 
 package org.apache.skywalking.oap.server.core.analysis.endpoint;
 
-import org.apache.skywalking.oap.server.core.analysis.AbstractAggregator;
-import org.slf4j.*;
+import org.apache.skywalking.oap.server.core.analysis.worker.AbstractPersistentWorker;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
 
 /**
  * @author peng-yongsheng
  */
-public class EndpointLatencyAvgAggregator extends AbstractAggregator<EndpointLatencyAvgIndicator> {
-
-    private static final Logger logger = LoggerFactory.getLogger(EndpointLatencyAvgAggregator.class);
-
-    @Override protected void onNext(EndpointLatencyAvgIndicator data) {
+public class EndpointLatencyAvgPersistentWorker extends AbstractPersistentWorker<EndpointLatencyAvgIndicator> {
 
+    public EndpointLatencyAvgPersistentWorker(ModuleManager moduleManager) {
+        super(moduleManager);
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgRemoteWorker.java
similarity index 59%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregator.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgRemoteWorker.java
index f409909..47f7521 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgRemoteWorker.java
@@ -18,17 +18,24 @@
 
 package org.apache.skywalking.oap.server.core.analysis.endpoint;
 
-import org.apache.skywalking.oap.server.core.analysis.AbstractAggregator;
-import org.slf4j.*;
+import org.apache.skywalking.oap.server.core.analysis.worker.AbstractRemoteWorker;
+import org.apache.skywalking.oap.server.core.remote.selector.Selector;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
 
 /**
  * @author peng-yongsheng
  */
-public class EndpointLatencyAvgAggregator extends AbstractAggregator<EndpointLatencyAvgIndicator> {
+public class EndpointLatencyAvgRemoteWorker extends AbstractRemoteWorker<EndpointLatencyAvgIndicator> {
 
-    private static final Logger logger = LoggerFactory.getLogger(EndpointLatencyAvgAggregator.class);
+    public EndpointLatencyAvgRemoteWorker(ModuleManager moduleManager) {
+        super(moduleManager);
+    }
 
-    @Override protected void onNext(EndpointLatencyAvgIndicator data) {
+    @Override public Selector selector() {
+        return Selector.HashCode;
+    }
 
+    @Override public Class nextWorkerClass() {
+        return EndpointLatencyAvgPersistentWorker.class;
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java
index 54ca9d6..da065f3 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java
@@ -18,28 +18,26 @@
 
 package org.apache.skywalking.oap.server.core.analysis.indicator;
 
+import lombok.*;
 import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.*;
+import org.apache.skywalking.oap.server.core.remote.selector.Selector;
 
 /**
  * @author peng-yongsheng
  */
-@IndicatorType
+@IndicatorType(selector = Selector.HashCode)
 public abstract class AvgIndicator extends Indicator {
 
-    private long summation;
-    private int count;
-
-    public AvgIndicator(long timeBucket) {
-        super(timeBucket);
-    }
+    @Getter @Setter private long summation;
+    @Getter @Setter private int count;
 
     @Entrance
-    public void combine(@SourceFrom long summation, @ConstOne int count) {
+    public final void combine(@SourceFrom long summation, @ConstOne int count) {
         this.summation += summation;
         this.count += count;
     }
 
-    @Override public void combine(Indicator indicator) {
+    @Override public final void combine(Indicator indicator) {
         AvgIndicator avgIndicator = (AvgIndicator)indicator;
         combine(avgIndicator.summation, avgIndicator.count);
     }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
index 66bb57d..533379e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
@@ -18,7 +18,7 @@
 
 package org.apache.skywalking.oap.server.core.analysis.indicator;
 
-import lombok.Getter;
+import lombok.*;
 import org.apache.skywalking.oap.server.core.analysis.data.StreamData;
 
 /**
@@ -26,11 +26,7 @@ import org.apache.skywalking.oap.server.core.analysis.data.StreamData;
  */
 public abstract class Indicator extends StreamData {
 
-    @Getter private final long timeBucket;
-
-    public Indicator(long timeBucket) {
-        this.timeBucket = timeBucket;
-    }
+    @Getter @Setter private long timeBucket;
 
     public abstract void combine(Indicator indicator);
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
index 7e8b8e1..46f0345 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.oap.server.core.analysis.indicator.annotation;
 
 import java.lang.annotation.*;
+import org.apache.skywalking.oap.server.core.remote.selector.Selector;
 
 /**
  * @author peng-yongsheng
@@ -26,4 +27,5 @@ import java.lang.annotation.*;
 @Target(ElementType.TYPE)
 @Retention(RetentionPolicy.SOURCE)
 public @interface IndicatorType {
+    Selector selector();
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorDefineLoadException.java
similarity index 83%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorDefineLoadException.java
index 7e8b8e1..ca0521a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorDefineLoadException.java
@@ -16,14 +16,14 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.analysis.indicator.annotation;
-
-import java.lang.annotation.*;
+package org.apache.skywalking.oap.server.core.analysis.indicator.define;
 
 /**
  * @author peng-yongsheng
  */
-@Target(ElementType.TYPE)
-@Retention(RetentionPolicy.SOURCE)
-public @interface IndicatorType {
+public class IndicatorDefineLoadException extends Exception {
+
+    public IndicatorDefineLoadException(String message, Throwable cause) {
+        super(message, cause);
+    }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapper.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapper.java
new file mode 100644
index 0000000..8513fa2
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapper.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.indicator.define;
+
+import java.io.*;
+import java.net.URL;
+import java.util.*;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.library.module.Service;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class IndicatorMapper implements Service {
+
+    private static final Logger logger = LoggerFactory.getLogger(IndicatorMapper.class);
+
+    private int id = 0;
+    private final Map<Class<Indicator>, Integer> classKeyMapping;
+    private final Map<Integer, Class<Indicator>> idKeyMapping;
+
+    public IndicatorMapper() {
+        this.classKeyMapping = new HashMap<>();
+        this.idKeyMapping = new HashMap<>();
+    }
+
+    @SuppressWarnings(value = "unchecked")
+    public void load() throws IndicatorDefineLoadException {
+        try {
+            List<String> indicatorClasses = new LinkedList<>();
+
+            Enumeration<URL> urlEnumeration = this.getClass().getClassLoader().getResources("META-INF/defines/indicator.def");
+            while (urlEnumeration.hasMoreElements()) {
+                URL definitionFileURL = urlEnumeration.nextElement();
+                logger.info("Load indicator definition file url: {}", definitionFileURL.getPath());
+                Properties properties = new Properties();
+                try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(definitionFileURL.openStream()))) {
+                    properties.load(bufferedReader);
+                } // try-with-resources closes the reader and the underlying URL stream, even when load() throws
+                Enumeration defineItem = properties.propertyNames();
+                while (defineItem.hasMoreElements()) {
+                    String fullNameClass = (String)defineItem.nextElement();
+                    indicatorClasses.add(fullNameClass);
+                }
+            }
+            // Ids are assigned sequentially, starting at 1, in the order the class names were collected above.
+            for (String indicatorClassName : indicatorClasses) {
+                Class<Indicator> indicatorClass = (Class<Indicator>)Class.forName(indicatorClassName);
+                id++;
+                classKeyMapping.put(indicatorClass, id);
+                idKeyMapping.put(id, indicatorClass);
+            }
+        } catch (IOException | ClassNotFoundException e) {
+            throw new IndicatorDefineLoadException(e.getMessage(), e);
+        }
+    }
+
+    public int findIdByClass(Class indicatorClass) {
+        return classKeyMapping.get(indicatorClass);
+    }
+
+    public Class<Indicator> findClassById(int id) {
+        return idKeyMapping.get(id);
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/AbstractAggregator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractAggregatorWorker.java
similarity index 84%
rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/AbstractAggregator.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractAggregatorWorker.java
index f4e769a..65d68b4 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/AbstractAggregator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractAggregatorWorker.java
@@ -16,35 +16,36 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.analysis;
+package org.apache.skywalking.oap.server.core.analysis.worker;
 
 import java.util.*;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
 import org.apache.skywalking.oap.server.core.analysis.data.*;
 import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.slf4j.*;
 
 /**
  * @author peng-yongsheng
  */
-public abstract class AbstractAggregator<INPUT extends Indicator> {
+public abstract class AbstractAggregatorWorker<INPUT extends Indicator> extends Worker<INPUT> {
 
-    private static final Logger logger = LoggerFactory.getLogger(AbstractAggregator.class);
+    private static final Logger logger = LoggerFactory.getLogger(AbstractAggregatorWorker.class);
 
     private final DataCarrier<INPUT> dataCarrier;
     private final MergeDataCache<INPUT> mergeDataCache;
     private int messageNum;
 
-    public AbstractAggregator() {
+    public AbstractAggregatorWorker(ModuleManager moduleManager) {
         this.mergeDataCache = new MergeDataCache<>();
         this.dataCarrier = new DataCarrier<>(1, 10000);
         this.dataCarrier.consume(new AggregatorConsumer(this), 1);
     }
 
-    public void in(INPUT message) {
-        message.setEndOfBatchContext(new EndOfBatchContext(false));
-        dataCarrier.produce(message);
+    @Override public final void in(INPUT input) {
+        input.setEndOfBatchContext(new EndOfBatchContext(false));
+        dataCarrier.produce(input);
     }
 
     private void onWork(INPUT message) {
@@ -91,9 +92,9 @@ public abstract class AbstractAggregator<INPUT extends Indicator> {
 
     private class AggregatorConsumer implements IConsumer<INPUT> {
 
-        private final AbstractAggregator<INPUT> aggregator;
+        private final AbstractAggregatorWorker<INPUT> aggregator;
 
-        private AggregatorConsumer(AbstractAggregator<INPUT> aggregator) {
+        private AggregatorConsumer(AbstractAggregatorWorker<INPUT> aggregator) {
             this.aggregator = aggregator;
         }
 
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/StreamData.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java
similarity index 67%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/StreamData.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java
index 63f149f..bcead67 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/StreamData.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java
@@ -16,20 +16,20 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.analysis.data;
+package org.apache.skywalking.oap.server.core.analysis.worker;
+
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
 
 /**
  * @author peng-yongsheng
  */
-public abstract class StreamData implements QueueData {
-
-    private EndOfBatchContext endOfBatchContext;
+public abstract class AbstractPersistentWorker<INPUT extends Indicator> extends Worker<INPUT> {
 
-    @Override public final EndOfBatchContext getEndOfBatchContext() {
-        return this.endOfBatchContext;
+    public AbstractPersistentWorker(ModuleManager moduleManager) {
     }
 
-    @Override public final void setEndOfBatchContext(EndOfBatchContext context) {
-        this.endOfBatchContext = context;
+    @Override public final void in(INPUT input) {
+
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractRemoteWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractRemoteWorker.java
new file mode 100644
index 0000000..e3fb89d
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractRemoteWorker.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.worker;
+
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.analysis.worker.define.WorkerMapper;
+import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
+import org.apache.skywalking.oap.server.core.remote.selector.Selector;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public abstract class AbstractRemoteWorker<INPUT extends Indicator> extends Worker<INPUT> {
+
+    private static final Logger logger = LoggerFactory.getLogger(AbstractRemoteWorker.class);
+
+    private final ModuleManager moduleManager;
+    private RemoteSenderService remoteSender;
+    private WorkerMapper workerMapper;
+
+    public AbstractRemoteWorker(ModuleManager moduleManager) {
+        this.moduleManager = moduleManager;
+    }
+
+    @Override public final void in(INPUT input) {
+        if (remoteSender == null) { // resolved from the core module on first use, then cached in the field
+            remoteSender = moduleManager.find(CoreModule.NAME).getService(RemoteSenderService.class);
+        }
+        if (workerMapper == null) { // same lazy lookup for the worker id mapper
+            workerMapper = moduleManager.find(CoreModule.NAME).getService(WorkerMapper.class);
+        }
+        // Best effort: any failure while resolving the next worker id or sending is logged and not rethrown.
+        try {
+            int nextWorkerId = workerMapper.findIdByClass(nextWorkerClass());
+            remoteSender.send(nextWorkerId, input, selector());
+        } catch (Throwable e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    public abstract Class nextWorkerClass();
+
+    public abstract Selector selector();
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/Worker.java
similarity index 78%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/Worker.java
index 7e8b8e1..53010ee 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/Worker.java
@@ -16,14 +16,14 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.analysis.indicator.annotation;
+package org.apache.skywalking.oap.server.core.analysis.worker;
 
-import java.lang.annotation.*;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
 
 /**
  * @author peng-yongsheng
  */
-@Target(ElementType.TYPE)
-@Retention(RetentionPolicy.SOURCE)
-public @interface IndicatorType {
+public abstract class Worker<INPUT extends Indicator> {
+
+    public abstract void in(INPUT input);
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/define/WorkerDefineLoadException.java
similarity index 78%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/define/WorkerDefineLoadException.java
index 7e8b8e1..a24a0a7 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/define/WorkerDefineLoadException.java
@@ -16,14 +16,14 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.analysis.indicator.annotation;
-
-import java.lang.annotation.*;
+package org.apache.skywalking.oap.server.core.analysis.worker.define;
 
 /**
  * @author peng-yongsheng
  */
-@Target(ElementType.TYPE)
-@Retention(RetentionPolicy.SOURCE)
-public @interface IndicatorType {
+public class WorkerDefineLoadException extends Exception {
+
+    public WorkerDefineLoadException(String message, Throwable cause) {
+        super(message, cause);
+    }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/define/WorkerMapper.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/define/WorkerMapper.java
new file mode 100644
index 0000000..9732430
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/define/WorkerMapper.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.worker.define;
+
+import java.io.*;
+import java.lang.reflect.Constructor;
+import java.net.URL;
+import java.util.*;
+import org.apache.skywalking.oap.server.core.analysis.worker.Worker;
+import org.apache.skywalking.oap.server.library.module.*;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class WorkerMapper implements Service {
+
+    private static final Logger logger = LoggerFactory.getLogger(WorkerMapper.class);
+
+    private int id = 0;
+    private final ModuleManager moduleManager;
+    private final Map<Class<Worker>, Integer> classKeyMapping;
+    private final Map<Integer, Class<Worker>> idKeyMapping;
+    private final Map<Class<Worker>, Worker> classKeyInstanceMapping;
+    private final Map<Integer, Worker> idKeyInstanceMapping;
+
+    public WorkerMapper(ModuleManager moduleManager) {
+        this.moduleManager = moduleManager;
+        this.classKeyMapping = new HashMap<>();
+        this.idKeyMapping = new HashMap<>();
+        this.classKeyInstanceMapping = new HashMap<>();
+        this.idKeyInstanceMapping = new HashMap<>();
+    }
+
+    @SuppressWarnings(value = "unchecked")
+    public void load() throws WorkerDefineLoadException {
+        try {
+            List<String> workerClasses = new LinkedList<>();
+
+            Enumeration<URL> urlEnumeration = this.getClass().getClassLoader().getResources("META-INF/defines/worker.def");
+            while (urlEnumeration.hasMoreElements()) {
+                URL definitionFileURL = urlEnumeration.nextElement();
+                logger.info("Load worker definition file url: {}", definitionFileURL.getPath());
+                Properties properties = new Properties();
+                try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(definitionFileURL.openStream()))) {
+                    properties.load(bufferedReader);
+                } // try-with-resources closes the reader and the underlying URL stream, even when load() throws
+                Enumeration defineItem = properties.propertyNames();
+                while (defineItem.hasMoreElements()) {
+                    String fullNameClass = (String)defineItem.nextElement();
+                    workerClasses.add(fullNameClass);
+                }
+            }
+
+            for (String workerClassName : workerClasses) {
+                Class<Worker> workerClass = (Class<Worker>)Class.forName(workerClassName);
+                id++;
+                classKeyMapping.put(workerClass, id);
+                idKeyMapping.put(id, workerClass);
+                // Each worker is instantiated once here through its (ModuleManager) constructor and registered by class and by id.
+                Constructor<Worker> constructor = workerClass.getDeclaredConstructor(ModuleManager.class);
+                Worker worker = constructor.newInstance(moduleManager);
+                classKeyInstanceMapping.put(workerClass, worker);
+                idKeyInstanceMapping.put(id, worker);
+            }
+        } catch (Exception e) {
+            throw new WorkerDefineLoadException(e.getMessage(), e);
+        }
+    }
+
+    public int findIdByClass(Class workerClass) {
+        return classKeyMapping.get(workerClass);
+    }
+
+    public Class<Worker> findClassById(int id) {
+        return idKeyMapping.get(id);
+    }
+
+    public Worker findInstanceByClass(Class workerClass) {
+        return classKeyInstanceMapping.get(workerClass);
+    }
+
+    public Worker findInstanceById(int id) {
+        return idKeyInstanceMapping.get(id);
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java
index 53c4ea0..1e85b32 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java
@@ -19,48 +19,29 @@
 package org.apache.skywalking.oap.server.core.cluster;
 
 import java.util.Objects;
+import lombok.*;
 
 /**
  * @author peng-yongsheng
  */
-public class RemoteInstance {
+public class RemoteInstance implements Comparable<RemoteInstance> {
 
-    private String host;
-    private int port;
-    private boolean self = false;
+    @Getter private final String host;
+    @Getter private final int port;
+    @Getter @Setter private boolean isSelf = false;
 
-    public RemoteInstance() {
-
-    }
-
-    public RemoteInstance(String host, int port, boolean self) {
+    public RemoteInstance(String host, int port, boolean isSelf) {
         this.host = host;
         this.port = port;
-        this.self = self;
-    }
-
-    public String getHost() {
-        return host;
-    }
-
-    public void setHost(String host) {
-        this.host = host;
-    }
-
-    public int getPort() {
-        return port;
-    }
-
-    public void setPort(int port) {
-        this.port = port;
+        this.isSelf = isSelf;
     }
 
-    public boolean isSelf() {
-        return self;
+    @Override public int compareTo(RemoteInstance o) {
+        return toString().compareTo(o.toString()); // order by host+port of the OTHER instance; comparing with itself always gave 0
     }
 
-    public void setSelf(boolean self) {
-        this.self = self;
+    @Override public String toString() {
+        return host + String.valueOf(port);
     }
 
     @Override public boolean equals(Object o) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/receiver/SourceReceiverImpl.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/receiver/SourceReceiverImpl.java
index 4d04a31..a6cf211 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/receiver/SourceReceiverImpl.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/receiver/SourceReceiverImpl.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.oap.server.core.receiver;
 
 import org.apache.skywalking.oap.server.core.analysis.DispatcherManager;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
 
 /**
  * @author peng-yongsheng
@@ -27,8 +28,8 @@ public class SourceReceiverImpl implements SourceReceiver {
 
     private final DispatcherManager dispatcherManager;
 
-    public SourceReceiverImpl() {
-        this.dispatcherManager = new DispatcherManager();
+    public SourceReceiverImpl(ModuleManager moduleManager) {
+        this.dispatcherManager = new DispatcherManager(moduleManager);
     }
 
     @Override public void receive(Source source) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Selector.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Deserializable.java
similarity index 85%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Selector.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Deserializable.java
index b7ac6ed..b2b3a7e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Selector.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Deserializable.java
@@ -18,9 +18,11 @@
 
 package org.apache.skywalking.oap.server.core.remote;
 
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+
 /**
  * @author peng-yongsheng
  */
-public enum Selector {
-    HashCode, Rolling, ForeverFirst
+public interface Deserializable {
+    void deserialize(RemoteData remoteData);
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteSenderService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteSenderService.java
new file mode 100644
index 0000000..b2ce1c4
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteSenderService.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.remote;
+
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.remote.client.*;
+import org.apache.skywalking.oap.server.core.remote.selector.*;
+import org.apache.skywalking.oap.server.library.module.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class RemoteSenderService implements Service {
+
+    private final ModuleManager moduleManager;
+    private final HashCodeSelector hashCodeSelector;
+    private final ForeverFirstSelector foreverFirstSelector;
+    private final RollingSelector rollingSelector;
+
+    public RemoteSenderService(ModuleManager moduleManager) {
+        this.moduleManager = moduleManager;
+        this.hashCodeSelector = new HashCodeSelector();
+        this.foreverFirstSelector = new ForeverFirstSelector();
+        this.rollingSelector = new RollingSelector();
+    }
+
+    public void send(int nextWorkId, Indicator indicator, Selector selector) {
+        RemoteClientManager clientManager = moduleManager.find(CoreModule.NAME).getService(RemoteClientManager.class);
+
+        // Push exactly once: every case ends with break so one selector never falls through into the next.
+        switch (selector) {
+            case HashCode:
+                hashCodeSelector.select(clientManager.getRemoteClient(), indicator).push(nextWorkId, indicator);
+                break;
+            case Rolling:
+                rollingSelector.select(clientManager.getRemoteClient(), indicator).push(nextWorkId, indicator);
+                break;
+            case ForeverFirst:
+                foreverFirstSelector.select(clientManager.getRemoteClient(), indicator).push(nextWorkId, indicator);
+                break;
+        }
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
new file mode 100644
index 0000000..892e951
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.remote;
+
+import io.grpc.stub.StreamObserver;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.analysis.indicator.define.IndicatorMapper;
+import org.apache.skywalking.oap.server.core.analysis.worker.define.WorkerMapper;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class RemoteServiceHandler extends RemoteServiceGrpc.RemoteServiceImplBase implements GRPCHandler {
+
+    private static final Logger logger = LoggerFactory.getLogger(RemoteServiceHandler.class);
+
+    private final IndicatorMapper indicatorMapper;
+    private final WorkerMapper workerMapper;
+
+    public RemoteServiceHandler(ModuleManager moduleManager) {
+        this.indicatorMapper = moduleManager.find(CoreModule.NAME).getService(IndicatorMapper.class);
+        this.workerMapper = moduleManager.find(CoreModule.NAME).getService(WorkerMapper.class);
+    }
+
+    @Override public StreamObserver<RemoteMessage> call(StreamObserver<Empty> responseObserver) {
+        return new StreamObserver<RemoteMessage>() {
+            @Override public void onNext(RemoteMessage message) {
+                int indicatorId = message.getIndicatorId();
+                int nextWorkerId = message.getNextWorkerId();
+                RemoteData remoteData = message.getRemoteData();
+                // NOTE(review): nextWorkerId and the deserialized indicator are not used after this point - confirm whether dispatch to the worker is still to be added.
+                Class<Indicator> indicatorClass = indicatorMapper.findClassById(indicatorId);
+                try {
+                    indicatorClass.newInstance().deserialize(remoteData);
+                } catch (InstantiationException | IllegalAccessException e) {
+                    logger.warn(e.getMessage(), e); // pass the throwable so the stack trace is kept, as onError does
+                }
+            }
+
+            @Override public void onError(Throwable throwable) {
+                logger.error(throwable.getMessage(), throwable);
+            }
+
+            @Override public void onCompleted() {
+                responseObserver.onNext(Empty.newBuilder().build());
+                responseObserver.onCompleted();
+            }
+        };
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/RemoteData.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Serializable.java
similarity index 80%
rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/RemoteData.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Serializable.java
index fe4f1f5..1a7dbed 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/RemoteData.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Serializable.java
@@ -16,11 +16,13 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.analysis.data;
+package org.apache.skywalking.oap.server.core.remote;
+
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
 
 /**
  * @author peng-yongsheng
  */
-public interface RemoteData {
-    String selectKey();
+public interface Serializable {
+    RemoteData.Builder serialize();
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
new file mode 100644
index 0000000..75a5952
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.remote.client;
+
+import io.grpc.stub.StreamObserver;
+import java.util.List;
+import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
+import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
+import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.analysis.indicator.define.IndicatorMapper;
+import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
+import org.apache.skywalking.oap.server.library.client.grpc.GRPCClient;
+import org.slf4j.*;
+
+/**
+ * A {@link RemoteClient} that delivers indicators to another OAP node over gRPC. Pushed indicators are wrapped
+ * into {@link RemoteMessage}s, queued in a {@link DataCarrier} and streamed to the remote {@code RemoteService}
+ * by {@link RemoteMessageConsumer}.
+ *
+ * @author peng-yongsheng
+ */
+public class GRPCRemoteClient implements RemoteClient, Comparable<GRPCRemoteClient> {
+
+    private static final Logger logger = LoggerFactory.getLogger(GRPCRemoteClient.class);
+
+    private final GRPCClient client;
+    private final DataCarrier<RemoteMessage> carrier;
+    private final IndicatorMapper indicatorMapper;
+
+    /**
+     * Creates the client, opens the gRPC channel and starts consuming the message queue.
+     *
+     * @param indicatorMapper used to translate an indicator class into the id sent on the wire.
+     * @param remoteInstance the remote node; its host and port are used as the gRPC target.
+     * @param channelSize first argument handed to the {@link DataCarrier} constructor.
+     * @param bufferSize second argument handed to the {@link DataCarrier} constructor.
+     */
+    public GRPCRemoteClient(IndicatorMapper indicatorMapper, RemoteInstance remoteInstance, int channelSize,
+        int bufferSize) {
+        this.indicatorMapper = indicatorMapper;
+        this.client = new GRPCClient(remoteInstance.getHost(), remoteInstance.getPort());
+        // GRPCClient builds its channel only in initialize(). Without this call getChannel() returns null and
+        // no stream can ever be opened in createStreamObserver().
+        this.client.initialize();
+        this.carrier = new DataCarrier<>(channelSize, bufferSize);
+        this.carrier.setBufferStrategy(BufferStrategy.BLOCKING);
+        this.carrier.consume(new RemoteMessageConsumer(), 1);
+    }
+
+    /**
+     * Serializes the indicator into a {@link RemoteMessage} and queues it for sending.
+     *
+     * @param nextWorkerId id of the worker that should receive the indicator on the remote node.
+     * @param indicator the indicator to send.
+     */
+    @Override public void push(int nextWorkerId, Indicator indicator) {
+        int indicatorId = indicatorMapper.findIdByClass(indicator.getClass());
+        RemoteMessage.Builder builder = RemoteMessage.newBuilder();
+        builder.setNextWorkerId(nextWorkerId);
+        builder.setIndicatorId(indicatorId);
+        builder.setRemoteData(indicator.serialize());
+
+        this.carrier.produce(builder.build());
+    }
+
+    /**
+     * Queue consumer that sends every batch of messages through one freshly opened gRPC stream.
+     */
+    class RemoteMessageConsumer implements IConsumer<RemoteMessage> {
+        @Override public void init() {
+        }
+
+        @Override public void consume(List<RemoteMessage> remoteMessages) {
+            StreamObserver<RemoteMessage> streamObserver = createStreamObserver();
+            for (RemoteMessage remoteMessage : remoteMessages) {
+                streamObserver.onNext(remoteMessage);
+            }
+            streamObserver.onCompleted();
+        }
+
+        @Override public void onError(List<RemoteMessage> remoteMessages, Throwable t) {
+            logger.error(t.getMessage(), t);
+        }
+
+        @Override public void onExit() {
+        }
+    }
+
+    /**
+     * Opens a new {@code call} stream on the client's channel.
+     *
+     * @return the observer used to send messages to the remote node.
+     */
+    private StreamObserver<RemoteMessage> createStreamObserver() {
+        RemoteServiceGrpc.RemoteServiceStub stub = RemoteServiceGrpc.newStub(client.getChannel());
+
+        StreamStatus status = new StreamStatus(false);
+        return stub.call(new StreamObserver<Empty>() {
+            @Override public void onNext(Empty empty) {
+            }
+
+            @Override public void onError(Throwable throwable) {
+                logger.error(throwable.getMessage(), throwable);
+            }
+
+            @Override public void onCompleted() {
+                status.finished();
+            }
+        });
+    }
+
+    /**
+     * Completion flag of one stream; set to finished when the server completes its response.
+     */
+    class StreamStatus {
+
+        private final Logger logger = LoggerFactory.getLogger(StreamStatus.class);
+
+        private volatile boolean status;
+
+        StreamStatus(boolean status) {
+            this.status = status;
+        }
+
+        public boolean isFinish() {
+            return status;
+        }
+
+        void finished() {
+            this.status = true;
+        }
+
+        /**
+         * Polls every 5 milliseconds until the stream is finished, the timeout is exceeded or the waiting
+         * thread is interrupted.
+         *
+         * @param maxTimeout max wait time, milliseconds.
+         */
+        public void wait4Finish(long maxTimeout) {
+            long time = 0;
+            while (!status) {
+                if (time > maxTimeout) {
+                    break;
+                }
+                if (!try2Sleep(5)) {
+                    // Interrupted: stop waiting. The interrupt flag has been restored for the caller.
+                    break;
+                }
+                time += 5;
+            }
+        }
+
+        /**
+         * Tries to sleep. On {@link InterruptedException} the exception is logged and the interrupt flag of the
+         * current thread is restored, so the interruption is not lost.
+         *
+         * @param millis the length of time to sleep in milliseconds
+         * @return {@code true} if the sleep completed, {@code false} if it was interrupted.
+         */
+        private boolean try2Sleep(long millis) {
+            try {
+                Thread.sleep(millis);
+                return true;
+            } catch (InterruptedException e) {
+                logger.error(e.getMessage(), e);
+                Thread.currentThread().interrupt();
+                return false;
+            }
+        }
+    }
+
+    /**
+     * Orders clients by the string form of their underlying gRPC client.
+     */
+    @Override public int compareTo(GRPCRemoteClient o) {
+        return this.client.toString().compareTo(o.client.toString());
+    }
+
+    public String getHost() {
+        return client.getHost();
+    }
+
+    public int getPort() {
+        return client.getPort();
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClient.java
similarity index 76%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClient.java
index 7e8b8e1..98d23f1 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClient.java
@@ -16,14 +16,18 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.analysis.indicator.annotation;
+package org.apache.skywalking.oap.server.core.remote.client;
 
-import java.lang.annotation.*;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
 
 /**
  * @author peng-yongsheng
  */
-@Target(ElementType.TYPE)
-@Retention(RetentionPolicy.SOURCE)
-public @interface IndicatorType {
+public interface RemoteClient {
+
+    String getHost();
+
+    int getPort();
+
+    void push(int nextWorkerId, Indicator indicator);
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
new file mode 100644
index 0000000..4d2049b
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.remote.client;
+
+import java.util.*;
+import java.util.concurrent.*;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.analysis.indicator.define.IndicatorMapper;
+import org.apache.skywalking.oap.server.core.cluster.*;
+import org.apache.skywalking.oap.server.library.module.*;
+import org.slf4j.*;
+
+/**
+ * Keeps the list of {@link RemoteClient}s in line with the nodes reported by {@link ClusterNodesQuery}.
+ * Two lists are kept: one is published through {@link #getRemoteClient()} while the other is rebuilt, and they
+ * are swapped once the rebuild is complete.
+ *
+ * @author peng-yongsheng
+ */
+public class RemoteClientManager implements Service {
+
+    private static final Logger logger = LoggerFactory.getLogger(RemoteClientManager.class);
+
+    private final ModuleManager moduleManager;
+    private IndicatorMapper indicatorMapper;
+    private ClusterNodesQuery clusterNodesQuery;
+    private final List<RemoteClient> clientsA;
+    private final List<RemoteClient> clientsB;
+    // Written by the refresh thread and read by callers of getRemoteClient(), so the swap must be visible.
+    private volatile List<RemoteClient> usingClients;
+
+    public RemoteClientManager(ModuleManager moduleManager) {
+        this.moduleManager = moduleManager;
+        this.clientsA = new LinkedList<>();
+        this.clientsB = new LinkedList<>();
+        this.usingClients = clientsA;
+    }
+
+    /**
+     * Resolves the required services and schedules {@link #refresh()} to run every 2 seconds after an initial
+     * delay of 1 second.
+     */
+    public void start() {
+        this.clusterNodesQuery = moduleManager.find(ClusterModule.NAME).getService(ClusterNodesQuery.class);
+        // IndicatorMapper is a service of the core module, not of the cluster module.
+        this.indicatorMapper = moduleManager.find(CoreModule.NAME).getService(IndicatorMapper.class);
+        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::refresh, 1, 2, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Queries the current cluster nodes and rebuilds the client list when it no longer matches.
+     */
+    private void refresh() {
+        try {
+            List<RemoteInstance> instanceList = clusterNodesQuery.queryRemoteNodes();
+            Collections.sort(instanceList);
+
+            if (!compare(instanceList)) {
+                buildNewClients(instanceList);
+            }
+        } catch (Throwable t) {
+            // An exception escaping a scheduleAtFixedRate task suppresses all subsequent executions, so one
+            // failed query must not stop the refresh loop for good.
+            logger.error(t.getMessage(), t);
+        }
+    }
+
+    /**
+     * @return the client list currently in use.
+     */
+    public List<RemoteClient> getRemoteClient() {
+        return usingClients;
+    }
+
+    /**
+     * @return whichever of the two lists is not in use. The check is by identity: {@code List#equals} compares
+     * contents, and the two lists can hold equal contents while one of them is being rebuilt.
+     */
+    private List<RemoteClient> getFreeClients() {
+        return usingClients == clientsA ? clientsB : clientsA;
+    }
+
+    /**
+     * Publishes the list that is currently free as the list in use.
+     */
+    private void switchCurrentClients() {
+        usingClients = getFreeClients();
+    }
+
+    /**
+     * Fills the free list for the given instances, reusing existing clients whose address is unchanged, and
+     * then swaps it in.
+     *
+     * @param remoteInstances the sorted instances reported by the cluster.
+     */
+    private void buildNewClients(List<RemoteInstance> remoteInstances) {
+        // Resolve the free list once, so every client of this rebuild lands in the same list.
+        List<RemoteClient> freeClients = getFreeClients();
+        freeClients.clear();
+
+        Map<String, RemoteClient> currentClientsMap = new HashMap<>();
+        this.usingClients.forEach(remoteClient -> {
+            currentClientsMap.put(address(remoteClient.getHost(), remoteClient.getPort()), remoteClient);
+        });
+
+        remoteInstances.forEach(remoteInstance -> {
+            String address = address(remoteInstance.getHost(), remoteInstance.getPort());
+            RemoteClient client;
+            if (currentClientsMap.containsKey(address)) {
+                client = currentClientsMap.get(address);
+            } else {
+                if (remoteInstance.isSelf()) {
+                    client = new SelfRemoteClient(moduleManager, remoteInstance.getHost(), remoteInstance.getPort());
+                } else {
+                    client = new GRPCRemoteClient(indicatorMapper, remoteInstance, 1, 3000);
+                }
+            }
+            freeClients.add(client);
+        });
+
+        switchCurrentClients();
+    }
+
+    /**
+     * @param remoteInstances the sorted instances reported by the cluster.
+     * @return {@code true} when the list in use has the same addresses, in the same order, as the instances.
+     */
+    private boolean compare(List<RemoteInstance> remoteInstances) {
+        List<RemoteClient> currentClients = usingClients;
+        if (currentClients.size() != remoteInstances.size()) {
+            return false;
+        }
+
+        for (int i = 0; i < currentClients.size(); i++) {
+            String clientAddress = address(currentClients.get(i).getHost(), currentClients.get(i).getPort());
+            String instanceAddress = address(remoteInstances.get(i).getHost(), remoteInstances.get(i).getPort());
+            if (!clientAddress.equals(instanceAddress)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Builds the lookup key of an endpoint. The separator keeps different endpoints apart: without it
+     * {@code "10.0.0.1" + 12800} and {@code "10.0.0.11" + 2800} would produce the same key.
+     */
+    private String address(String host, int port) {
+        return host + ":" + port;
+    }
+}
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java
similarity index 51%
copy from oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java
index 2416be6..109ff77 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java
@@ -16,40 +16,38 @@
  *
  */
 
-package org.apache.skywalking.oap.server.library.client.grpc;
+package org.apache.skywalking.oap.server.core.remote.client;
 
-import io.grpc.*;
-import org.apache.skywalking.oap.server.library.client.Client;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.analysis.worker.define.WorkerMapper;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
 
 /**
  * @author peng-yongsheng
  */
-public class GRPCClient implements Client {
+public class SelfRemoteClient implements RemoteClient {
 
+    private final ModuleManager moduleManager;
     private final String host;
-
     private final int port;
 
-    private ManagedChannel channel;
-
-    public GRPCClient(String host, int port) {
+    public SelfRemoteClient(ModuleManager moduleManager, String host, int port) {
+        this.moduleManager = moduleManager;
         this.host = host;
         this.port = port;
     }
 
-    @Override public void initialize() {
-        channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext(true).build();
-    }
-
-    @Override public void shutdown() {
-        channel.shutdownNow();
+    @Override public String getHost() {
+        return host;
     }
 
-    public ManagedChannel getChannel() {
-        return channel;
+    @Override public int getPort() {
+        return port;
     }
 
-    @Override public String toString() {
-        return host + ":" + port;
+    /**
+     * Delivers the indicator to a worker of this same node, without any network hop.
+     *
+     * @param nextWorkerId id used to look up the target worker in the {@link WorkerMapper}.
+     * @param indicator the indicator handed to that worker.
+     */
+    @Override public void push(int nextWorkerId, Indicator indicator) {
+        moduleManager.find(CoreModule.NAME)
+            .getService(WorkerMapper.class)
+            .findInstanceById(nextWorkerId)
+            .in(indicator);
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/ForeverFirstSelector.java
similarity index 62%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregator.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/ForeverFirstSelector.java
index f409909..e28f203 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/ForeverFirstSelector.java
@@ -16,19 +16,24 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.analysis.endpoint;
+package org.apache.skywalking.oap.server.core.remote.selector;
 
-import org.apache.skywalking.oap.server.core.analysis.AbstractAggregator;
+import java.util.List;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.remote.client.RemoteClient;
 import org.slf4j.*;
 
 /**
  * @author peng-yongsheng
  */
-public class EndpointLatencyAvgAggregator extends AbstractAggregator<EndpointLatencyAvgIndicator> {
+public class ForeverFirstSelector implements RemoteClientSelector {
 
-    private static final Logger logger = LoggerFactory.getLogger(EndpointLatencyAvgAggregator.class);
-
-    @Override protected void onNext(EndpointLatencyAvgIndicator data) {
+    private static final Logger logger = LoggerFactory.getLogger(ForeverFirstSelector.class);
 
+    /**
+     * Always picks the first client of the list; the indicator is not consulted.
+     *
+     * @param clients candidate clients.
+     * @param indicator ignored by this selector.
+     * @return the client at index 0.
+     */
+    @Override public RemoteClient select(List<RemoteClient> clients, Indicator indicator) {
+        if (logger.isDebugEnabled()) {
+            final int clientCount = clients.size();
+            logger.debug("clients size: {}", clientCount);
+        }
+
+        final int firstIndex = 0;
+        return clients.get(firstIndex);
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/HashCodeSelector.java
similarity index 61%
rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregator.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/HashCodeSelector.java
index f409909..3d256b5 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/HashCodeSelector.java
@@ -16,19 +16,20 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.analysis.endpoint;
+package org.apache.skywalking.oap.server.core.remote.selector;
 
-import org.apache.skywalking.oap.server.core.analysis.AbstractAggregator;
-import org.slf4j.*;
+import java.util.List;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.remote.client.RemoteClient;
 
 /**
  * @author peng-yongsheng
  */
-public class EndpointLatencyAvgAggregator extends AbstractAggregator<EndpointLatencyAvgIndicator> {
-
-    private static final Logger logger = LoggerFactory.getLogger(EndpointLatencyAvgAggregator.class);
-
-    @Override protected void onNext(EndpointLatencyAvgIndicator data) {
+public class HashCodeSelector implements RemoteClientSelector {
 
+    /**
+     * Picks a client from the indicator's hash code, so indicators with the same hash code map to the same
+     * index for a given list size.
+     *
+     * @param clients candidate clients.
+     * @param indicator the indicator whose {@code hashCode()} drives the choice.
+     * @return the client at index {@code |hashCode % clients.size()|}.
+     */
+    @Override public RemoteClient select(List<RemoteClient> clients, Indicator indicator) {
+        int size = clients.size();
+        // Take the remainder before abs(): Math.abs(Integer.MIN_VALUE) is still negative and would produce a
+        // negative index. For every other hash code the result equals Math.abs(hashCode) % size.
+        int selectIndex = Math.abs(indicator.hashCode() % size);
+        return clients.get(selectIndex);
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/RemoteClientSelector.java
similarity index 70%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/RemoteClientSelector.java
index 7e8b8e1..438cbad 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/RemoteClientSelector.java
@@ -16,14 +16,15 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.analysis.indicator.annotation;
+package org.apache.skywalking.oap.server.core.remote.selector;
 
-import java.lang.annotation.*;
+import java.util.List;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.remote.client.RemoteClient;
 
 /**
  * @author peng-yongsheng
  */
-@Target(ElementType.TYPE)
-@Retention(RetentionPolicy.SOURCE)
-public @interface IndicatorType {
+public interface RemoteClientSelector {
+    RemoteClient select(List<RemoteClient> clients, Indicator indicator);
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/RollingSelector.java
similarity index 59%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/RollingSelector.java
index 54ca9d6..c74e8c3 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/RollingSelector.java
@@ -16,31 +16,27 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.analysis.indicator;
+package org.apache.skywalking.oap.server.core.remote.selector;
 
-import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.*;
+import java.util.List;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.remote.client.RemoteClient;
 
 /**
  * @author peng-yongsheng
  */
-@IndicatorType
-public abstract class AvgIndicator extends Indicator {
+public class RollingSelector implements RemoteClientSelector {
 
-    private long summation;
-    private int count;
+    private int index = 0;
 
-    public AvgIndicator(long timeBucket) {
-        super(timeBucket);
-    }
-
-    @Entrance
-    public void combine(@SourceFrom long summation, @ConstOne int count) {
-        this.summation += summation;
-        this.count += count;
-    }
+    /**
+     * Picks clients in rotation: a counter is advanced on every call and the client at
+     * {@code counter % clients.size()} is returned. The counter restarts from 0 once it reaches
+     * {@link Integer#MAX_VALUE}. The indicator is not consulted.
+     *
+     * @param clients candidate clients.
+     * @param indicator ignored by this selector.
+     * @return the client chosen for this call.
+     */
+    @Override public RemoteClient select(List<RemoteClient> clients, Indicator indicator) {
+        final int clientCount = clients.size();
+        final int position = Math.abs(++index) % clientCount;
 
-    @Override public void combine(Indicator indicator) {
-        AvgIndicator avgIndicator = (AvgIndicator)indicator;
-        combine(avgIndicator.summation, avgIndicator.count);
+        // Restart the counter before the next increment could overflow it.
+        if (index == Integer.MAX_VALUE) {
+            index = 0;
+        }
+        return clients.get(position);
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Selector.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/Selector.java
similarity index 93%
rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Selector.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/Selector.java
index b7ac6ed..bd51648 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Selector.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/Selector.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.remote;
+package org.apache.skywalking.oap.server.core.remote.selector;
 
 /**
  * @author peng-yongsheng
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java b/oap-server/server-core/src/main/proto/RemoteService.proto
similarity index 61%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
copy to oap-server/server-core/src/main/proto/RemoteService.proto
index 66bb57d..410b0c3 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
+++ b/oap-server/server-core/src/main/proto/RemoteService.proto
@@ -16,21 +16,28 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.analysis.indicator;
+syntax = "proto3";
 
-import lombok.Getter;
-import org.apache.skywalking.oap.server.core.analysis.data.StreamData;
+option java_multiple_files = true;
+option java_package = "org.apache.skywalking.oap.server.core.remote.grpc.proto";
 
-/**
- * @author peng-yongsheng
- */
-public abstract class Indicator extends StreamData {
-
-    @Getter private final long timeBucket;
-
-    public Indicator(long timeBucket) {
-        this.timeBucket = timeBucket;
+service RemoteService {
+    rpc call (stream RemoteMessage) returns (Empty) {
     }
+}
 
-    public abstract void combine(Indicator indicator);
+message RemoteMessage {
+    int32 nextWorkerId = 1;
+    int32 indicatorId = 2;
+    RemoteData remoteData = 3;
 }
+
+message RemoteData {
+    repeated string dataStrings = 1;
+    repeated int64 dataLongs = 2;
+    repeated double dataDoubles = 3;
+    repeated int32 dataIntegers = 4;
+}
+
+message Empty {
+}
\ No newline at end of file
diff --git a/oap-server/server-core/src/main/resources/META-INF/defines/indicator.def b/oap-server/server-core/src/main/resources/META-INF/defines/indicator.def
new file mode 100644
index 0000000..ce6b6dd
--- /dev/null
+++ b/oap-server/server-core/src/main/resources/META-INF/defines/indicator.def
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+org.apache.skywalking.oap.server.core.analysis.endpoint.EndpointLatencyAvgIndicator
\ No newline at end of file
diff --git a/oap-server/server-core/src/main/resources/META-INF/defines/worker.def b/oap-server/server-core/src/main/resources/META-INF/defines/worker.def
new file mode 100644
index 0000000..5afc8db
--- /dev/null
+++ b/oap-server/server-core/src/main/resources/META-INF/defines/worker.def
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+org.apache.skywalking.oap.server.core.analysis.endpoint.EndpointLatencyAvgAggregateWorker
+org.apache.skywalking.oap.server.core.analysis.endpoint.EndpointLatencyAvgRemoteWorker
+org.apache.skywalking.oap.server.core.analysis.endpoint.EndpointLatencyAvgPersistentWorker
\ No newline at end of file
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapperTestCase.java
similarity index 71%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
copy to oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapperTestCase.java
index 66bb57d..4b58eb5 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapperTestCase.java
@@ -16,21 +16,21 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.analysis.indicator;
+package org.apache.skywalking.oap.server.core.analysis.indicator.define;
 
-import lombok.Getter;
-import org.apache.skywalking.oap.server.core.analysis.data.StreamData;
+import org.junit.*;
 
 /**
  * @author peng-yongsheng
  */
-public abstract class Indicator extends StreamData {
+public class IndicatorMapperTestCase {
 
-    @Getter private final long timeBucket;
+    @Test
+    public void test() throws IndicatorDefineLoadException {
+        IndicatorMapper mapper = new IndicatorMapper();
+        mapper.load();
 
-    public Indicator(long timeBucket) {
-        this.timeBucket = timeBucket;
+        Assert.assertEquals(1, mapper.findIdByClass(TestAvgIndicator.class));
+        Assert.assertEquals(TestAvgIndicator.class, mapper.findClassById(1));
     }
-
-    public abstract void combine(Indicator indicator);
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/TestAvgIndicator.java
similarity index 68%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
copy to oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/TestAvgIndicator.java
index 66bb57d..17d1189 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/TestAvgIndicator.java
@@ -16,21 +16,23 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.analysis.indicator;
+package org.apache.skywalking.oap.server.core.analysis.indicator.define;
 
-import lombok.Getter;
-import org.apache.skywalking.oap.server.core.analysis.data.StreamData;
+import lombok.*;
+import org.apache.skywalking.oap.server.core.analysis.indicator.AvgIndicator;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
 
 /**
  * @author peng-yongsheng
  */
-public abstract class Indicator extends StreamData {
+public class TestAvgIndicator extends AvgIndicator {
 
-    @Getter private final long timeBucket;
+    @Setter @Getter private int id;
 
-    public Indicator(long timeBucket) {
-        this.timeBucket = timeBucket;
+    @Override public RemoteData.Builder serialize() {
+        return null;
     }
 
-    public abstract void combine(Indicator indicator);
+    @Override public void deserialize(RemoteData remoteData) {
+    }
 }
diff --git a/oap-server/server-core/src/test/resources/META-INF/defines/indicator.def b/oap-server/server-core/src/test/resources/META-INF/defines/indicator.def
new file mode 100644
index 0000000..97491ff
--- /dev/null
+++ b/oap-server/server-core/src/test/resources/META-INF/defines/indicator.def
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+org.apache.skywalking.oap.server.core.analysis.indicator.define.TestAvgIndicator
\ No newline at end of file
diff --git a/oap-server/server-core/src/test/resources/META-INF/defines/worker.def b/oap-server/server-core/src/test/resources/META-INF/defines/worker.def
new file mode 100644
index 0000000..33ebbb1
--- /dev/null
+++ b/oap-server/server-core/src/test/resources/META-INF/defines/worker.def
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
\ No newline at end of file
diff --git a/oap-server/server-core/src/test/resources/log4j2.xml b/oap-server/server-core/src/test/resources/log4j2.xml
new file mode 100644
index 0000000..6eb5b3f
--- /dev/null
+++ b/oap-server/server-core/src/test/resources/log4j2.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+<Configuration status="DEBUG">
+    <Appenders>
+        <Console name="Console" target="SYSTEM_OUT">
+            <PatternLayout charset="UTF-8" pattern="%d - %c -%-4r [%t] %-5p %x - %m%n"/>
+        </Console>
+    </Appenders>
+    <Loggers>
+        <Root level="DEBUG">
+            <AppenderRef ref="Console"/>
+        </Root>
+    </Loggers>
+</Configuration>
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java
index 2416be6..188fae3 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.oap.server.library.client.grpc;
 
 import io.grpc.*;
+import lombok.Getter;
 import org.apache.skywalking.oap.server.library.client.Client;
 
 /**
@@ -26,9 +27,9 @@ import org.apache.skywalking.oap.server.library.client.Client;
  */
 public class GRPCClient implements Client {
 
-    private final String host;
+    @Getter private final String host;
 
-    private final int port;
+    @Getter private final int port;
 
     private ManagedChannel channel;
 
diff --git a/oap-server/server-library/library-module/src/main/java/org/apache/skywalking/oap/server/library/module/ModuleProvider.java b/oap-server/server-library/library-module/src/main/java/org/apache/skywalking/oap/server/library/module/ModuleProvider.java
index 7596fb6..9873303 100644
--- a/oap-server/server-library/library-module/src/main/java/org/apache/skywalking/oap/server/library/module/ModuleProvider.java
+++ b/oap-server/server-library/library-module/src/main/java/org/apache/skywalking/oap/server/library/module/ModuleProvider.java
@@ -18,8 +18,7 @@
 
 package org.apache.skywalking.oap.server.library.module;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
 
 /**
  * The <code>ModuleProvider</code> is an implementation of a {@link ModuleDefine}.
diff --git a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/mesh-receiver-provider/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/provider/MeshGRPCHandler.java b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/mesh-receiver-provider/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/provider/MeshGRPCHandler.java
index e9764ce..8893821 100644
--- a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/mesh-receiver-provider/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/provider/MeshGRPCHandler.java
+++ b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/mesh-receiver-provider/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/provider/MeshGRPCHandler.java
@@ -35,8 +35,6 @@ public class MeshGRPCHandler extends ServiceMeshMetricServiceGrpc.ServiceMeshMet
                 if (logger.isDebugEnabled()) {
                     logger.debug("Received mesh metric: {}", metric);
                 }
-
-
             }
 
             @Override public void onError(Throwable throwable) {