You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by GitBox <gi...@apache.org> on 2018/11/22 12:50:28 UTC

[GitHub] peng-yongsheng closed pull request #1946: gRPC client usage improve

peng-yongsheng closed pull request #1946: gRPC client usage improve
URL: https://github.com/apache/incubator-skywalking/pull/1946
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, the diff is reproduced
below (GitHub would not otherwise display it after the merge):

diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
index 8524142d4..0422ccaf3 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
@@ -19,17 +19,12 @@
 package org.apache.skywalking.apm.agent.core.remote;
 
 import io.grpc.Channel;
-import java.util.List;
-
 import io.grpc.stub.StreamObserver;
-import org.apache.skywalking.apm.agent.core.boot.BootService;
-import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
-import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
-import org.apache.skywalking.apm.agent.core.context.TracingContext;
-import org.apache.skywalking.apm.agent.core.context.TracingContextListener;
+import java.util.List;
+import org.apache.skywalking.apm.agent.core.boot.*;
+import org.apache.skywalking.apm.agent.core.context.*;
 import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
-import org.apache.skywalking.apm.agent.core.logging.api.ILog;
-import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
+import org.apache.skywalking.apm.agent.core.logging.api.*;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
 import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
@@ -37,8 +32,7 @@
 import org.apache.skywalking.apm.network.language.agent.*;
 import org.apache.skywalking.apm.network.language.agent.v2.TraceSegmentReportServiceGrpc;
 
-import static org.apache.skywalking.apm.agent.core.conf.Config.Buffer.BUFFER_SIZE;
-import static org.apache.skywalking.apm.agent.core.conf.Config.Buffer.CHANNEL_SIZE;
+import static org.apache.skywalking.apm.agent.core.conf.Config.Buffer.*;
 import static org.apache.skywalking.apm.agent.core.remote.GRPCChannelStatus.CONNECTED;
 
 /**
@@ -111,18 +105,18 @@ public void onCompleted() {
                 }
             });
 
-            for (TraceSegment segment : data) {
-                try {
+            try {
+                for (TraceSegment segment : data) {
                     UpstreamSegment upstreamSegment = segment.transform();
                     upstreamSegmentStreamObserver.onNext(upstreamSegment);
-                } catch (Throwable t) {
-                    logger.error(t, "Transform and send UpstreamSegment to collector fail.");
                 }
-            }
-            upstreamSegmentStreamObserver.onCompleted();
+                upstreamSegmentStreamObserver.onCompleted();
 
-            status.wait4Finish();
-            segmentUplinkedCounter += data.size();
+                status.wait4Finish();
+                segmentUplinkedCounter += data.size();
+            } catch (Throwable t) {
+                logger.error(t, "Transform and send UpstreamSegment to collector fail.");
+            }
         } else {
             segmentAbandonedCounter += data.size();
         }
diff --git a/oap-server/pom.xml b/oap-server/pom.xml
index d5ce3c74e..0f08ed003 100644
--- a/oap-server/pom.xml
+++ b/oap-server/pom.xml
@@ -17,7 +17,8 @@
   ~
   -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <artifactId>apm</artifactId>
         <groupId>org.apache.skywalking</groupId>
@@ -237,6 +238,24 @@
                 <artifactId>grpc-testing</artifactId>
                 <version>${grpc.version}</version>
                 <scope>test</scope>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.mockito</groupId>
+                        <artifactId>mockito-core</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>junit</groupId>
+                        <artifactId>junit</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>io.grpc</groupId>
+                        <artifactId>grpc-core</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>io.grpc</groupId>
+                        <artifactId>grpc-stub</artifactId>
+                    </exclusion>
+                </exclusions>
             </dependency>
             <dependency>
                 <groupId>org.eclipse.jetty</groupId>
diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
index 5c3e72f7e..8fdc5f863 100644
--- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
+++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
@@ -19,25 +19,14 @@
 package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
 
 import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
+import com.google.common.util.concurrent.*;
+import java.util.*;
+import java.util.concurrent.*;
 import java.util.function.Supplier;
 import javax.annotation.Nullable;
-import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
-import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
-import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
-import org.apache.skywalking.oap.server.core.cluster.ServiceRegisterException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.skywalking.oap.server.core.cluster.*;
+import org.apache.skywalking.oap.server.core.remote.client.Address;
+import org.slf4j.*;
 
 /**
  * Read collector pod info from api-server of kubernetes, then using all containerIp list to
@@ -63,7 +52,7 @@
     }
 
     @Override public void registerRemote(RemoteInstance remoteInstance) throws ServiceRegisterException {
-        this.port = remoteInstance.getPort();
+        this.port = remoteInstance.getAddress().getPort();
         submitTask(MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
             .setDaemon(true).setNameFormat("Kubernetes-ApiServer-%s").build())));
     }
@@ -99,7 +88,7 @@ private void generateRemoteNodes() {
             switch (event.getType()) {
                 case "ADDED":
                 case "MODIFIED":
-                    cache.put(event.getUid(), new RemoteInstance(event.getHost(), port, event.getUid().equals(this.uid)));
+                    cache.put(event.getUid(), new RemoteInstance(new Address(event.getHost(), port, event.getUid().equals(this.uid))));
                     break;
                 case "DELETED":
                     cache.remove(event.getUid());
diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java
index 35c57f8bd..4fd26996f 100644
--- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java
+++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java
@@ -20,6 +20,7 @@
 
 import org.apache.skywalking.oap.server.cluster.plugin.kubernetes.fixture.PlainWatch;
 import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
+import org.apache.skywalking.oap.server.core.remote.client.Address;
 import org.junit.Test;
 
 import static org.hamcrest.core.Is.is;
@@ -29,44 +30,43 @@
 
     private KubernetesCoordinator coordinator;
 
-
     @Test
     public void assertAdded() throws InterruptedException {
         PlainWatch watch = PlainWatch.create(2, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2");
         coordinator = new KubernetesCoordinator(watch, () -> "1");
-        coordinator.registerRemote(new RemoteInstance("0.0.0.0", 8454, true));
+        coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true)));
         watch.await();
         assertThat(coordinator.queryRemoteNodes().size(), is(2));
-        assertThat(coordinator.queryRemoteNodes().stream().filter(RemoteInstance::isSelf).findFirst().get().getHost(), is("10.0.0.1"));
+        assertThat(coordinator.queryRemoteNodes().stream().filter(instance -> instance.getAddress().isSelf()).findFirst().get().getAddress().getHost(), is("10.0.0.1"));
     }
 
     @Test
     public void assertModified() throws InterruptedException {
         PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2", "MODIFIED", "1", "10.0.0.3");
         coordinator = new KubernetesCoordinator(watch, () -> "1");
-        coordinator.registerRemote(new RemoteInstance("0.0.0.0", 8454, true));
+        coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true)));
         watch.await();
         assertThat(coordinator.queryRemoteNodes().size(), is(2));
-        assertThat(coordinator.queryRemoteNodes().stream().filter(RemoteInstance::isSelf).findFirst().get().getHost(), is("10.0.0.3"));
+        assertThat(coordinator.queryRemoteNodes().stream().filter(instance -> instance.getAddress().isSelf()).findFirst().get().getAddress().getHost(), is("10.0.0.3"));
     }
 
     @Test
     public void assertDeleted() throws InterruptedException {
         PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2", "DELETED", "2", "10.0.0.2");
         coordinator = new KubernetesCoordinator(watch, () -> "1");
-        coordinator.registerRemote(new RemoteInstance("0.0.0.0", 8454, true));
+        coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true)));
         watch.await();
         assertThat(coordinator.queryRemoteNodes().size(), is(1));
-        assertThat(coordinator.queryRemoteNodes().stream().filter(RemoteInstance::isSelf).findFirst().get().getHost(), is("10.0.0.1"));
+        assertThat(coordinator.queryRemoteNodes().stream().filter(instance -> instance.getAddress().isSelf()).findFirst().get().getAddress().getHost(), is("10.0.0.1"));
     }
 
     @Test
     public void assertError() throws InterruptedException {
         PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ERROR", "X", "10.0.0.2", "ADDED", "2", "10.0.0.2");
         coordinator = new KubernetesCoordinator(watch, () -> "1");
-        coordinator.registerRemote(new RemoteInstance("0.0.0.0", 8454, true));
+        coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true)));
         watch.await();
         assertThat(coordinator.queryRemoteNodes().size(), is(2));
-        assertThat(coordinator.queryRemoteNodes().stream().filter(RemoteInstance::isSelf).findFirst().get().getHost(), is("10.0.0.1"));
+        assertThat(coordinator.queryRemoteNodes().stream().filter(instance -> instance.getAddress().isSelf()).findFirst().get().getAddress().getHost(), is("10.0.0.1"));
     }
 }
\ No newline at end of file
diff --git a/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManager.java b/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManager.java
index 6cba94f1a..ef30870be 100644
--- a/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManager.java
+++ b/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManager.java
@@ -18,11 +18,8 @@
 
 package org.apache.skywalking.oap.server.cluster.plugin.standalone;
 
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
-import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
-import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
+import java.util.*;
+import org.apache.skywalking.oap.server.core.cluster.*;
 
 /**
  * A cluster manager simulator. Work in memory only. Also return the current instance.
@@ -35,7 +32,7 @@
 
     @Override public void registerRemote(RemoteInstance remoteInstance) {
         this.remoteInstance = remoteInstance;
-        this.remoteInstance.setSelf(true);
+        this.remoteInstance.getAddress().setSelf(true);
     }
 
     @Override
diff --git a/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManagerTest.java b/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManagerTest.java
index 55671bce0..77ede74c3 100644
--- a/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManagerTest.java
+++ b/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManagerTest.java
@@ -19,14 +19,15 @@
 package org.apache.skywalking.oap.server.cluster.plugin.standalone;
 
 import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
+import org.apache.skywalking.oap.server.core.remote.client.Address;
 import org.junit.*;
 
 public class StandaloneManagerTest {
     @Test
     public void test() {
         StandaloneManager standaloneManager = new StandaloneManager();
-        RemoteInstance remote1 = new RemoteInstance("A", 100, true);
-        RemoteInstance remote2 = new RemoteInstance("B", 100, false);
+        RemoteInstance remote1 = new RemoteInstance(new Address("A", 100, true));
+        RemoteInstance remote2 = new RemoteInstance(new Address("B", 100, false));
 
         standaloneManager.registerRemote(remote1);
         Assert.assertEquals(remote1, standaloneManager.queryRemoteNodes().get(0));
diff --git a/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ZookeeperCoordinator.java b/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ZookeeperCoordinator.java
index 07f53a662..826bdbb4a 100644
--- a/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ZookeeperCoordinator.java
+++ b/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ZookeeperCoordinator.java
@@ -21,6 +21,7 @@
 import java.util.*;
 import org.apache.curator.x.discovery.*;
 import org.apache.skywalking.oap.server.core.cluster.*;
+import org.apache.skywalking.oap.server.core.remote.client.Address;
 import org.slf4j.*;
 
 /**
@@ -31,7 +32,7 @@
 
     private final ServiceDiscovery<RemoteInstance> serviceDiscovery;
     private volatile ServiceCache<RemoteInstance> serviceCache;
-    private volatile RemoteInstance selfInstance;
+    private volatile Address selfAddress;
 
     ZookeeperCoordinator(ServiceDiscovery<RemoteInstance> serviceDiscovery) {
         this.serviceDiscovery = serviceDiscovery;
@@ -44,8 +45,8 @@
             ServiceInstance<RemoteInstance> thisInstance = ServiceInstance.<RemoteInstance>builder()
                 .name(remoteNamePath)
                 .id(UUID.randomUUID().toString())
-                .address(remoteInstance.getHost())
-                .port(remoteInstance.getPort())
+                .address(remoteInstance.getAddress().getHost())
+                .port(remoteInstance.getAddress().getPort())
                 .payload(remoteInstance)
                 .build();
 
@@ -57,7 +58,7 @@
 
             serviceCache.start();
 
-            this.selfInstance = remoteInstance;
+            this.selfAddress = remoteInstance.getAddress();
         } catch (Exception e) {
             throw new ServiceRegisterException(e.getMessage());
         }
@@ -70,10 +71,10 @@
 
             serviceInstances.forEach(serviceInstance -> {
                 RemoteInstance instance = serviceInstance.getPayload();
-                if (instance.equals(selfInstance)) {
-                    instance.setSelf(true);
+                if (instance.getAddress().equals(selfAddress)) {
+                    instance.getAddress().setSelf(true);
                 } else {
-                    instance.setSelf(false);
+                    instance.getAddress().setSelf(false);
                 }
                 remoteInstanceDetails.add(instance);
             });
diff --git a/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ClusterModuleZookeeperProviderTestCase.java b/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ClusterModuleZookeeperProviderTestCase.java
index a318481ed..c8f64461e 100644
--- a/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ClusterModuleZookeeperProviderTestCase.java
+++ b/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ClusterModuleZookeeperProviderTestCase.java
@@ -22,6 +22,7 @@
 import java.util.List;
 import org.apache.curator.test.TestingServer;
 import org.apache.skywalking.oap.server.core.cluster.*;
+import org.apache.skywalking.oap.server.core.remote.client.Address;
 import org.apache.skywalking.oap.server.library.module.*;
 import org.junit.*;
 
@@ -52,7 +53,7 @@ public void testStart() throws ServiceNotProvidedException, ModuleStartException
         ClusterRegister moduleRegister = provider.getService(ClusterRegister.class);
         ClusterNodesQuery clusterNodesQuery = provider.getService(ClusterNodesQuery.class);
 
-        RemoteInstance remoteInstance = new RemoteInstance("ProviderAHost", 1000, true);
+        RemoteInstance remoteInstance = new RemoteInstance(new Address("ProviderAHost", 1000, true));
 
         moduleRegister.registerRemote(remoteInstance);
 
@@ -63,8 +64,8 @@ public void testStart() throws ServiceNotProvidedException, ModuleStartException
                 continue;
             }
             Assert.assertEquals(1, detailsList.size());
-            Assert.assertEquals("ProviderAHost", detailsList.get(0).getHost());
-            Assert.assertEquals(1000, detailsList.get(0).getPort());
+            Assert.assertEquals("ProviderAHost", detailsList.get(0).getAddress().getHost());
+            Assert.assertEquals(1000, detailsList.get(0).getAddress().getPort());
         }
 
     }
diff --git a/oap-server/server-core/pom.xml b/oap-server/server-core/pom.xml
index 9669ca019..37a8f6846 100644
--- a/oap-server/server-core/pom.xml
+++ b/oap-server/server-core/pom.xml
@@ -17,7 +17,8 @@
   ~
   -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <artifactId>oap-server</artifactId>
         <groupId>org.apache.skywalking</groupId>
@@ -33,6 +34,11 @@
             <groupId>org.yaml</groupId>
             <artifactId>snakeyaml</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-testing</artifactId>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.skywalking</groupId>
             <artifactId>library-module</artifactId>
@@ -63,6 +69,12 @@
             <artifactId>apm-network</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.skywalking</groupId>
+            <artifactId>server-testing</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index b1181dae7..f0e908499 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -22,55 +22,26 @@
 import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorTypeListener;
 import org.apache.skywalking.oap.server.core.analysis.record.annotation.RecordTypeListener;
 import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
-import org.apache.skywalking.oap.server.core.cache.CacheUpdateTimer;
-import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache;
-import org.apache.skywalking.oap.server.core.cache.NetworkAddressInventoryCache;
-import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
-import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
-import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
-import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
-import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
-import org.apache.skywalking.oap.server.core.config.ComponentLibraryCatalogService;
-import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
-import org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService;
-import org.apache.skywalking.oap.server.core.query.AggregationQueryService;
-import org.apache.skywalking.oap.server.core.query.AlarmQueryService;
-import org.apache.skywalking.oap.server.core.query.MetadataQueryService;
-import org.apache.skywalking.oap.server.core.query.MetricQueryService;
-import org.apache.skywalking.oap.server.core.query.TopologyQueryService;
-import org.apache.skywalking.oap.server.core.query.TraceQueryService;
+import org.apache.skywalking.oap.server.core.cache.*;
+import org.apache.skywalking.oap.server.core.cluster.*;
+import org.apache.skywalking.oap.server.core.config.*;
+import org.apache.skywalking.oap.server.core.query.*;
 import org.apache.skywalking.oap.server.core.register.annotation.InventoryTypeListener;
-import org.apache.skywalking.oap.server.core.register.service.EndpointInventoryRegister;
-import org.apache.skywalking.oap.server.core.register.service.IEndpointInventoryRegister;
-import org.apache.skywalking.oap.server.core.register.service.INetworkAddressInventoryRegister;
-import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister;
-import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
-import org.apache.skywalking.oap.server.core.register.service.NetworkAddressInventoryRegister;
-import org.apache.skywalking.oap.server.core.register.service.ServiceInstanceInventoryRegister;
-import org.apache.skywalking.oap.server.core.register.service.ServiceInventoryRegister;
-import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
-import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler;
-import org.apache.skywalking.oap.server.core.remote.annotation.StreamAnnotationListener;
-import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataAnnotationContainer;
-import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
-import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
-import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
-import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegisterImpl;
-import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister;
-import org.apache.skywalking.oap.server.core.server.JettyHandlerRegisterImpl;
-import org.apache.skywalking.oap.server.core.source.SourceReceiver;
-import org.apache.skywalking.oap.server.core.source.SourceReceiverImpl;
+import org.apache.skywalking.oap.server.core.register.service.*;
+import org.apache.skywalking.oap.server.core.remote.*;
+import org.apache.skywalking.oap.server.core.remote.annotation.*;
+import org.apache.skywalking.oap.server.core.remote.client.*;
+import org.apache.skywalking.oap.server.core.server.*;
+import org.apache.skywalking.oap.server.core.source.*;
 import org.apache.skywalking.oap.server.core.storage.PersistenceTimer;
 import org.apache.skywalking.oap.server.core.storage.annotation.StorageAnnotationListener;
 import org.apache.skywalking.oap.server.core.storage.model.IModelGetter;
 import org.apache.skywalking.oap.server.core.storage.ttl.DataTTLKeeperTimer;
 import org.apache.skywalking.oap.server.library.module.*;
-import org.apache.skywalking.oap.server.library.module.ModuleDefine;
 import org.apache.skywalking.oap.server.library.server.ServerException;
 import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
 import org.apache.skywalking.oap.server.library.server.jetty.JettyServer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
 
 /**
  * @author peng-yongsheng
@@ -186,7 +157,7 @@ public CoreModuleProvider() {
             throw new ModuleStartException(e.getMessage(), e);
         }
 
-        RemoteInstance gRPCServerInstance = new RemoteInstance(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(), true);
+        RemoteInstance gRPCServerInstance = new RemoteInstance(new Address(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(), true));
         this.getManager().find(ClusterModule.NAME).provider().getService(ClusterRegister.class).registerRemote(gRPCServerInstance);
 
         PersistenceTimer.INSTANCE.start(getManager());
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCollection.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCollection.java
index 2a4ac3ae6..1d7a53c05 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCollection.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCollection.java
@@ -69,11 +69,11 @@ public void finishWriting() {
     }
 
     @Override public boolean containsKey(STORAGE_DATA key) {
-        throw new UnsupportedOperationException("None merge data collection not support containsKey operation.");
+        throw new UnsupportedOperationException("Close merge data collection not support containsKey operation.");
     }
 
     @Override public STORAGE_DATA get(STORAGE_DATA key) {
-        throw new UnsupportedOperationException("None merge data collection not support get operation.");
+        throw new UnsupportedOperationException("Close merge data collection not support get operation.");
     }
 
     @Override public void put(STORAGE_DATA value) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java
index 7b0d48f0f..44a4855fd 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java
@@ -18,44 +18,26 @@
 
 package org.apache.skywalking.oap.server.core.cluster;
 
-import java.util.Objects;
-import lombok.*;
+import lombok.Getter;
+import org.apache.skywalking.oap.server.core.remote.client.Address;
 
 /**
  * @author peng-yongsheng
  */
+@Getter
 public class RemoteInstance implements Comparable<RemoteInstance> {
 
-    @Getter private final String host;
-    @Getter private final int port;
-    @Getter @Setter private boolean isSelf = false;
+    private final Address address;
 
-    public RemoteInstance(String host, int port, boolean isSelf) {
-        this.host = host;
-        this.port = port;
-        this.isSelf = isSelf;
-    }
-
-    @Override public int compareTo(RemoteInstance o) {
-        return toString().compareTo(o.toString());
+    public RemoteInstance(Address address) {
+        this.address = address;
     }
 
     @Override public String toString() {
-        return host + ":" + String.valueOf(port);
+        return address.toString();
     }
 
-    @Override public boolean equals(Object o) {
-        if (this == o)
-            return true;
-        if (o == null || getClass() != o.getClass())
-            return false;
-        RemoteInstance instance = (RemoteInstance)o;
-        return port == instance.port &&
-            Objects.equals(host, instance.host);
-    }
-
-    @Override public int hashCode() {
-
-        return Objects.hash(host, port);
+    @Override public int compareTo(RemoteInstance o) {
+        return this.address.compareTo(o.getAddress());
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/NetworkAddressInventoryRegister.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/NetworkAddressInventoryRegister.java
index cccb7bc9d..cf833442a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/NetworkAddressInventoryRegister.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/NetworkAddressInventoryRegister.java
@@ -103,7 +103,7 @@ private IServiceInstanceInventoryRegister getServiceInstanceInventoryRegister()
 
             InventoryProcess.INSTANCE.in(networkAddress);
         } else {
-            logger.warn("Network address {} heartbeat, but not found in storage.");
+            logger.warn("Network getAddress {} heartbeat, but not found in storage.");
         }
     }
 
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/ServiceInstanceInventoryRegister.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/ServiceInstanceInventoryRegister.java
index cf57cb760..9a0dc7602 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/ServiceInstanceInventoryRegister.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/ServiceInstanceInventoryRegister.java
@@ -81,7 +81,7 @@ private ServiceInstanceInventoryCache getServiceInstanceInventoryCache() {
 
     @Override public int getOrCreate(int serviceId, int addressId, long registerTime) {
         if (logger.isDebugEnabled()) {
-            logger.debug("get or create service instance by address id, service id: {}, address id: {}, registerTime: {}", serviceId, addressId, registerTime);
+            logger.debug("get or create service instance by getAddress id, service id: {}, getAddress id: {}, registerTime: {}", serviceId, addressId, registerTime);
         }
 
         int serviceInstanceId = getServiceInstanceInventoryCache().getServiceInstanceId(serviceId, addressId);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
index 92b2a872b..de7161065 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
@@ -25,27 +25,36 @@
 import org.apache.skywalking.oap.server.core.remote.data.StreamData;
 import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
 import org.apache.skywalking.oap.server.core.worker.WorkerInstances;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
 import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
 import org.slf4j.*;
 
 /**
+ * This class is the server-side implementation of the streaming RPC. It is a common service that
+ * OAP servers use to receive messages from each other.
+ * The stream data id is used to find the class used to deserialize the message.
+ * The next worker id is used to find the worker that processes the message.
+ *
  * @author peng-yongsheng
  */
 public class RemoteServiceHandler extends RemoteServiceGrpc.RemoteServiceImplBase implements GRPCHandler {
 
     private static final Logger logger = LoggerFactory.getLogger(RemoteServiceHandler.class);
 
-    private final ModuleManager moduleManager;
+    private final ModuleDefineHolder moduleDefineHolder;
     private StreamDataClassGetter streamDataClassGetter;
 
-    public RemoteServiceHandler(ModuleManager moduleManager) {
-        this.moduleManager = moduleManager;
+    public RemoteServiceHandler(ModuleDefineHolder moduleDefineHolder) {
+        this.moduleDefineHolder = moduleDefineHolder;
     }
 
     @Override public StreamObserver<RemoteMessage> call(StreamObserver<Empty> responseObserver) {
         if (Objects.isNull(streamDataClassGetter)) {
-            streamDataClassGetter = moduleManager.find(CoreModule.NAME).provider().getService(StreamDataClassGetter.class);
+            synchronized (RemoteServiceHandler.class) {
+                if (Objects.isNull(streamDataClassGetter)) {
+                    streamDataClassGetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataClassGetter.class);
+                }
+            }
         }
 
         return new StreamObserver<RemoteMessage>() {
@@ -59,8 +68,8 @@ public RemoteServiceHandler(ModuleManager moduleManager) {
                     StreamData streamData = streamDataClass.newInstance();
                     streamData.deserialize(remoteData);
                     WorkerInstances.INSTANCES.get(nextWorkerId).in(streamData);
-                } catch (InstantiationException | IllegalAccessException e) {
-                    logger.warn(e.getMessage());
+                } catch (Throwable t) {
+                    logger.error(t.getMessage(), t);
                 }
             }
 
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/Address.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/Address.java
new file mode 100644
index 000000000..4e5d0ce53
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/Address.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.remote.client;
+
+import lombok.*;
+import org.apache.skywalking.oap.server.core.Const;
+
+/**
+ * @author peng-yongsheng
+ */
+@Getter
+public class Address implements Comparable<Address> {
+    private final String host;
+    private final int port;
+    // Mutable flag; equals, hashCode and compareTo below do not look at it.
+    @Setter private boolean isSelf;
+
+    public Address(String host, int port, boolean isSelf) {
+        this.host = host;
+        this.port = port;
+        this.isSelf = isSelf;
+    }
+
+    @Override public int hashCode() {
+        return toString().hashCode();
+    }
+
+    /** Equal when the other object is of the same class and has an equal host and port. */
+    @Override public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null || getClass() != obj.getClass())
+            return false;
+        Address other = (Address)obj;
+        return host.equals(other.host) && port == other.port;
+    }
+
+    @Override public String toString() {
+        return host + Const.ID_SPLIT + port;
+    }
+
+    /** Orders addresses by comparing their toString() text. */
+    @Override public int compareTo(Address o) {
+        return toString().compareTo(o.toString());
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
index e367f4ddb..b061ca69c 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
@@ -18,13 +18,13 @@
 
 package org.apache.skywalking.oap.server.core.remote.client;
 
+import io.grpc.ManagedChannel;
 import io.grpc.stub.StreamObserver;
-import java.util.List;
+import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
 import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
 import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
 import org.apache.skywalking.oap.server.core.remote.data.StreamData;
 import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
@@ -32,27 +32,82 @@
 import org.slf4j.*;
 
 /**
+ * This is a wrapper of the gRPC client for sending messages to another OAP server.
+ * It contains a blocking queue that buffers the messages and sends them in batches.
+ *
  * @author peng-yongsheng
  */
-public class GRPCRemoteClient implements RemoteClient, Comparable<GRPCRemoteClient> {
+public class GRPCRemoteClient implements RemoteClient {
 
     private static final Logger logger = LoggerFactory.getLogger(GRPCRemoteClient.class);
 
-    private final GRPCClient client;
-    private final DataCarrier<RemoteMessage> carrier;
+    private final int channelSize;
+    private final int bufferSize;
+    private final Address address;
     private final StreamDataClassGetter streamDataClassGetter;
     private final AtomicInteger concurrentStreamObserverNumber = new AtomicInteger(0);
+    private GRPCClient client;
+    private DataCarrier<RemoteMessage> carrier;
+    private boolean isConnect;
 
-    public GRPCRemoteClient(StreamDataClassGetter streamDataClassGetter, RemoteInstance remoteInstance, int channelSize,
+    public GRPCRemoteClient(StreamDataClassGetter streamDataClassGetter, Address address, int channelSize,
         int bufferSize) {
         this.streamDataClassGetter = streamDataClassGetter;
-        this.client = new GRPCClient(remoteInstance.getHost(), remoteInstance.getPort());
-        this.client.initialize();
-        this.carrier = new DataCarrier<>("GRPCRemoteClient", channelSize, bufferSize);
-        this.carrier.setBufferStrategy(BufferStrategy.BLOCKING);
-        this.carrier.consume(new RemoteMessageConsumer(), 1);
+        this.address = address;
+        this.channelSize = channelSize;
+        this.bufferSize = bufferSize;
+    }
+
+    @Override public void connect() {
+        if (!isConnect) {
+            this.getClient().connect();
+            this.getDataCarrier().consume(new RemoteMessageConsumer(), 1);
+            this.isConnect = true;
+        }
+    }
+
+    /**
+     * Get the gRPC channel of the underlying client, creating the client first if needed.
+     *
+     * @return the channel held by the underlying gRPC client
+     */
+    ManagedChannel getChannel() {
+        return getClient().getChannel();
     }
 
+    /**
+     * Lazily create the gRPC client for this address. Synchronized because the
+     * field is not volatile, so an unlocked null check is not safely published.
+     */
+    synchronized GRPCClient getClient() {
+        if (Objects.isNull(client)) {
+            this.client = new GRPCClient(address.getHost(), address.getPort());
+        }
+        return client;
+    }
+
+    RemoteServiceGrpc.RemoteServiceStub getStub() {
+        return RemoteServiceGrpc.newStub(getChannel());
+    }
+
+    /**
+     * Lazily create the data carrier. Synchronized so that no caller can see it
+     * before the BLOCKING strategy is set; the field is not volatile.
+     */
+    synchronized DataCarrier<RemoteMessage> getDataCarrier() {
+        if (Objects.isNull(this.carrier)) {
+            this.carrier = new DataCarrier<>("GRPCRemoteClient", channelSize, bufferSize);
+            this.carrier.setBufferStrategy(BufferStrategy.BLOCKING);
+        }
+        return this.carrier;
+    }
+
+    /**
+     * Push stream data that needs to be sent to another OAP server.
+     *
+     * @param nextWorkerId the id of the worker which will process this stream data.
+     * @param streamData the entity that contains the values.
+     */
     @Override public void push(int nextWorkerId, StreamData streamData) {
         int streamDataId = streamDataClassGetter.findIdByClass(streamData.getClass());
         RemoteMessage.Builder builder = RemoteMessage.newBuilder();
@@ -60,7 +115,7 @@ public GRPCRemoteClient(StreamDataClassGetter streamDataClassGetter, RemoteInsta
         builder.setStreamDataId(streamDataId);
         builder.setRemoteData(streamData.serialize());
 
-        this.carrier.produce(builder.build());
+        this.getDataCarrier().produce(builder.build());
     }
 
     class RemoteMessageConsumer implements IConsumer<RemoteMessage> {
@@ -68,12 +123,15 @@ public GRPCRemoteClient(StreamDataClassGetter streamDataClassGetter, RemoteInsta
         }
 
         @Override public void consume(List<RemoteMessage> remoteMessages) {
-            StreamObserver<RemoteMessage> streamObserver = createStreamObserver();
-
-            for (RemoteMessage remoteMessage : remoteMessages) {
-                streamObserver.onNext(remoteMessage);
+            try {
+                StreamObserver<RemoteMessage> streamObserver = createStreamObserver();
+                for (RemoteMessage remoteMessage : remoteMessages) {
+                    streamObserver.onNext(remoteMessage);
+                }
+                streamObserver.onCompleted();
+            } catch (Throwable t) {
+                logger.error(t.getMessage(), t);
             }
-            streamObserver.onCompleted();
         }
 
         @Override public void onError(List<RemoteMessage> remoteMessages, Throwable t) {
@@ -84,9 +142,14 @@ public GRPCRemoteClient(StreamDataClassGetter streamDataClassGetter, RemoteInsta
         }
     }
 
+    /**
+     * Create a gRPC stream observer for sending stream data. One stream observer
+     * can send multiple stream data items in a single consume call.
+     * At most 10 stream observers are allowed to be in use at the same time.
+     *
+     * @return stream observer
+     */
     private StreamObserver<RemoteMessage> createStreamObserver() {
-        RemoteServiceGrpc.RemoteServiceStub stub = RemoteServiceGrpc.newStub(client.getChannel());
-
         int sleepTotalMillis = 0;
         int sleepMillis = 10;
         while (concurrentStreamObserverNumber.incrementAndGet() > 10) {
@@ -105,7 +168,7 @@ public GRPCRemoteClient(StreamDataClassGetter streamDataClassGetter, RemoteInsta
             }
         }
 
-        return stub.call(new StreamObserver<Empty>() {
+        return getStub().call(new StreamObserver<Empty>() {
             @Override public void onNext(Empty empty) {
             }
 
@@ -120,15 +183,20 @@ public GRPCRemoteClient(StreamDataClassGetter streamDataClassGetter, RemoteInsta
         });
     }
 
-    @Override public int compareTo(GRPCRemoteClient o) {
-        return this.client.toString().compareTo(o.client.toString());
+    @Override public void close() {
+        if (Objects.nonNull(this.carrier)) {
+            this.carrier.shutdownConsumers();
+        }
+        if (Objects.nonNull(this.client)) {
+            this.client.shutdown();
+        }
     }
 
-    public String getHost() {
-        return client.getHost();
+    @Override public Address getAddress() {
+        return address;
     }
 
-    public int getPort() {
-        return client.getPort();
+    @Override public int compareTo(RemoteClient o) {
+        return address.compareTo(o.getAddress());
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClient.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClient.java
index 8172a28f8..ccc216fad 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClient.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClient.java
@@ -23,11 +23,13 @@
 /**
  * @author peng-yongsheng
  */
-public interface RemoteClient {
+public interface RemoteClient extends Comparable<RemoteClient> {
 
-    String getHost();
+    Address getAddress();
 
-    int getPort();
+    void connect();
+
+    void close();
 
     void push(int nextWorkerId, StreamData streamData);
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
index 129b51761..7d78be0bf 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
@@ -27,37 +27,61 @@
 import org.slf4j.*;
 
 /**
+ * This class manages the connections between OAP servers. A scheduled task automatically
+ * queries the server list from the cluster module, such as the Zookeeper cluster module
+ * or the Kubernetes cluster module.
+ *
  * @author peng-yongsheng
  */
 public class RemoteClientManager implements Service {
 
     private static final Logger logger = LoggerFactory.getLogger(RemoteClientManager.class);
 
-    private final ModuleManager moduleManager;
+    private final ModuleDefineHolder moduleDefineHolder;
     private StreamDataClassGetter streamDataClassGetter;
     private ClusterNodesQuery clusterNodesQuery;
     private final List<RemoteClient> clientsA;
     private final List<RemoteClient> clientsB;
     private volatile List<RemoteClient> usingClients;
 
-    public RemoteClientManager(ModuleManager moduleManager) {
-        this.moduleManager = moduleManager;
+    public RemoteClientManager(ModuleDefineHolder moduleDefineHolder) {
+        this.moduleDefineHolder = moduleDefineHolder;
         this.clientsA = new LinkedList<>();
         this.clientsB = new LinkedList<>();
         this.usingClients = clientsA;
     }
 
     public void start() {
-        this.clusterNodesQuery = moduleManager.find(ClusterModule.NAME).provider().getService(ClusterNodesQuery.class);
-        this.streamDataClassGetter = moduleManager.find(CoreModule.NAME).provider().getService(StreamDataClassGetter.class);
-        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::refresh, 5, 5, TimeUnit.SECONDS);
+        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::refresh, 1, 5, TimeUnit.SECONDS);
     }
 
-    private void refresh() {
-        if (logger.isDebugEnabled()) {
-            logger.debug("Refresh remote nodes collection.");
-        }
+    /**
+     * Query the OAP server list from the cluster module and create a new connection
+     * for each new node. Keep the server list sorted, because every server routes
+     * stream data to the other servers by hash code.
+     */
+    void refresh() {
         try {
+            if (Objects.isNull(clusterNodesQuery)) {
+                synchronized (RemoteClientManager.class) {
+                    if (Objects.isNull(clusterNodesQuery)) {
+                        this.clusterNodesQuery = moduleDefineHolder.find(ClusterModule.NAME).provider().getService(ClusterNodesQuery.class);
+                    }
+                }
+            }
+
+            if (Objects.isNull(streamDataClassGetter)) {
+                synchronized (RemoteClientManager.class) {
+                    if (Objects.isNull(streamDataClassGetter)) {
+                        this.streamDataClassGetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataClassGetter.class);
+                    }
+                }
+            }
+
+            if (logger.isDebugEnabled()) {
+                logger.debug("Refresh remote nodes collection.");
+            }
+
             List<RemoteInstance> instanceList = clusterNodesQuery.queryRemoteNodes();
             Collections.sort(instanceList);
 
@@ -66,12 +90,11 @@ private void refresh() {
             }
 
             if (!compare(instanceList)) {
-                buildNewClients(instanceList);
+                reBuildRemoteClients(instanceList);
             }
         } catch (Throwable t) {
             logger.error(t.getMessage(), t);
         }
-
     }
 
     public List<RemoteClient> getRemoteClient() {
@@ -94,34 +117,68 @@ private void switchCurrentClients() {
         }
     }
 
-    private void buildNewClients(List<RemoteInstance> remoteInstances) {
+    /**
+     * Compare the existing clients with the remote instance collection. Move the
+     * clients that are still alive into the new client collection to avoid creating
+     * new channels. Shut down the clients that are no longer in the cluster config.
+     *
+     * Create a gRPC client for each new remote instance except for the self-instance.
+     *
+     * @param remoteInstances Remote instance collection queried from the cluster config.
+     */
+    private synchronized void reBuildRemoteClients(List<RemoteInstance> remoteInstances) {
         getFreeClients().clear();
 
-        Map<String, RemoteClient> currentClientsMap = new HashMap<>();
-        this.usingClients.forEach(remoteClient -> currentClientsMap.put(address(remoteClient.getHost(), remoteClient.getPort()), remoteClient));
+        Map<Address, RemoteClient> remoteClients = new HashMap<>();
+        getRemoteClient().forEach(client -> remoteClients.put(client.getAddress(), client));
+
+        Map<Address, Action> tempRemoteClients = new HashMap<>();
+        getRemoteClient().forEach(client -> tempRemoteClients.put(client.getAddress(), Action.Close));
 
         remoteInstances.forEach(remoteInstance -> {
-            String address = address(remoteInstance.getHost(), remoteInstance.getPort());
-            RemoteClient client;
-            if (currentClientsMap.containsKey(address)) {
-                client = currentClientsMap.get(address);
+            if (tempRemoteClients.containsKey(remoteInstance.getAddress())) {
+                tempRemoteClients.put(remoteInstance.getAddress(), Action.Leave);
             } else {
-                if (remoteInstance.isSelf()) {
-                    client = new SelfRemoteClient(remoteInstance.getHost(), remoteInstance.getPort());
-                } else {
-                    client = new GRPCRemoteClient(streamDataClassGetter, remoteInstance, 1, 3000);
-                }
+                tempRemoteClients.put(remoteInstance.getAddress(), Action.Create);
+            }
+        });
+
+        tempRemoteClients.forEach((address, action) -> {
+            switch (action) {
+                case Leave:
+                    if (remoteClients.containsKey(address)) {
+                        getFreeClients().add(remoteClients.get(address));
+                    }
+                    break;
+                case Create:
+                    if (address.isSelf()) {
+                        RemoteClient client = new SelfRemoteClient(address);
+                        getFreeClients().add(client);
+                    } else {
+                        RemoteClient client = new GRPCRemoteClient(streamDataClassGetter, address, 1, 3000);
+                        client.connect();
+                        getFreeClients().add(client);
+                    }
+                    break;
             }
-            getFreeClients().add(client);
         });
 
+        Collections.sort(getFreeClients());
         switchCurrentClients();
+
+        tempRemoteClients.forEach((address, action) -> {
+            if (Action.Close.equals(action) && remoteClients.containsKey(address)) {
+                remoteClients.get(address).close();
+            }
+        });
+
+        getFreeClients().clear();
     }
 
     private boolean compare(List<RemoteInstance> remoteInstances) {
         if (usingClients.size() == remoteInstances.size()) {
             for (int i = 0; i < usingClients.size(); i++) {
-                if (!address(usingClients.get(i).getHost(), usingClients.get(i).getPort()).equals(address(remoteInstances.get(i).getHost(), remoteInstances.get(i).getPort()))) {
+                if (!usingClients.get(i).getAddress().equals(remoteInstances.get(i).getAddress())) {
                     return false;
                 }
             }
@@ -131,7 +188,7 @@ private boolean compare(List<RemoteInstance> remoteInstances) {
         }
     }
 
-    private String address(String host, int port) {
-        return host + String.valueOf(port);
+    enum Action {
+        Close, Leave, Create
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java
index 9b4f6bd1b..ab25f5cde 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java
@@ -18,6 +18,7 @@
 
 package org.apache.skywalking.oap.server.core.remote.client;
 
+import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.remote.data.StreamData;
 import org.apache.skywalking.oap.server.core.worker.WorkerInstances;
 
@@ -26,23 +27,28 @@
  */
 public class SelfRemoteClient implements RemoteClient {
 
-    private final String host;
-    private final int port;
+    private final Address address;
 
-    public SelfRemoteClient(String host, int port) {
-        this.host = host;
-        this.port = port;
+    public SelfRemoteClient(Address address) {
+        this.address = address;
     }
 
-    @Override public String getHost() {
-        return host;
+    @Override public Address getAddress() {
+        return address;
     }
 
-    @Override public int getPort() {
-        return port;
+    @Override public void connect() {
+    }
+
+    @Override public void close() {
+        throw new UnexpectedException("Self remote client invoked to close.");
     }
 
     @Override public void push(int nextWorkerId, StreamData streamData) {
         WorkerInstances.INSTANCES.get(nextWorkerId).in(streamData);
     }
+
+    @Override public int compareTo(RemoteClient o) {
+        return address.compareTo(o.getAddress());
+    }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
index 861dd7bbe..ba3de7a85 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
@@ -20,29 +20,20 @@
 
 import java.io.IOException;
 import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 import lombok.Setter;
 import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
-import org.apache.skywalking.oap.server.core.Const;
-import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.DataTTL;
+import org.apache.skywalking.oap.server.core.*;
 import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
 import org.apache.skywalking.oap.server.core.analysis.record.Record;
-import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
-import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
-import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
+import org.apache.skywalking.oap.server.core.cluster.*;
 import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
-import org.apache.skywalking.oap.server.core.storage.Downsampling;
-import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
-import org.apache.skywalking.oap.server.core.storage.StorageModule;
-import org.apache.skywalking.oap.server.core.storage.model.IModelGetter;
-import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.model.*;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
 import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
 
 /**
  * @author peng-yongsheng
@@ -67,8 +58,8 @@ public void start(ModuleManager moduleManager) {
 
     private void delete() {
         List<RemoteInstance> remoteInstances = clusterNodesQuery.queryRemoteNodes();
-        if (CollectionUtils.isNotEmpty(remoteInstances) && !remoteInstances.get(0).isSelf()) {
-            logger.info("The selected first address is {}. Skip.", remoteInstances.get(0).toString());
+        if (CollectionUtils.isNotEmpty(remoteInstances) && !remoteInstances.get(0).getAddress().isSelf()) {
+            logger.info("The selected first getAddress is {}. Skip.", remoteInstances.get(0).toString());
             return;
         }
 
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerTestCase.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerTestCase.java
new file mode 100644
index 000000000..b2e1f0354
--- /dev/null
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerTestCase.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.remote;
+
+import io.grpc.inprocess.*;
+import io.grpc.stub.StreamObserver;
+import io.grpc.testing.GrpcCleanupRule;
+import java.io.IOException;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
+import org.apache.skywalking.oap.server.core.remote.data.StreamData;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
+import org.apache.skywalking.oap.server.core.worker.*;
+import org.apache.skywalking.oap.server.library.module.*;
+import org.apache.skywalking.oap.server.testing.module.*;
+import org.junit.*;
+
+import static org.mockito.Mockito.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class RemoteServiceHandlerTestCase {
+    // Streams one RemoteMessage to a RemoteServiceHandler over an in-process gRPC server/channel pair.
+    @Rule
+    public final GrpcCleanupRule gRPCCleanup = new GrpcCleanupRule();
+
+    @Test
+    public void callTest() throws DuplicateProviderException, ProviderNotFoundException, IOException {
+        final int streamDataClassId = 1;
+        final int testWorkerId = 1;
+
+        ModuleManagerTesting moduleManager = new ModuleManagerTesting();
+        ModuleDefineTesting moduleDefine = new ModuleDefineTesting();
+        moduleManager.put(CoreModule.NAME, moduleDefine);
+        // The mocked getter resolves streamDataClassId to TestRemoteData.
+        StreamDataClassGetter classGetter = mock(StreamDataClassGetter.class);
+        Class<?> dataClass = TestRemoteData.class;
+        when(classGetter.findClassById(streamDataClassId)).thenReturn((Class<StreamData>)dataClass);
+
+        moduleDefine.provider().registerServiceImplementation(StreamDataClassGetter.class, classGetter);
+        // Register the worker under the same id that is set as nextWorkerId on the message below.
+        WorkerInstances.INSTANCES.put(testWorkerId, new TestWorker());
+
+        String serverName = InProcessServerBuilder.generateName();
+
+        gRPCCleanup.register(InProcessServerBuilder
+            .forName(serverName).directExecutor().addService(new RemoteServiceHandler(moduleManager)).build().start());
+
+        RemoteServiceGrpc.RemoteServiceStub remoteServiceStub = RemoteServiceGrpc.newStub(
+            gRPCCleanup.register(InProcessChannelBuilder.forName(serverName).directExecutor().build()));
+
+        StreamObserver<RemoteMessage> streamObserver = remoteServiceStub.call(new StreamObserver<Empty>() {
+            @Override public void onNext(Empty empty) {
+                // no-op: the response is not inspected
+            }
+
+            @Override public void onError(Throwable throwable) {
+                // NOTE(review): anything delivered here is dropped and cannot fail the test; confirm that is intended
+            }
+
+            @Override public void onCompleted() {
+                // no-op
+            }
+        });
+
+        RemoteMessage.Builder remoteMessage = RemoteMessage.newBuilder();
+        remoteMessage.setStreamDataId(streamDataClassId);
+        remoteMessage.setNextWorkerId(testWorkerId);
+
+        RemoteData.Builder remoteData = RemoteData.newBuilder();
+        remoteData.addDataStrings("test1");
+        remoteData.addDataStrings("test2");
+
+        remoteData.addDataLongs(10);
+        remoteData.addDataLongs(20);
+        remoteMessage.setRemoteData(remoteData);
+
+        streamObserver.onNext(remoteMessage.build());
+        streamObserver.onCompleted();
+    }
+
+    static class TestRemoteData extends StreamData {
+        // Fields are filled by deserialize() from the first two strings and first two longs of the RemoteData.
+        private String str1;
+        private String str2;
+        private long long1;
+        private long long2;
+
+        @Override public int remoteHashCode() {
+            return 10;
+        }
+
+        @Override public void deserialize(RemoteData remoteData) {
+            str1 = remoteData.getDataStrings(0);
+            str2 = remoteData.getDataStrings(1);
+            long1 = remoteData.getDataLongs(0);
+            long2 = remoteData.getDataLongs(1);
+            // The expected values are the ones callTest() adds to the RemoteData builder.
+            Assert.assertEquals("test1", str1);
+            Assert.assertEquals("test2", str2);
+            Assert.assertEquals(10, long1);
+            Assert.assertEquals(20, long2);
+        }
+
+        @Override public RemoteData.Builder serialize() {
+            return null; // NOTE(review): presumably never called in this test; verify
+        }
+    }
+
+    static class TestWorker extends AbstractWorker {
+        // Registered by callTest(); asserts on the fields of the TestRemoteData passed to in().
+        public TestWorker() {
+            super(1);
+        }
+
+        @Override public void in(Object o) {
+            TestRemoteData data = (TestRemoteData)o;
+
+            Assert.assertEquals("test1", data.str1);
+            Assert.assertEquals("test2", data.str2);
+            Assert.assertEquals(10, data.long1);
+            Assert.assertEquals(20, data.long2);
+        }
+    }
+}
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java
new file mode 100644
index 000000000..41953fb6d
--- /dev/null
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.remote.client;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
+import org.apache.skywalking.oap.server.core.remote.data.StreamData;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
+import org.junit.Assert;
+
+import static org.mockito.Mockito.spy;
+
+/**
+ * @author peng-yongsheng
+ */
+public class GRPCRemoteClientRealClient {
+    // Manual driver with a main() entry point: pushes TestStreamData through a GRPCRemoteClient built for localhost:10000.
+    public static void main(String[] args) throws InterruptedException {
+        Address address = new Address("localhost", 10000, false);
+        GRPCRemoteClient remoteClient = spy(new GRPCRemoteClient(new TestClassGetter(), address, 1, 10));
+        remoteClient.connect();
+        // 10000 pushes with a one-second pause after each.
+        for (int i = 0; i < 10000; i++) {
+            remoteClient.push(1, new TestStreamData());
+            TimeUnit.SECONDS.sleep(1);
+        }
+        // Block the main thread for a further 10 minutes before returning.
+        TimeUnit.MINUTES.sleep(10);
+    }
+
+    public static class TestClassGetter implements StreamDataClassGetter {
+        // Fixed mapping: every class maps to id 1 and every id maps to TestStreamData.
+        @Override public int findIdByClass(Class streamDataClass) {
+            return 1;
+        }
+
+        @Override public Class<StreamData> findClassById(int id) {
+            Class<?> clazz = TestStreamData.class;
+            return (Class<StreamData>)clazz;
+        }
+    }
+
+    public static class TestStreamData extends StreamData {
+        // serialize() always writes the constant 987; deserialize() stores the first long into this field.
+        private long value;
+
+        @Override public int remoteHashCode() {
+            return 0;
+        }
+
+        @Override public void deserialize(RemoteData remoteData) {
+            this.value = remoteData.getDataLongs(0);
+        }
+
+        @Override public RemoteData.Builder serialize() {
+            RemoteData.Builder builder = RemoteData.newBuilder();
+            builder.addDataLongs(987);
+            return builder;
+        }
+    }
+
+    static class TestWorker extends AbstractWorker {
+        // NOTE(review): not referenced anywhere else in this class; confirm whether it is still needed.
+        public TestWorker(int workerId) {
+            super(workerId);
+        }
+
+        @Override public void in(Object o) {
+            TestStreamData streamData = (TestStreamData)o;
+            Assert.assertEquals(987, streamData.value);
+        }
+    }
+}
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealServer.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealServer.java
new file mode 100644
index 000000000..ae5809cfd
--- /dev/null
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealServer.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.remote.client;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler;
+import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
+import org.apache.skywalking.oap.server.library.server.ServerException;
+import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
+import org.apache.skywalking.oap.server.testing.module.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class GRPCRemoteClientRealServer {
+    // Manual driver with a main() entry point: starts a GRPCServer on localhost:10000 serving a RemoteServiceHandler.
+    public static void main(String[] args) throws ServerException, InterruptedException {
+        ModuleManagerTesting moduleManager = new ModuleManagerTesting();
+        ModuleDefineTesting moduleDefine = new ModuleDefineTesting();
+        moduleManager.put(CoreModule.NAME, moduleDefine);
+        // Reuses the TestClassGetter declared in GRPCRemoteClientRealClient as the StreamDataClassGetter service.
+        moduleDefine.provider().registerServiceImplementation(StreamDataClassGetter.class, new GRPCRemoteClientRealClient.TestClassGetter());
+
+        GRPCServer server = new GRPCServer("localhost", 10000);
+        server.initialize();
+
+        server.addHandler(new RemoteServiceHandler(moduleManager));
+
+        server.start();
+        // Block the main thread for 10 minutes after the server has been started.
+        TimeUnit.MINUTES.sleep(10);
+    }
+}
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java
new file mode 100644
index 000000000..e14f29d74
--- /dev/null
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.remote.client;
+
+import io.grpc.testing.GrpcServerRule;
+import java.util.concurrent.TimeUnit;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler;
+import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
+import org.apache.skywalking.oap.server.core.remote.data.StreamData;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.core.worker.*;
+import org.apache.skywalking.oap.server.testing.module.*;
+import org.junit.*;
+
+import static org.mockito.Mockito.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class GRPCRemoteClientTestCase {
+    // Pushes TestStreamData through a spied GRPCRemoteClient whose getChannel() is stubbed to the GrpcServerRule channel.
+    private final int nextWorkerId = 1;
+    private ModuleManagerTesting moduleManager;
+    private StreamDataClassGetter classGetter;
+    @Rule public final GrpcServerRule grpcServerRule = new GrpcServerRule().directExecutor();
+
+    @Before
+    public void before() {
+        moduleManager = new ModuleManagerTesting();
+        ModuleDefineTesting moduleDefine = new ModuleDefineTesting();
+        moduleManager.put(CoreModule.NAME, moduleDefine);
+
+        classGetter = mock(StreamDataClassGetter.class);
+        moduleDefine.provider().registerServiceImplementation(StreamDataClassGetter.class, classGetter);
+
+        TestWorker worker = new TestWorker(nextWorkerId);
+        WorkerInstances.INSTANCES.put(nextWorkerId, worker);
+    }
+
+    @Test
+    public void testPush() throws InterruptedException {
+        grpcServerRule.getServiceRegistry().addService(new RemoteServiceHandler(moduleManager));
+
+        Address address = new Address("not-important", 11, false);
+        GRPCRemoteClient remoteClient = spy(new GRPCRemoteClient(classGetter, address, 1, 10));
+        remoteClient.connect();
+        // NOTE(review): getChannel() is stubbed only after connect() has run; confirm connect() does not need the stub.
+        doReturn(grpcServerRule.getChannel()).when(remoteClient).getChannel();
+
+        when(classGetter.findIdByClass(TestStreamData.class)).thenReturn(1);
+
+        Class<?> dataClass = TestStreamData.class;
+        when(classGetter.findClassById(1)).thenReturn((Class<StreamData>)dataClass);
+        // 12 pushes against a client constructed with (1, 10); presumably chosen to exceed a size-10 buffer, verify.
+        for (int i = 0; i < 12; i++) {
+            remoteClient.push(nextWorkerId, new TestStreamData());
+        }
+        // NOTE(review): fixed one-second wait; nothing in this method verifies that TestWorker.in() was invoked.
+        TimeUnit.SECONDS.sleep(1);
+    }
+
+    public static class TestStreamData extends StreamData {
+        // serialize() always writes the constant 987; deserialize() stores the first long into this field.
+        private long value;
+
+        @Override public int remoteHashCode() {
+            return 0;
+        }
+
+        @Override public void deserialize(RemoteData remoteData) {
+            this.value = remoteData.getDataLongs(0);
+        }
+
+        @Override public RemoteData.Builder serialize() {
+            RemoteData.Builder builder = RemoteData.newBuilder();
+            builder.addDataLongs(987);
+            return builder;
+        }
+    }
+
+    class TestWorker extends AbstractWorker {
+        // NOTE(review): non-static inner class although its body uses no state of the enclosing test instance.
+        public TestWorker(int workerId) {
+            super(workerId);
+        }
+
+        @Override public void in(Object o) {
+            TestStreamData streamData = (TestStreamData)o;
+            Assert.assertEquals(987, streamData.value);
+        }
+    }
+}
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java
new file mode 100644
index 000000000..54b6cb758
--- /dev/null
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.remote.client;
+
+import java.util.*;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.cluster.*;
+import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
+import org.apache.skywalking.oap.server.testing.module.*;
+import org.junit.*;
+
+import static org.mockito.Mockito.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class RemoteClientManagerTestCase {
+    // Checks the client list produced by RemoteClientManager.refresh() for two successive node lists.
+    @Test
+    public void refresh() {
+        ModuleManagerTesting moduleManager = new ModuleManagerTesting();
+        ModuleDefineTesting clusterModuleDefine = new ModuleDefineTesting();
+        moduleManager.put(ClusterModule.NAME, clusterModuleDefine);
+
+        ModuleDefineTesting coreModuleDefine = new ModuleDefineTesting();
+        moduleManager.put(CoreModule.NAME, coreModuleDefine);
+
+        ClusterNodesQuery clusterNodesQuery = mock(ClusterNodesQuery.class);
+        clusterModuleDefine.provider().registerServiceImplementation(ClusterNodesQuery.class, clusterNodesQuery);
+
+        StreamDataClassGetter streamDataClassGetter = mock(StreamDataClassGetter.class);
+        coreModuleDefine.provider().registerServiceImplementation(StreamDataClassGetter.class, streamDataClassGetter);
+
+        RemoteClientManager clientManager = new RemoteClientManager(moduleManager);
+        // First refresh: nodes are supplied in the order host3, host1, host2.
+        when(clusterNodesQuery.queryRemoteNodes()).thenReturn(groupOneInstances());
+        clientManager.refresh();
+        // The clients are expected back in host-name order, not in the order supplied.
+        List<RemoteClient> remoteClients = clientManager.getRemoteClient();
+        Assert.assertEquals("host1", remoteClients.get(0).getAddress().getHost());
+        Assert.assertEquals("host2", remoteClients.get(1).getAddress().getHost());
+        Assert.assertEquals("host3", remoteClients.get(2).getAddress().getHost());
+
+        Assert.assertTrue(remoteClients.get(0) instanceof GRPCRemoteClient);
+        Assert.assertTrue(remoteClients.get(1) instanceof SelfRemoteClient);
+        Assert.assertTrue(remoteClients.get(2) instanceof GRPCRemoteClient);
+        // Second refresh: host3 is no longer returned, host4 and host5 are new.
+        when(clusterNodesQuery.queryRemoteNodes()).thenReturn(groupTwoInstances());
+        clientManager.refresh();
+
+        remoteClients = clientManager.getRemoteClient();
+        Assert.assertEquals("host1", remoteClients.get(0).getAddress().getHost());
+        Assert.assertEquals("host2", remoteClients.get(1).getAddress().getHost());
+        Assert.assertEquals("host4", remoteClients.get(2).getAddress().getHost());
+        Assert.assertEquals("host5", remoteClients.get(3).getAddress().getHost());
+
+        Assert.assertTrue(remoteClients.get(0) instanceof GRPCRemoteClient);
+        Assert.assertTrue(remoteClients.get(1) instanceof SelfRemoteClient);
+        Assert.assertTrue(remoteClients.get(2) instanceof GRPCRemoteClient);
+        Assert.assertTrue(remoteClients.get(3) instanceof GRPCRemoteClient);
+    }
+    // Node list for the first refresh; host2 is built with a true third argument (presumably the self flag; verify).
+    private List<RemoteInstance> groupOneInstances() {
+        List<RemoteInstance> instances = new ArrayList<>();
+        instances.add(new RemoteInstance(new Address("host3", 100, false)));
+        instances.add(new RemoteInstance(new Address("host1", 100, false)));
+        instances.add(new RemoteInstance(new Address("host2", 100, true)));
+        return instances;
+    }
+    // Node list for the second refresh: host3 removed, host4 and host5 added, host2 built as in the first list.
+    private List<RemoteInstance> groupTwoInstances() {
+        List<RemoteInstance> instances = new ArrayList<>();
+        instances.add(new RemoteInstance(new Address("host5", 100, false)));
+        instances.add(new RemoteInstance(new Address("host1", 100, false)));
+        instances.add(new RemoteInstance(new Address("host2", 100, true)));
+        instances.add(new RemoteInstance(new Address("host4", 100, false)));
+        return instances;
+    }
+}
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/Client.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/Client.java
index 5e43021e2..95af9af14 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/Client.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/Client.java
@@ -22,7 +22,8 @@
  * @author peng-yongsheng
  */
 public interface Client {
-    void initialize() throws ClientException;
+    
+    void connect() throws ClientException;
 
     void shutdown();
 }
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
index 45df02479..f7c4b5cb0 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
@@ -59,7 +59,7 @@ public ElasticSearchClient(String clusterNodes, NameSpace namespace) {
         this.namespace = namespace;
     }
 
-    @Override public void initialize() {
+    @Override public void connect() {
         List<HttpHost> pairsList = parseClusterNodes(clusterNodes);
 
         client = new RestHighLevelClient(
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java
index 188fae35c..485c4475b 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java
@@ -21,12 +21,15 @@
 import io.grpc.*;
 import lombok.Getter;
 import org.apache.skywalking.oap.server.library.client.Client;
+import org.slf4j.*;
 
 /**
  * @author peng-yongsheng
  */
 public class GRPCClient implements Client {
 
+    private static final Logger logger = LoggerFactory.getLogger(GRPCClient.class);
+
     @Getter private final String host;
 
     @Getter private final int port;
@@ -38,12 +41,16 @@ public GRPCClient(String host, int port) {
         this.port = port;
     }
 
-    @Override public void initialize() {
+    @Override public void connect() {
         channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext(true).build();
     }
 
     @Override public void shutdown() {
-        channel.shutdownNow();
+        try {
+            channel.shutdownNow();
+        } catch (Throwable t) {
+            logger.error(t.getMessage(), t);
+        }
     }
 
     public ManagedChannel getChannel() {
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/jdbc/hikaricp/JDBCHikariCPClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/jdbc/hikaricp/JDBCHikariCPClient.java
index b09bbbfb2..65a6b18f6 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/jdbc/hikaricp/JDBCHikariCPClient.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/jdbc/hikaricp/JDBCHikariCPClient.java
@@ -18,19 +18,12 @@
 
 package org.apache.skywalking.oap.server.library.client.jdbc.hikaricp;
 
-import com.zaxxer.hikari.HikariConfig;
-import com.zaxxer.hikari.HikariDataSource;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
+import com.zaxxer.hikari.*;
+import java.sql.*;
 import java.util.Properties;
 import org.apache.skywalking.oap.server.library.client.Client;
-import org.apache.skywalking.oap.server.library.client.ClientException;
 import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
 
 /**
  * JDBC Client uses HikariCP connection management lib to execute SQL.
@@ -47,7 +40,7 @@ public JDBCHikariCPClient(Properties properties) {
         hikariConfig = new HikariConfig(properties);
     }
 
-    @Override public void initialize() throws ClientException {
+    @Override public void connect() {
         dataSource = new HikariDataSource(hikariConfig);
     }
 
diff --git a/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClientTestCase.java b/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClientTestCase.java
index f696202a3..28b86dfb1 100644
--- a/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClientTestCase.java
+++ b/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClientTestCase.java
@@ -48,7 +48,7 @@ public static void main(String[] args) throws IOException, ClientException {
         builder.endObject();
 
         ElasticSearchClient client = new ElasticSearchClient("localhost:9200", null);
-        client.initialize();
+        client.connect();
 
         String indexName = "test";
         client.createIndex(indexName, settings, builder);
diff --git a/oap-server/server-receiver-plugin/skywalking-istio-telemetry-receiver-plugin/src/main/proto/policy/v1beta1/value_type.proto b/oap-server/server-receiver-plugin/skywalking-istio-telemetry-receiver-plugin/src/main/proto/policy/v1beta1/value_type.proto
index 04d8675ab..4c929eb61 100644
--- a/oap-server/server-receiver-plugin/skywalking-istio-telemetry-receiver-plugin/src/main/proto/policy/v1beta1/value_type.proto
+++ b/oap-server/server-receiver-plugin/skywalking-istio-telemetry-receiver-plugin/src/main/proto/policy/v1beta1/value_type.proto
@@ -43,10 +43,10 @@ enum ValueType {
     // A point in time.
     TIMESTAMP = 5;
 
-    // An IP address.
+    // An IP address.
     IP_ADDRESS = 6;
 
-    // An email address.
+    // An email address.
     EMAIL_ADDRESS = 7;
 
     // A URI.
diff --git a/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/rest/NetworkAddressRegisterServletHandler.java b/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/rest/NetworkAddressRegisterServletHandler.java
index 88dc51073..d1a463213 100644
--- a/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/rest/NetworkAddressRegisterServletHandler.java
+++ b/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/rest/NetworkAddressRegisterServletHandler.java
@@ -59,7 +59,7 @@ public NetworkAddressRegisterServletHandler(ModuleManager moduleManager) {
                 String networkAddress = networkAddresses.get(i).getAsString();
 
                 if (logger.isDebugEnabled()) {
-                    logger.debug("network address register, network address: {}", networkAddress);
+                    logger.debug("network getAddress register, network getAddress: {}", networkAddress);
                 }
 
                 int addressId = networkAddressInventoryRegister.getOrCreate(networkAddress);
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/ReferenceIdExchanger.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/ReferenceIdExchanger.java
index 8aaad1913..8fc00a0fa 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/ReferenceIdExchanger.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/ReferenceIdExchanger.java
@@ -92,7 +92,7 @@ private ReferenceIdExchanger(ModuleManager moduleManager) {
 
             if (networkAddressId == 0) {
                 if (logger.isDebugEnabled()) {
-                    logger.debug("network address: {} from service id: {} exchange failed", standardBuilder.getNetworkAddress(), serviceId);
+                    logger.debug("network getAddress: {} from service id: {} exchange failed", standardBuilder.getNetworkAddress(), serviceId);
                 }
                 return false;
             } else {
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SpringSleuthSegmentBuilderTest.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SpringSleuthSegmentBuilderTest.java
index b02254b7b..1fbdeb3ea 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SpringSleuthSegmentBuilderTest.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SpringSleuthSegmentBuilderTest.java
@@ -99,7 +99,7 @@ public void testTransform() throws Exception {
             }
 
             @Override public int getOrCreate(int serviceId, int addressId, long registerTime) {
-                String key = "VitualAppCode:" + serviceId + ",address:" + addressId;
+                String key = "VitualAppCode:" + serviceId + ",getAddress:" + addressId;
                 if (applicationInstRegister.containsKey(key)) {
                     return applicationInstRegister.get(key);
                 } else {
diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index 16219bbee..47f0463d0 100644
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -20,7 +20,7 @@ cluster:
    # library the oap-libs folder with your ZooKeeper 3.4.x library.
 #  zookeeper:
 #    hostPort: localhost:2181
-#    # Retry Policy
+#    #Retry Policy
 #    baseSleepTimeMs: 1000 # initial amount of time to wait between retries
 #    maxRetries: 3 # max number of times to retry
 #  kubernetes:
diff --git a/oap-server/server-starter/src/main/resources/log4j2.xml b/oap-server/server-starter/src/main/resources/log4j2.xml
index 0353842db..e9a520ba7 100644
--- a/oap-server/server-starter/src/main/resources/log4j2.xml
+++ b/oap-server/server-starter/src/main/resources/log4j2.xml
@@ -34,6 +34,7 @@
         <logger name="org.apache.skywalking.oap.server.receiver.istio.telemetry" level="DEBUG"/>
         <logger name="org.apache.skywalking.oap.server.core.alarm.AlarmStandardPersistence" level="DEBUG"/>
         <logger name="org.apache.skywalking.oap.server.core" level="INFO"/>
+        <logger name="org.apache.skywalking.oap.server.core.remote.client" level="DEBUG"/>
         <logger name="org.apache.skywalking.oap.server.library.buffer" level="INFO"/>
         <Root level="DEBUG">
             <AppenderRef ref="Console"/>
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
index 40d5416e2..1099d739d 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
@@ -88,7 +88,7 @@ public void prepare() throws ServiceNotProvidedException {
     public void start() throws ModuleStartException {
         try {
             nameSpace.setNameSpace(config.getNameSpace());
-            elasticSearchClient.initialize();
+            elasticSearchClient.connect();
 
             StorageEsInstaller installer = new StorageEsInstaller(getManager(), config.getIndexShardsNumber(), config.getIndexReplicasNumber());
             installer.install(elasticSearchClient);
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
index 8677afd45..ac8cb361d 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
@@ -19,43 +19,13 @@
 package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2;
 
 import java.util.Properties;
-import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
-import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
-import org.apache.skywalking.oap.server.core.storage.IRegisterLockDAO;
-import org.apache.skywalking.oap.server.core.storage.StorageDAO;
-import org.apache.skywalking.oap.server.core.storage.StorageException;
-import org.apache.skywalking.oap.server.core.storage.StorageModule;
-import org.apache.skywalking.oap.server.core.storage.cache.IEndpointInventoryCacheDAO;
-import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressInventoryCacheDAO;
-import org.apache.skywalking.oap.server.core.storage.cache.IServiceInstanceInventoryCacheDAO;
-import org.apache.skywalking.oap.server.core.storage.cache.IServiceInventoryCacheDAO;
-import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.IMetricQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
-import org.apache.skywalking.oap.server.library.client.ClientException;
+import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.cache.*;
+import org.apache.skywalking.oap.server.core.storage.query.*;
 import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
 import org.apache.skywalking.oap.server.library.module.*;
-import org.apache.skywalking.oap.server.library.module.ModuleDefine;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2AggregationQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2AlarmQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2BatchDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2EndpointInventoryCacheDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2HistoryDeleteDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2MetadataQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2MetricQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2NetworkAddressInventoryCacheDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2RegisterLockDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ServiceInstanceInventoryCacheDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ServiceInventoryCacheDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2StorageDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TableInstaller;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TopologyQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TraceQueryDAO;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.*;
+import org.slf4j.*;
 
 /**
  * H2 Storage provider is for demonstration and preview only.
@@ -117,7 +87,7 @@ public H2StorageProvider() {
 
     @Override public void start() throws ServiceNotProvidedException, ModuleStartException {
         try {
-            h2Client.initialize();
+            h2Client.connect();
 
             H2TableInstaller installer = new H2TableInstaller(getManager());
             installer.install(h2Client);
@@ -125,8 +95,6 @@ public H2StorageProvider() {
             new H2RegisterLockInstaller().install(h2Client);
         } catch (StorageException e) {
             throw new ModuleStartException(e.getMessage(), e);
-        } catch (ClientException e) {
-            throw new ModuleStartException(e.getMessage(), e);
         }
     }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services