You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by pe...@apache.org on 2018/11/22 12:50:33 UTC

[incubator-skywalking] branch master updated: gRPC client usage improve (#1946)

This is an automated email from the ASF dual-hosted git repository.

pengys pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c8a683  gRPC client usage improve (#1946)
7c8a683 is described below

commit 7c8a683c2f52353669ac1317e958d28400f8054c
Author: 彭勇升 pengys <80...@qq.com>
AuthorDate: Thu Nov 22 20:50:27 2018 +0800

    gRPC client usage improve (#1946)
    
    * Close the clients which are unreachable.
    Remote client manager test case and comments.
    
    * Test the GRPCRemote client.
    
    * 1. Catch the Throwable around the onCompleted call, because it throws an exception when the connection is lost.
    2. Check the gRPC channel state: send the message when the state is ready, and wait 5 seconds when it is not ready.
    
    Note: calling getState(true) on the gRPC channel triggers a reconnect attempt.
    
    * The gRPC client will reconnect to the server when the network has recovered.
    
    * Restore application.yml
    
    * Restore the proto module commit id.
    
    * no message
    
    * Fixed compile error.
---
 .../core/remote/TraceSegmentServiceClient.java     |  32 ++---
 oap-server/pom.xml                                 |  21 ++-
 .../plugin/kubernetes/KubernetesCoordinator.java   |  27 ++--
 .../kubernetes/KubernetesCoordinatorTest.java      |  18 +--
 .../plugin/standalone/StandaloneManager.java       |   9 +-
 .../plugin/standalone/StandaloneManagerTest.java   |   5 +-
 .../plugin/zookeeper/ZookeeperCoordinator.java     |  15 ++-
 .../ClusterModuleZookeeperProviderTestCase.java    |   7 +-
 oap-server/server-core/pom.xml                     |  14 +-
 .../oap/server/core/CoreModuleProvider.java        |  53 ++------
 .../core/analysis/data/NonMergeDataCollection.java |   4 +-
 .../oap/server/core/cluster/RemoteInstance.java    |  36 ++----
 .../service/NetworkAddressInventoryRegister.java   |   2 +-
 .../service/ServiceInstanceInventoryRegister.java  |   2 +-
 .../server/core/remote/RemoteServiceHandler.java   |  23 +++-
 .../client/Address.java}                           |  45 +++----
 .../core/remote/client/GRPCRemoteClient.java       | 120 +++++++++++++----
 .../server/core/remote/client/RemoteClient.java    |   8 +-
 .../core/remote/client/RemoteClientManager.java    | 113 ++++++++++++----
 .../core/remote/client/SelfRemoteClient.java       |  24 ++--
 .../core/storage/ttl/DataTTLKeeperTimer.java       |  25 ++--
 .../core/remote/RemoteServiceHandlerTestCase.java  | 142 +++++++++++++++++++++
 .../remote/client/GRPCRemoteClientRealClient.java  |  90 +++++++++++++
 .../remote/client/GRPCRemoteClientRealServer.java  |  50 ++++++++
 .../remote/client/GRPCRemoteClientTestCase.java    | 109 ++++++++++++++++
 .../remote/client/RemoteClientManagerTestCase.java |  95 ++++++++++++++
 .../oap/server/library/client/Client.java          |   3 +-
 .../client/elasticsearch/ElasticSearchClient.java  |   2 +-
 .../oap/server/library/client/grpc/GRPCClient.java |  11 +-
 .../client/jdbc/hikaricp/JDBCHikariCPClient.java   |  15 +--
 .../elasticsearch/ElasticSearchClientTestCase.java |   2 +-
 .../src/main/proto/policy/v1beta1/value_type.proto |   4 +-
 .../rest/NetworkAddressRegisterServletHandler.java |   2 +-
 .../standardization/ReferenceIdExchanger.java      |   2 +-
 .../transform/SpringSleuthSegmentBuilderTest.java  |   2 +-
 .../src/main/resources/application.yml             |   2 +-
 .../server-starter/src/main/resources/log4j2.xml   |   1 +
 .../StorageModuleElasticsearchProvider.java        |   2 +-
 .../storage/plugin/jdbc/h2/H2StorageProvider.java  |  44 +------
 39 files changed, 869 insertions(+), 312 deletions(-)

diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
index 8524142..0422cca 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
@@ -19,17 +19,12 @@
 package org.apache.skywalking.apm.agent.core.remote;
 
 import io.grpc.Channel;
-import java.util.List;
-
 import io.grpc.stub.StreamObserver;
-import org.apache.skywalking.apm.agent.core.boot.BootService;
-import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
-import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
-import org.apache.skywalking.apm.agent.core.context.TracingContext;
-import org.apache.skywalking.apm.agent.core.context.TracingContextListener;
+import java.util.List;
+import org.apache.skywalking.apm.agent.core.boot.*;
+import org.apache.skywalking.apm.agent.core.context.*;
 import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
-import org.apache.skywalking.apm.agent.core.logging.api.ILog;
-import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
+import org.apache.skywalking.apm.agent.core.logging.api.*;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
 import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
@@ -37,8 +32,7 @@ import org.apache.skywalking.apm.network.common.Commands;
 import org.apache.skywalking.apm.network.language.agent.*;
 import org.apache.skywalking.apm.network.language.agent.v2.TraceSegmentReportServiceGrpc;
 
-import static org.apache.skywalking.apm.agent.core.conf.Config.Buffer.BUFFER_SIZE;
-import static org.apache.skywalking.apm.agent.core.conf.Config.Buffer.CHANNEL_SIZE;
+import static org.apache.skywalking.apm.agent.core.conf.Config.Buffer.*;
 import static org.apache.skywalking.apm.agent.core.remote.GRPCChannelStatus.CONNECTED;
 
 /**
@@ -111,18 +105,18 @@ public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSe
                 }
             });
 
-            for (TraceSegment segment : data) {
-                try {
+            try {
+                for (TraceSegment segment : data) {
                     UpstreamSegment upstreamSegment = segment.transform();
                     upstreamSegmentStreamObserver.onNext(upstreamSegment);
-                } catch (Throwable t) {
-                    logger.error(t, "Transform and send UpstreamSegment to collector fail.");
                 }
-            }
-            upstreamSegmentStreamObserver.onCompleted();
+                upstreamSegmentStreamObserver.onCompleted();
 
-            status.wait4Finish();
-            segmentUplinkedCounter += data.size();
+                status.wait4Finish();
+                segmentUplinkedCounter += data.size();
+            } catch (Throwable t) {
+                logger.error(t, "Transform and send UpstreamSegment to collector fail.");
+            }
         } else {
             segmentAbandonedCounter += data.size();
         }
diff --git a/oap-server/pom.xml b/oap-server/pom.xml
index d5ce3c7..0f08ed0 100644
--- a/oap-server/pom.xml
+++ b/oap-server/pom.xml
@@ -17,7 +17,8 @@
   ~
   -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <artifactId>apm</artifactId>
         <groupId>org.apache.skywalking</groupId>
@@ -237,6 +238,24 @@
                 <artifactId>grpc-testing</artifactId>
                 <version>${grpc.version}</version>
                 <scope>test</scope>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.mockito</groupId>
+                        <artifactId>mockito-core</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>junit</groupId>
+                        <artifactId>junit</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>io.grpc</groupId>
+                        <artifactId>grpc-core</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>io.grpc</groupId>
+                        <artifactId>grpc-stub</artifactId>
+                    </exclusion>
+                </exclusions>
             </dependency>
             <dependency>
                 <groupId>org.eclipse.jetty</groupId>
diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
index 5c3e72f..8fdc5f8 100644
--- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
+++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
@@ -19,25 +19,14 @@
 package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
 
 import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
+import com.google.common.util.concurrent.*;
+import java.util.*;
+import java.util.concurrent.*;
 import java.util.function.Supplier;
 import javax.annotation.Nullable;
-import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
-import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
-import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
-import org.apache.skywalking.oap.server.core.cluster.ServiceRegisterException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.skywalking.oap.server.core.cluster.*;
+import org.apache.skywalking.oap.server.core.remote.client.Address;
+import org.slf4j.*;
 
 /**
  * Read collector pod info from api-server of kubernetes, then using all containerIp list to
@@ -63,7 +52,7 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery
     }
 
     @Override public void registerRemote(RemoteInstance remoteInstance) throws ServiceRegisterException {
-        this.port = remoteInstance.getPort();
+        this.port = remoteInstance.getAddress().getPort();
         submitTask(MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
             .setDaemon(true).setNameFormat("Kubernetes-ApiServer-%s").build())));
     }
@@ -99,7 +88,7 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery
             switch (event.getType()) {
                 case "ADDED":
                 case "MODIFIED":
-                    cache.put(event.getUid(), new RemoteInstance(event.getHost(), port, event.getUid().equals(this.uid)));
+                    cache.put(event.getUid(), new RemoteInstance(new Address(event.getHost(), port, event.getUid().equals(this.uid))));
                     break;
                 case "DELETED":
                     cache.remove(event.getUid());
diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java
index 35c57f8..4fd2699 100644
--- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java
+++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
 
 import org.apache.skywalking.oap.server.cluster.plugin.kubernetes.fixture.PlainWatch;
 import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
+import org.apache.skywalking.oap.server.core.remote.client.Address;
 import org.junit.Test;
 
 import static org.hamcrest.core.Is.is;
@@ -29,44 +30,43 @@ public class KubernetesCoordinatorTest {
 
     private KubernetesCoordinator coordinator;
 
-
     @Test
     public void assertAdded() throws InterruptedException {
         PlainWatch watch = PlainWatch.create(2, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2");
         coordinator = new KubernetesCoordinator(watch, () -> "1");
-        coordinator.registerRemote(new RemoteInstance("0.0.0.0", 8454, true));
+        coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true)));
         watch.await();
         assertThat(coordinator.queryRemoteNodes().size(), is(2));
-        assertThat(coordinator.queryRemoteNodes().stream().filter(RemoteInstance::isSelf).findFirst().get().getHost(), is("10.0.0.1"));
+        assertThat(coordinator.queryRemoteNodes().stream().filter(instance -> instance.getAddress().isSelf()).findFirst().get().getAddress().getHost(), is("10.0.0.1"));
     }
 
     @Test
     public void assertModified() throws InterruptedException {
         PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2", "MODIFIED", "1", "10.0.0.3");
         coordinator = new KubernetesCoordinator(watch, () -> "1");
-        coordinator.registerRemote(new RemoteInstance("0.0.0.0", 8454, true));
+        coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true)));
         watch.await();
         assertThat(coordinator.queryRemoteNodes().size(), is(2));
-        assertThat(coordinator.queryRemoteNodes().stream().filter(RemoteInstance::isSelf).findFirst().get().getHost(), is("10.0.0.3"));
+        assertThat(coordinator.queryRemoteNodes().stream().filter(instance -> instance.getAddress().isSelf()).findFirst().get().getAddress().getHost(), is("10.0.0.3"));
     }
 
     @Test
     public void assertDeleted() throws InterruptedException {
         PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2", "DELETED", "2", "10.0.0.2");
         coordinator = new KubernetesCoordinator(watch, () -> "1");
-        coordinator.registerRemote(new RemoteInstance("0.0.0.0", 8454, true));
+        coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true)));
         watch.await();
         assertThat(coordinator.queryRemoteNodes().size(), is(1));
-        assertThat(coordinator.queryRemoteNodes().stream().filter(RemoteInstance::isSelf).findFirst().get().getHost(), is("10.0.0.1"));
+        assertThat(coordinator.queryRemoteNodes().stream().filter(instance -> instance.getAddress().isSelf()).findFirst().get().getAddress().getHost(), is("10.0.0.1"));
     }
 
     @Test
     public void assertError() throws InterruptedException {
         PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ERROR", "X", "10.0.0.2", "ADDED", "2", "10.0.0.2");
         coordinator = new KubernetesCoordinator(watch, () -> "1");
-        coordinator.registerRemote(new RemoteInstance("0.0.0.0", 8454, true));
+        coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true)));
         watch.await();
         assertThat(coordinator.queryRemoteNodes().size(), is(2));
-        assertThat(coordinator.queryRemoteNodes().stream().filter(RemoteInstance::isSelf).findFirst().get().getHost(), is("10.0.0.1"));
+        assertThat(coordinator.queryRemoteNodes().stream().filter(instance -> instance.getAddress().isSelf()).findFirst().get().getAddress().getHost(), is("10.0.0.1"));
     }
 }
\ No newline at end of file
diff --git a/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManager.java b/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManager.java
index 6cba94f..ef30870 100644
--- a/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManager.java
+++ b/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManager.java
@@ -18,11 +18,8 @@
 
 package org.apache.skywalking.oap.server.cluster.plugin.standalone;
 
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
-import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
-import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
+import java.util.*;
+import org.apache.skywalking.oap.server.core.cluster.*;
 
 /**
  * A cluster manager simulator. Work in memory only. Also return the current instance.
@@ -35,7 +32,7 @@ public class StandaloneManager implements ClusterNodesQuery, ClusterRegister {
 
     @Override public void registerRemote(RemoteInstance remoteInstance) {
         this.remoteInstance = remoteInstance;
-        this.remoteInstance.setSelf(true);
+        this.remoteInstance.getAddress().setSelf(true);
     }
 
     @Override
diff --git a/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManagerTest.java b/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManagerTest.java
index 55671bc..77ede74 100644
--- a/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManagerTest.java
+++ b/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManagerTest.java
@@ -19,14 +19,15 @@
 package org.apache.skywalking.oap.server.cluster.plugin.standalone;
 
 import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
+import org.apache.skywalking.oap.server.core.remote.client.Address;
 import org.junit.*;
 
 public class StandaloneManagerTest {
     @Test
     public void test() {
         StandaloneManager standaloneManager = new StandaloneManager();
-        RemoteInstance remote1 = new RemoteInstance("A", 100, true);
-        RemoteInstance remote2 = new RemoteInstance("B", 100, false);
+        RemoteInstance remote1 = new RemoteInstance(new Address("A", 100, true));
+        RemoteInstance remote2 = new RemoteInstance(new Address("B", 100, false));
 
         standaloneManager.registerRemote(remote1);
         Assert.assertEquals(remote1, standaloneManager.queryRemoteNodes().get(0));
diff --git a/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ZookeeperCoordinator.java b/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ZookeeperCoordinator.java
index 07f53a6..826bdbb 100644
--- a/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ZookeeperCoordinator.java
+++ b/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ZookeeperCoordinator.java
@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.cluster.plugin.zookeeper;
 import java.util.*;
 import org.apache.curator.x.discovery.*;
 import org.apache.skywalking.oap.server.core.cluster.*;
+import org.apache.skywalking.oap.server.core.remote.client.Address;
 import org.slf4j.*;
 
 /**
@@ -31,7 +32,7 @@ public class ZookeeperCoordinator implements ClusterRegister, ClusterNodesQuery
 
     private final ServiceDiscovery<RemoteInstance> serviceDiscovery;
     private volatile ServiceCache<RemoteInstance> serviceCache;
-    private volatile RemoteInstance selfInstance;
+    private volatile Address selfAddress;
 
     ZookeeperCoordinator(ServiceDiscovery<RemoteInstance> serviceDiscovery) {
         this.serviceDiscovery = serviceDiscovery;
@@ -44,8 +45,8 @@ public class ZookeeperCoordinator implements ClusterRegister, ClusterNodesQuery
             ServiceInstance<RemoteInstance> thisInstance = ServiceInstance.<RemoteInstance>builder()
                 .name(remoteNamePath)
                 .id(UUID.randomUUID().toString())
-                .address(remoteInstance.getHost())
-                .port(remoteInstance.getPort())
+                .address(remoteInstance.getAddress().getHost())
+                .port(remoteInstance.getAddress().getPort())
                 .payload(remoteInstance)
                 .build();
 
@@ -57,7 +58,7 @@ public class ZookeeperCoordinator implements ClusterRegister, ClusterNodesQuery
 
             serviceCache.start();
 
-            this.selfInstance = remoteInstance;
+            this.selfAddress = remoteInstance.getAddress();
         } catch (Exception e) {
             throw new ServiceRegisterException(e.getMessage());
         }
@@ -70,10 +71,10 @@ public class ZookeeperCoordinator implements ClusterRegister, ClusterNodesQuery
 
             serviceInstances.forEach(serviceInstance -> {
                 RemoteInstance instance = serviceInstance.getPayload();
-                if (instance.equals(selfInstance)) {
-                    instance.setSelf(true);
+                if (instance.getAddress().equals(selfAddress)) {
+                    instance.getAddress().setSelf(true);
                 } else {
-                    instance.setSelf(false);
+                    instance.getAddress().setSelf(false);
                 }
                 remoteInstanceDetails.add(instance);
             });
diff --git a/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ClusterModuleZookeeperProviderTestCase.java b/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ClusterModuleZookeeperProviderTestCase.java
index a318481..c8f6446 100644
--- a/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ClusterModuleZookeeperProviderTestCase.java
+++ b/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ClusterModuleZookeeperProviderTestCase.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.List;
 import org.apache.curator.test.TestingServer;
 import org.apache.skywalking.oap.server.core.cluster.*;
+import org.apache.skywalking.oap.server.core.remote.client.Address;
 import org.apache.skywalking.oap.server.library.module.*;
 import org.junit.*;
 
@@ -52,7 +53,7 @@ public class ClusterModuleZookeeperProviderTestCase {
         ClusterRegister moduleRegister = provider.getService(ClusterRegister.class);
         ClusterNodesQuery clusterNodesQuery = provider.getService(ClusterNodesQuery.class);
 
-        RemoteInstance remoteInstance = new RemoteInstance("ProviderAHost", 1000, true);
+        RemoteInstance remoteInstance = new RemoteInstance(new Address("ProviderAHost", 1000, true));
 
         moduleRegister.registerRemote(remoteInstance);
 
@@ -63,8 +64,8 @@ public class ClusterModuleZookeeperProviderTestCase {
                 continue;
             }
             Assert.assertEquals(1, detailsList.size());
-            Assert.assertEquals("ProviderAHost", detailsList.get(0).getHost());
-            Assert.assertEquals(1000, detailsList.get(0).getPort());
+            Assert.assertEquals("ProviderAHost", detailsList.get(0).getAddress().getHost());
+            Assert.assertEquals(1000, detailsList.get(0).getAddress().getPort());
         }
 
     }
diff --git a/oap-server/server-core/pom.xml b/oap-server/server-core/pom.xml
index 9669ca0..37a8f68 100644
--- a/oap-server/server-core/pom.xml
+++ b/oap-server/server-core/pom.xml
@@ -17,7 +17,8 @@
   ~
   -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <artifactId>oap-server</artifactId>
         <groupId>org.apache.skywalking</groupId>
@@ -34,6 +35,11 @@
             <artifactId>snakeyaml</artifactId>
         </dependency>
         <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-testing</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.apache.skywalking</groupId>
             <artifactId>library-module</artifactId>
             <version>${project.version}</version>
@@ -63,6 +69,12 @@
             <artifactId>apm-network</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.skywalking</groupId>
+            <artifactId>server-testing</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index b1181da..f0e9084 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -22,55 +22,26 @@ import java.io.IOException;
 import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorTypeListener;
 import org.apache.skywalking.oap.server.core.analysis.record.annotation.RecordTypeListener;
 import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
-import org.apache.skywalking.oap.server.core.cache.CacheUpdateTimer;
-import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache;
-import org.apache.skywalking.oap.server.core.cache.NetworkAddressInventoryCache;
-import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
-import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
-import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
-import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
-import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
-import org.apache.skywalking.oap.server.core.config.ComponentLibraryCatalogService;
-import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
-import org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService;
-import org.apache.skywalking.oap.server.core.query.AggregationQueryService;
-import org.apache.skywalking.oap.server.core.query.AlarmQueryService;
-import org.apache.skywalking.oap.server.core.query.MetadataQueryService;
-import org.apache.skywalking.oap.server.core.query.MetricQueryService;
-import org.apache.skywalking.oap.server.core.query.TopologyQueryService;
-import org.apache.skywalking.oap.server.core.query.TraceQueryService;
+import org.apache.skywalking.oap.server.core.cache.*;
+import org.apache.skywalking.oap.server.core.cluster.*;
+import org.apache.skywalking.oap.server.core.config.*;
+import org.apache.skywalking.oap.server.core.query.*;
 import org.apache.skywalking.oap.server.core.register.annotation.InventoryTypeListener;
-import org.apache.skywalking.oap.server.core.register.service.EndpointInventoryRegister;
-import org.apache.skywalking.oap.server.core.register.service.IEndpointInventoryRegister;
-import org.apache.skywalking.oap.server.core.register.service.INetworkAddressInventoryRegister;
-import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister;
-import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
-import org.apache.skywalking.oap.server.core.register.service.NetworkAddressInventoryRegister;
-import org.apache.skywalking.oap.server.core.register.service.ServiceInstanceInventoryRegister;
-import org.apache.skywalking.oap.server.core.register.service.ServiceInventoryRegister;
-import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
-import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler;
-import org.apache.skywalking.oap.server.core.remote.annotation.StreamAnnotationListener;
-import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataAnnotationContainer;
-import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
-import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
-import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
-import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegisterImpl;
-import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister;
-import org.apache.skywalking.oap.server.core.server.JettyHandlerRegisterImpl;
-import org.apache.skywalking.oap.server.core.source.SourceReceiver;
-import org.apache.skywalking.oap.server.core.source.SourceReceiverImpl;
+import org.apache.skywalking.oap.server.core.register.service.*;
+import org.apache.skywalking.oap.server.core.remote.*;
+import org.apache.skywalking.oap.server.core.remote.annotation.*;
+import org.apache.skywalking.oap.server.core.remote.client.*;
+import org.apache.skywalking.oap.server.core.server.*;
+import org.apache.skywalking.oap.server.core.source.*;
 import org.apache.skywalking.oap.server.core.storage.PersistenceTimer;
 import org.apache.skywalking.oap.server.core.storage.annotation.StorageAnnotationListener;
 import org.apache.skywalking.oap.server.core.storage.model.IModelGetter;
 import org.apache.skywalking.oap.server.core.storage.ttl.DataTTLKeeperTimer;
 import org.apache.skywalking.oap.server.library.module.*;
-import org.apache.skywalking.oap.server.library.module.ModuleDefine;
 import org.apache.skywalking.oap.server.library.server.ServerException;
 import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
 import org.apache.skywalking.oap.server.library.server.jetty.JettyServer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
 
 /**
  * @author peng-yongsheng
@@ -186,7 +157,7 @@ public class CoreModuleProvider extends ModuleProvider {
             throw new ModuleStartException(e.getMessage(), e);
         }
 
-        RemoteInstance gRPCServerInstance = new RemoteInstance(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(), true);
+        RemoteInstance gRPCServerInstance = new RemoteInstance(new Address(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(), true));
         this.getManager().find(ClusterModule.NAME).provider().getService(ClusterRegister.class).registerRemote(gRPCServerInstance);
 
         PersistenceTimer.INSTANCE.start(getManager());
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCollection.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCollection.java
index 2a4ac3a..1d7a53c 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCollection.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCollection.java
@@ -69,11 +69,11 @@ public class NonMergeDataCollection<STORAGE_DATA extends StorageData> implements
     }
 
     @Override public boolean containsKey(STORAGE_DATA key) {
-        throw new UnsupportedOperationException("None merge data collection not support containsKey operation.");
+        throw new UnsupportedOperationException("Close merge data collection not support containsKey operation.");
     }
 
     @Override public STORAGE_DATA get(STORAGE_DATA key) {
-        throw new UnsupportedOperationException("None merge data collection not support get operation.");
+        throw new UnsupportedOperationException("Close merge data collection not support get operation.");
     }
 
     @Override public void put(STORAGE_DATA value) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java
index 7b0d48f..44a4855 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java
@@ -18,44 +18,26 @@
 
 package org.apache.skywalking.oap.server.core.cluster;
 
-import java.util.Objects;
-import lombok.*;
+import lombok.Getter;
+import org.apache.skywalking.oap.server.core.remote.client.Address;
 
 /**
  * @author peng-yongsheng
  */
+@Getter
 public class RemoteInstance implements Comparable<RemoteInstance> {
 
-    @Getter private final String host;
-    @Getter private final int port;
-    @Getter @Setter private boolean isSelf = false;
+    private final Address address;
 
-    public RemoteInstance(String host, int port, boolean isSelf) {
-        this.host = host;
-        this.port = port;
-        this.isSelf = isSelf;
-    }
-
-    @Override public int compareTo(RemoteInstance o) {
-        return toString().compareTo(o.toString());
+    public RemoteInstance(Address address) {
+        this.address = address;
     }
 
     @Override public String toString() {
-        return host + ":" + String.valueOf(port);
+        return address.toString();
     }
 
-    @Override public boolean equals(Object o) {
-        if (this == o)
-            return true;
-        if (o == null || getClass() != o.getClass())
-            return false;
-        RemoteInstance instance = (RemoteInstance)o;
-        return port == instance.port &&
-            Objects.equals(host, instance.host);
-    }
-
-    @Override public int hashCode() {
-
-        return Objects.hash(host, port);
+    @Override public int compareTo(RemoteInstance o) {
+        return this.address.compareTo(o.getAddress());
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/NetworkAddressInventoryRegister.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/NetworkAddressInventoryRegister.java
index cccb7bc..cf83344 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/NetworkAddressInventoryRegister.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/NetworkAddressInventoryRegister.java
@@ -103,7 +103,7 @@ public class NetworkAddressInventoryRegister implements INetworkAddressInventory
 
             InventoryProcess.INSTANCE.in(networkAddress);
         } else {
-            logger.warn("Network address {} heartbeat, but not found in storage.");
+            logger.warn("Network getAddress {} heartbeat, but not found in storage.");
         }
     }
 
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/ServiceInstanceInventoryRegister.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/ServiceInstanceInventoryRegister.java
index cf57cb7..9a0dc76 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/ServiceInstanceInventoryRegister.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/ServiceInstanceInventoryRegister.java
@@ -81,7 +81,7 @@ public class ServiceInstanceInventoryRegister implements IServiceInstanceInvento
 
     @Override public int getOrCreate(int serviceId, int addressId, long registerTime) {
         if (logger.isDebugEnabled()) {
-            logger.debug("get or create service instance by address id, service id: {}, address id: {}, registerTime: {}", serviceId, addressId, registerTime);
+            logger.debug("get or create service instance by getAddress id, service id: {}, getAddress id: {}, registerTime: {}", serviceId, addressId, registerTime);
         }
 
         int serviceInstanceId = getServiceInstanceInventoryCache().getServiceInstanceId(serviceId, addressId);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
index 92b2a87..de71610 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
@@ -25,27 +25,36 @@ import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGe
 import org.apache.skywalking.oap.server.core.remote.data.StreamData;
 import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
 import org.apache.skywalking.oap.server.core.worker.WorkerInstances;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
 import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
 import org.slf4j.*;
 
 /**
+ * This class is the server-side implementation of the streaming RPC. It is a common service that
+ * OAP servers use to receive messages from each other.
+ * The stream data id is used to find the stream data class used to deserialize the message.
+ * The next worker id is used to find the worker that processes the message.
+ *
  * @author peng-yongsheng
  */
 public class RemoteServiceHandler extends RemoteServiceGrpc.RemoteServiceImplBase implements GRPCHandler {
 
     private static final Logger logger = LoggerFactory.getLogger(RemoteServiceHandler.class);
 
-    private final ModuleManager moduleManager;
+    private final ModuleDefineHolder moduleDefineHolder;
     private StreamDataClassGetter streamDataClassGetter;
 
-    public RemoteServiceHandler(ModuleManager moduleManager) {
-        this.moduleManager = moduleManager;
+    public RemoteServiceHandler(ModuleDefineHolder moduleDefineHolder) {
+        this.moduleDefineHolder = moduleDefineHolder;
     }
 
     @Override public StreamObserver<RemoteMessage> call(StreamObserver<Empty> responseObserver) {
         if (Objects.isNull(streamDataClassGetter)) {
-            streamDataClassGetter = moduleManager.find(CoreModule.NAME).provider().getService(StreamDataClassGetter.class);
+            synchronized (RemoteServiceHandler.class) {
+                if (Objects.isNull(streamDataClassGetter)) {
+                    streamDataClassGetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataClassGetter.class);
+                }
+            }
         }
 
         return new StreamObserver<RemoteMessage>() {
@@ -59,8 +68,8 @@ public class RemoteServiceHandler extends RemoteServiceGrpc.RemoteServiceImplBas
                     StreamData streamData = streamDataClass.newInstance();
                     streamData.deserialize(remoteData);
                     WorkerInstances.INSTANCES.get(nextWorkerId).in(streamData);
-                } catch (InstantiationException | IllegalAccessException e) {
-                    logger.warn(e.getMessage());
+                } catch (Throwable t) {
+                    logger.error(t.getMessage(), t);
                 }
             }
 
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/Address.java
similarity index 57%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/Address.java
index 7b0d48f..4e5d0ce 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/Address.java
@@ -16,46 +16,47 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.cluster;
+package org.apache.skywalking.oap.server.core.remote.client;
 
-import java.util.Objects;
 import lombok.*;
+import org.apache.skywalking.oap.server.core.Const;
 
 /**
  * @author peng-yongsheng
  */
-public class RemoteInstance implements Comparable<RemoteInstance> {
+@Getter
+public class Address implements Comparable<Address> {
+    private final String host;
+    private final int port;
+    @Setter private boolean isSelf;
 
-    @Getter private final String host;
-    @Getter private final int port;
-    @Getter @Setter private boolean isSelf = false;
-
-    public RemoteInstance(String host, int port, boolean isSelf) {
+    public Address(String host, int port, boolean isSelf) {
         this.host = host;
         this.port = port;
         this.isSelf = isSelf;
     }
 
-    @Override public int compareTo(RemoteInstance o) {
-        return toString().compareTo(o.toString());
-    }
-
-    @Override public String toString() {
-        return host + ":" + String.valueOf(port);
+    @Override public int hashCode() {
+        return toString().hashCode();
     }
 
-    @Override public boolean equals(Object o) {
-        if (this == o)
+    @Override public boolean equals(Object obj) {
+        if (this == obj)
             return true;
-        if (o == null || getClass() != o.getClass())
+        if (obj == null)
             return false;
-        RemoteInstance instance = (RemoteInstance)o;
-        return port == instance.port &&
-            Objects.equals(host, instance.host);
+        if (getClass() != obj.getClass())
+            return false;
+
+        Address address = (Address)obj;
+        return host.equals(address.host) && port == address.port;
     }
 
-    @Override public int hashCode() {
+    @Override public String toString() {
+        return host + Const.ID_SPLIT + port;
+    }
 
-        return Objects.hash(host, port);
+    @Override public int compareTo(Address o) {
+        return this.toString().compareTo(o.toString());
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
index e367f4d..b061ca6 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
@@ -18,13 +18,13 @@
 
 package org.apache.skywalking.oap.server.core.remote.client;
 
+import io.grpc.ManagedChannel;
 import io.grpc.stub.StreamObserver;
-import java.util.List;
+import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
 import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
 import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
 import org.apache.skywalking.oap.server.core.remote.data.StreamData;
 import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
@@ -32,27 +32,82 @@ import org.apache.skywalking.oap.server.library.client.grpc.GRPCClient;
 import org.slf4j.*;
 
 /**
+ * This is a wrapper of the gRPC client for sending messages to another OAP server.
+ * It contains a blocking queue that buffers the messages so they can be sent in batches.
+ *
  * @author peng-yongsheng
  */
-public class GRPCRemoteClient implements RemoteClient, Comparable<GRPCRemoteClient> {
+public class GRPCRemoteClient implements RemoteClient {
 
     private static final Logger logger = LoggerFactory.getLogger(GRPCRemoteClient.class);
 
-    private final GRPCClient client;
-    private final DataCarrier<RemoteMessage> carrier;
+    private final int channelSize;
+    private final int bufferSize;
+    private final Address address;
     private final StreamDataClassGetter streamDataClassGetter;
     private final AtomicInteger concurrentStreamObserverNumber = new AtomicInteger(0);
+    private GRPCClient client;
+    private DataCarrier<RemoteMessage> carrier;
+    private boolean isConnect;
 
-    public GRPCRemoteClient(StreamDataClassGetter streamDataClassGetter, RemoteInstance remoteInstance, int channelSize,
+    public GRPCRemoteClient(StreamDataClassGetter streamDataClassGetter, Address address, int channelSize,
         int bufferSize) {
         this.streamDataClassGetter = streamDataClassGetter;
-        this.client = new GRPCClient(remoteInstance.getHost(), remoteInstance.getPort());
-        this.client.initialize();
-        this.carrier = new DataCarrier<>("GRPCRemoteClient", channelSize, bufferSize);
-        this.carrier.setBufferStrategy(BufferStrategy.BLOCKING);
-        this.carrier.consume(new RemoteMessageConsumer(), 1);
+        this.address = address;
+        this.channelSize = channelSize;
+        this.bufferSize = bufferSize;
+    }
+
+    @Override public void connect() {
+        if (!isConnect) {
+            this.getClient().connect();
+            this.getDataCarrier().consume(new RemoteMessageConsumer(), 1);
+            this.isConnect = true;
+        }
+    }
+
+    /**
+     * Get the gRPC channel held by the underlying client.
+     *
+     * @return the managed channel of the gRPC client
+     */
+    ManagedChannel getChannel() {
+        return getClient().getChannel();
     }
 
+    GRPCClient getClient() {
+        if (Objects.isNull(client)) {
+            synchronized (GRPCRemoteClient.class) {
+                if (Objects.isNull(client)) {
+                    this.client = new GRPCClient(address.getHost(), address.getPort());
+                }
+            }
+        }
+        return client;
+    }
+
+    RemoteServiceGrpc.RemoteServiceStub getStub() {
+        return RemoteServiceGrpc.newStub(getChannel());
+    }
+
+    DataCarrier<RemoteMessage> getDataCarrier() {
+        if (Objects.isNull(this.carrier)) {
+            synchronized (GRPCRemoteClient.class) {
+                if (Objects.isNull(this.carrier)) {
+                    this.carrier = new DataCarrier<>("GRPCRemoteClient", channelSize, bufferSize);
+                    this.carrier.setBufferStrategy(BufferStrategy.BLOCKING);
+                }
+            }
+        }
+        return this.carrier;
+    }
+
+    /**
+     * Push stream data that needs to be sent to another OAP server.
+     *
+     * @param nextWorkerId the id of the worker which will process this stream data.
+     * @param streamData the entity that contains the values.
+     */
     @Override public void push(int nextWorkerId, StreamData streamData) {
         int streamDataId = streamDataClassGetter.findIdByClass(streamData.getClass());
         RemoteMessage.Builder builder = RemoteMessage.newBuilder();
@@ -60,7 +115,7 @@ public class GRPCRemoteClient implements RemoteClient, Comparable<GRPCRemoteClie
         builder.setStreamDataId(streamDataId);
         builder.setRemoteData(streamData.serialize());
 
-        this.carrier.produce(builder.build());
+        this.getDataCarrier().produce(builder.build());
     }
 
     class RemoteMessageConsumer implements IConsumer<RemoteMessage> {
@@ -68,12 +123,15 @@ public class GRPCRemoteClient implements RemoteClient, Comparable<GRPCRemoteClie
         }
 
         @Override public void consume(List<RemoteMessage> remoteMessages) {
-            StreamObserver<RemoteMessage> streamObserver = createStreamObserver();
-
-            for (RemoteMessage remoteMessage : remoteMessages) {
-                streamObserver.onNext(remoteMessage);
+            try {
+                StreamObserver<RemoteMessage> streamObserver = createStreamObserver();
+                for (RemoteMessage remoteMessage : remoteMessages) {
+                    streamObserver.onNext(remoteMessage);
+                }
+                streamObserver.onCompleted();
+            } catch (Throwable t) {
+                logger.error(t.getMessage(), t);
             }
-            streamObserver.onCompleted();
         }
 
         @Override public void onError(List<RemoteMessage> remoteMessages, Throwable t) {
@@ -84,9 +142,14 @@ public class GRPCRemoteClient implements RemoteClient, Comparable<GRPCRemoteClie
         }
     }
 
+    /**
+     * Create a gRPC stream observer for sending stream data. One stream observer
+     * can send multiple stream data items within a single consume call.
+     * At most 10 stream observers are allowed to be in use at the same time.
+     *
+     * @return stream observer
+     */
     private StreamObserver<RemoteMessage> createStreamObserver() {
-        RemoteServiceGrpc.RemoteServiceStub stub = RemoteServiceGrpc.newStub(client.getChannel());
-
         int sleepTotalMillis = 0;
         int sleepMillis = 10;
         while (concurrentStreamObserverNumber.incrementAndGet() > 10) {
@@ -105,7 +168,7 @@ public class GRPCRemoteClient implements RemoteClient, Comparable<GRPCRemoteClie
             }
         }
 
-        return stub.call(new StreamObserver<Empty>() {
+        return getStub().call(new StreamObserver<Empty>() {
             @Override public void onNext(Empty empty) {
             }
 
@@ -120,15 +183,20 @@ public class GRPCRemoteClient implements RemoteClient, Comparable<GRPCRemoteClie
         });
     }
 
-    @Override public int compareTo(GRPCRemoteClient o) {
-        return this.client.toString().compareTo(o.client.toString());
+    @Override public void close() {
+        if (Objects.nonNull(this.carrier)) {
+            this.carrier.shutdownConsumers();
+        }
+        if (Objects.nonNull(this.client)) {
+            this.client.shutdown();
+        }
     }
 
-    public String getHost() {
-        return client.getHost();
+    @Override public Address getAddress() {
+        return address;
     }
 
-    public int getPort() {
-        return client.getPort();
+    @Override public int compareTo(RemoteClient o) {
+        return address.compareTo(o.getAddress());
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClient.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClient.java
index 8172a28..ccc216f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClient.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClient.java
@@ -23,11 +23,13 @@ import org.apache.skywalking.oap.server.core.remote.data.StreamData;
 /**
  * @author peng-yongsheng
  */
-public interface RemoteClient {
+public interface RemoteClient extends Comparable<RemoteClient> {
 
-    String getHost();
+    Address getAddress();
 
-    int getPort();
+    void connect();
+
+    void close();
 
     void push(int nextWorkerId, StreamData streamData);
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
index 129b517..7d78be0 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
@@ -27,37 +27,61 @@ import org.apache.skywalking.oap.server.library.module.*;
 import org.slf4j.*;
 
 /**
+ * This class manages the connections between OAP servers. A scheduled task automatically
+ * queries the server list from the cluster module, such as the Zookeeper cluster module
+ * or the Kubernetes cluster module.
+ *
  * @author peng-yongsheng
  */
 public class RemoteClientManager implements Service {
 
     private static final Logger logger = LoggerFactory.getLogger(RemoteClientManager.class);
 
-    private final ModuleManager moduleManager;
+    private final ModuleDefineHolder moduleDefineHolder;
     private StreamDataClassGetter streamDataClassGetter;
     private ClusterNodesQuery clusterNodesQuery;
     private final List<RemoteClient> clientsA;
     private final List<RemoteClient> clientsB;
     private volatile List<RemoteClient> usingClients;
 
-    public RemoteClientManager(ModuleManager moduleManager) {
-        this.moduleManager = moduleManager;
+    public RemoteClientManager(ModuleDefineHolder moduleDefineHolder) {
+        this.moduleDefineHolder = moduleDefineHolder;
         this.clientsA = new LinkedList<>();
         this.clientsB = new LinkedList<>();
         this.usingClients = clientsA;
     }
 
     public void start() {
-        this.clusterNodesQuery = moduleManager.find(ClusterModule.NAME).provider().getService(ClusterNodesQuery.class);
-        this.streamDataClassGetter = moduleManager.find(CoreModule.NAME).provider().getService(StreamDataClassGetter.class);
-        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::refresh, 5, 5, TimeUnit.SECONDS);
+        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::refresh, 1, 5, TimeUnit.SECONDS);
     }
 
-    private void refresh() {
-        if (logger.isDebugEnabled()) {
-            logger.debug("Refresh remote nodes collection.");
-        }
+    /**
+     * Query the OAP server list from the cluster module and create a new connection
+     * for each new node. Keep the OAP servers sorted, because each server
+     * sends stream data to the others by hash code.
+     */
+    void refresh() {
         try {
+            if (Objects.isNull(clusterNodesQuery)) {
+                synchronized (RemoteClientManager.class) {
+                    if (Objects.isNull(clusterNodesQuery)) {
+                        this.clusterNodesQuery = moduleDefineHolder.find(ClusterModule.NAME).provider().getService(ClusterNodesQuery.class);
+                    }
+                }
+            }
+
+            if (Objects.isNull(streamDataClassGetter)) {
+                synchronized (RemoteClientManager.class) {
+                    if (Objects.isNull(streamDataClassGetter)) {
+                        this.streamDataClassGetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataClassGetter.class);
+                    }
+                }
+            }
+
+            if (logger.isDebugEnabled()) {
+                logger.debug("Refresh remote nodes collection.");
+            }
+
             List<RemoteInstance> instanceList = clusterNodesQuery.queryRemoteNodes();
             Collections.sort(instanceList);
 
@@ -66,12 +90,11 @@ public class RemoteClientManager implements Service {
             }
 
             if (!compare(instanceList)) {
-                buildNewClients(instanceList);
+                reBuildRemoteClients(instanceList);
             }
         } catch (Throwable t) {
             logger.error(t.getMessage(), t);
         }
-
     }
 
     public List<RemoteClient> getRemoteClient() {
@@ -94,34 +117,68 @@ public class RemoteClientManager implements Service {
         }
     }
 
-    private void buildNewClients(List<RemoteInstance> remoteInstances) {
+    /**
+     * Compare the existing clients with the remote instance collection. Move
+     * the clients that are still alive into the new client collection to avoid creating
+     * a new channel. Shut down the clients that cannot be found in the cluster config.
+     *
+     * Create a gRPC client for each remote instance except for the self-instance.
+     *
+     * @param remoteInstances Remote instance collection queried from the cluster config.
+     */
+    private synchronized void reBuildRemoteClients(List<RemoteInstance> remoteInstances) {
         getFreeClients().clear();
 
-        Map<String, RemoteClient> currentClientsMap = new HashMap<>();
-        this.usingClients.forEach(remoteClient -> currentClientsMap.put(address(remoteClient.getHost(), remoteClient.getPort()), remoteClient));
+        Map<Address, RemoteClient> remoteClients = new HashMap<>();
+        getRemoteClient().forEach(client -> remoteClients.put(client.getAddress(), client));
+
+        Map<Address, Action> tempRemoteClients = new HashMap<>();
+        getRemoteClient().forEach(client -> tempRemoteClients.put(client.getAddress(), Action.Close));
 
         remoteInstances.forEach(remoteInstance -> {
-            String address = address(remoteInstance.getHost(), remoteInstance.getPort());
-            RemoteClient client;
-            if (currentClientsMap.containsKey(address)) {
-                client = currentClientsMap.get(address);
+            if (tempRemoteClients.containsKey(remoteInstance.getAddress())) {
+                tempRemoteClients.put(remoteInstance.getAddress(), Action.Leave);
             } else {
-                if (remoteInstance.isSelf()) {
-                    client = new SelfRemoteClient(remoteInstance.getHost(), remoteInstance.getPort());
-                } else {
-                    client = new GRPCRemoteClient(streamDataClassGetter, remoteInstance, 1, 3000);
-                }
+                tempRemoteClients.put(remoteInstance.getAddress(), Action.Create);
+            }
+        });
+
+        tempRemoteClients.forEach((address, action) -> {
+            switch (action) {
+                case Leave:
+                    if (remoteClients.containsKey(address)) {
+                        getFreeClients().add(remoteClients.get(address));
+                    }
+                    break;
+                case Create:
+                    if (address.isSelf()) {
+                        RemoteClient client = new SelfRemoteClient(address);
+                        getFreeClients().add(client);
+                    } else {
+                        RemoteClient client = new GRPCRemoteClient(streamDataClassGetter, address, 1, 3000);
+                        client.connect();
+                        getFreeClients().add(client);
+                    }
+                    break;
             }
-            getFreeClients().add(client);
         });
 
+        Collections.sort(getFreeClients());
         switchCurrentClients();
+
+        tempRemoteClients.forEach((address, action) -> {
+            if (Action.Close.equals(action) && remoteClients.containsKey(address)) {
+                remoteClients.get(address).close();
+            }
+        });
+
+        getFreeClients().clear();
     }
 
     private boolean compare(List<RemoteInstance> remoteInstances) {
         if (usingClients.size() == remoteInstances.size()) {
             for (int i = 0; i < usingClients.size(); i++) {
-                if (!address(usingClients.get(i).getHost(), usingClients.get(i).getPort()).equals(address(remoteInstances.get(i).getHost(), remoteInstances.get(i).getPort()))) {
+                if (!usingClients.get(i).getAddress().equals(remoteInstances.get(i).getAddress())) {
                     return false;
                 }
             }
@@ -131,7 +188,7 @@ public class RemoteClientManager implements Service {
         }
     }
 
-    private String address(String host, int port) {
-        return host + String.valueOf(port);
+    enum Action {
+        Close, Leave, Create
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java
index 9b4f6bd..ab25f5c 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java
@@ -18,6 +18,7 @@
 
 package org.apache.skywalking.oap.server.core.remote.client;
 
+import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.remote.data.StreamData;
 import org.apache.skywalking.oap.server.core.worker.WorkerInstances;
 
@@ -26,23 +27,28 @@ import org.apache.skywalking.oap.server.core.worker.WorkerInstances;
  */
 public class SelfRemoteClient implements RemoteClient {
 
-    private final String host;
-    private final int port;
+    private final Address address;
 
-    public SelfRemoteClient(String host, int port) {
-        this.host = host;
-        this.port = port;
+    public SelfRemoteClient(Address address) {
+        this.address = address;
     }
 
-    @Override public String getHost() {
-        return host;
+    @Override public Address getAddress() {
+        return address;
     }
 
-    @Override public int getPort() {
-        return port;
+    @Override public void connect() {
+    }
+
+    @Override public void close() {
+        throw new UnexpectedException("Self remote client invoked to close.");
     }
 
     @Override public void push(int nextWorkerId, StreamData streamData) {
         WorkerInstances.INSTANCES.get(nextWorkerId).in(streamData);
     }
+
+    @Override public int compareTo(RemoteClient o) {
+        return address.compareTo(o.getAddress());
+    }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
index 861dd7b..ba3de7a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
@@ -20,29 +20,20 @@ package org.apache.skywalking.oap.server.core.storage.ttl;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 import lombok.Setter;
 import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
-import org.apache.skywalking.oap.server.core.Const;
-import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.DataTTL;
+import org.apache.skywalking.oap.server.core.*;
 import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
 import org.apache.skywalking.oap.server.core.analysis.record.Record;
-import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
-import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
-import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
+import org.apache.skywalking.oap.server.core.cluster.*;
 import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
-import org.apache.skywalking.oap.server.core.storage.Downsampling;
-import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
-import org.apache.skywalking.oap.server.core.storage.StorageModule;
-import org.apache.skywalking.oap.server.core.storage.model.IModelGetter;
-import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.model.*;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
 import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
 
 /**
  * @author peng-yongsheng
@@ -67,8 +58,8 @@ public enum DataTTLKeeperTimer {
 
     private void delete() {
         List<RemoteInstance> remoteInstances = clusterNodesQuery.queryRemoteNodes();
-        if (CollectionUtils.isNotEmpty(remoteInstances) && !remoteInstances.get(0).isSelf()) {
-            logger.info("The selected first address is {}. Skip.", remoteInstances.get(0).toString());
+        if (CollectionUtils.isNotEmpty(remoteInstances) && !remoteInstances.get(0).getAddress().isSelf()) {
+            logger.info("The selected first getAddress is {}. Skip.", remoteInstances.get(0).toString());
             return;
         }
 
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerTestCase.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerTestCase.java
new file mode 100644
index 0000000..b2e1f03
--- /dev/null
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerTestCase.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.remote;
+
+import io.grpc.inprocess.*;
+import io.grpc.stub.StreamObserver;
+import io.grpc.testing.GrpcCleanupRule;
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
+import org.apache.skywalking.oap.server.core.remote.data.StreamData;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
+import org.apache.skywalking.oap.server.core.worker.*;
+import org.apache.skywalking.oap.server.library.module.*;
+import org.apache.skywalking.oap.server.testing.module.*;
+import org.junit.*;
+
+import static org.mockito.Mockito.*;
+
+/**
+ * Streams one {@link RemoteMessage} to a {@link RemoteServiceHandler} hosted on an in-process gRPC server and
+ * checks, on the test thread, the data handed to the worker registered under the message's next worker id.
+ *
+ * @author peng-yongsheng
+ */
+public class RemoteServiceHandlerTestCase {
+
+    /** Upper bound, in seconds, on how long the test waits for the worker to be invoked. */
+    private static final long WORKER_AWAIT_SECONDS = 5;
+
+    @Rule
+    public final GrpcCleanupRule gRPCCleanup = new GrpcCleanupRule();
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void callTest() throws DuplicateProviderException, ProviderNotFoundException, IOException, InterruptedException {
+        final int streamDataClassId = 1;
+        final int testWorkerId = 1;
+
+        ModuleManagerTesting moduleManager = new ModuleManagerTesting();
+        ModuleDefineTesting moduleDefine = new ModuleDefineTesting();
+        moduleManager.put(CoreModule.NAME, moduleDefine);
+
+        StreamDataClassGetter classGetter = mock(StreamDataClassGetter.class);
+        Class<?> dataClass = TestRemoteData.class;
+        when(classGetter.findClassById(streamDataClassId)).thenReturn((Class<StreamData>)dataClass);
+
+        moduleDefine.provider().registerServiceImplementation(StreamDataClassGetter.class, classGetter);
+
+        TestWorker worker = new TestWorker();
+        WorkerInstances.INSTANCES.put(testWorkerId, worker);
+
+        String serverName = InProcessServerBuilder.generateName();
+
+        gRPCCleanup.register(InProcessServerBuilder
+            .forName(serverName).directExecutor().addService(new RemoteServiceHandler(moduleManager)).build().start());
+
+        RemoteServiceGrpc.RemoteServiceStub remoteServiceStub = RemoteServiceGrpc.newStub(
+            gRPCCleanup.register(InProcessChannelBuilder.forName(serverName).directExecutor().build()));
+
+        // Kept only for diagnostics: it is appended to the failure message when the worker is never invoked.
+        final AtomicReference<Throwable> streamError = new AtomicReference<>();
+
+        StreamObserver<RemoteMessage> streamObserver = remoteServiceStub.call(new StreamObserver<Empty>() {
+            @Override public void onNext(Empty empty) {
+            }
+
+            @Override public void onError(Throwable throwable) {
+                streamError.set(throwable);
+            }
+
+            @Override public void onCompleted() {
+            }
+        });
+
+        RemoteMessage.Builder remoteMessage = RemoteMessage.newBuilder();
+        remoteMessage.setStreamDataId(streamDataClassId);
+        remoteMessage.setNextWorkerId(testWorkerId);
+
+        RemoteData.Builder remoteData = RemoteData.newBuilder();
+        remoteData.addDataStrings("test1");
+        remoteData.addDataStrings("test2");
+
+        remoteData.addDataLongs(10);
+        remoteData.addDataLongs(20);
+        remoteMessage.setRemoteData(remoteData);
+
+        streamObserver.onNext(remoteMessage.build());
+        streamObserver.onCompleted();
+
+        // The worker runs inside the gRPC handler, where a failed assertion is not guaranteed to reach JUnit,
+        // so the worker only records what it was given and every check is made here on the test thread.
+        Assert.assertTrue("The worker was not invoked, stream error: " + streamError.get(),
+            worker.invoked.await(WORKER_AWAIT_SECONDS, TimeUnit.SECONDS));
+
+        TestRemoteData data = (TestRemoteData)worker.received.get();
+        Assert.assertEquals("test1", data.str1);
+        Assert.assertEquals("test2", data.str2);
+        Assert.assertEquals(10, data.long1);
+        Assert.assertEquals(20, data.long2);
+    }
+
+    static class TestRemoteData extends StreamData {
+
+        private String str1;
+        private String str2;
+        private long long1;
+        private long long2;
+
+        @Override public int remoteHashCode() {
+            return 10;
+        }
+
+        @Override public void deserialize(RemoteData remoteData) {
+            str1 = remoteData.getDataStrings(0);
+            str2 = remoteData.getDataStrings(1);
+            long1 = remoteData.getDataLongs(0);
+            long2 = remoteData.getDataLongs(1);
+        }
+
+        @Override public RemoteData.Builder serialize() {
+            return null;
+        }
+    }
+
+    static class TestWorker extends AbstractWorker {
+
+        private final CountDownLatch invoked = new CountDownLatch(1);
+        private final AtomicReference<Object> received = new AtomicReference<>();
+
+        public TestWorker() {
+            super(1);
+        }
+
+        @Override public void in(Object o) {
+            received.set(o);
+            invoked.countDown();
+        }
+    }
+}
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java
new file mode 100644
index 0000000..41953fb
--- /dev/null
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.remote.client;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
+import org.apache.skywalking.oap.server.core.remote.data.StreamData;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
+import org.junit.Assert;
+
+/**
+ * Manual harness, started through {@link #main(String[])}, that connects a {@link GRPCRemoteClient} to
+ * localhost:10000 and pushes one {@link TestStreamData} per second. {@link GRPCRemoteClientRealServer} listens
+ * on the same host and port.
+ *
+ * @author peng-yongsheng
+ */
+public class GRPCRemoteClientRealClient {
+
+    private static final String SERVER_HOST = "localhost";
+    private static final int SERVER_PORT = 10000;
+    private static final int PUSH_TIMES = 10000;
+
+    public static void main(String[] args) throws InterruptedException {
+        Address address = new Address(SERVER_HOST, SERVER_PORT, false);
+        // A plain instance is used: nothing here is stubbed or verified, so a Mockito spy adds nothing.
+        GRPCRemoteClient remoteClient = new GRPCRemoteClient(new TestClassGetter(), address, 1, 10);
+        remoteClient.connect();
+
+        for (int i = 0; i < PUSH_TIMES; i++) {
+            remoteClient.push(1, new TestStreamData());
+            TimeUnit.SECONDS.sleep(1);
+        }
+
+        // Sleep ten more minutes after the last push before main returns.
+        TimeUnit.MINUTES.sleep(10);
+    }
+
+    public static class TestClassGetter implements StreamDataClassGetter {
+
+        @Override public int findIdByClass(Class streamDataClass) {
+            return 1;
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override public Class<StreamData> findClassById(int id) {
+            Class<?> clazz = TestStreamData.class;
+            return (Class<StreamData>)clazz;
+        }
+    }
+
+    public static class TestStreamData extends StreamData {
+
+        private long value;
+
+        @Override public int remoteHashCode() {
+            return 0;
+        }
+
+        @Override public void deserialize(RemoteData remoteData) {
+            this.value = remoteData.getDataLongs(0);
+        }
+
+        @Override public RemoteData.Builder serialize() {
+            RemoteData.Builder builder = RemoteData.newBuilder();
+            builder.addDataLongs(987);
+            return builder;
+        }
+    }
+
+    static class TestWorker extends AbstractWorker {
+
+        public TestWorker(int workerId) {
+            super(workerId);
+        }
+
+        @Override public void in(Object o) {
+            TestStreamData streamData = (TestStreamData)o;
+            Assert.assertEquals(987, streamData.value);
+        }
+    }
+}
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealServer.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealServer.java
new file mode 100644
index 0000000..ae5809c
--- /dev/null
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealServer.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.remote.client;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler;
+import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
+import org.apache.skywalking.oap.server.library.server.ServerException;
+import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
+import org.apache.skywalking.oap.server.testing.module.*;
+
+/**
+ * Manual harness: serves {@link RemoteServiceHandler} from a {@link GRPCServer} bound to localhost:10000 and
+ * then sleeps for ten minutes.
+ *
+ * @author peng-yongsheng
+ */
+public class GRPCRemoteClientRealServer {
+
+    private static final String HOST = "localhost";
+    private static final int PORT = 10000;
+
+    public static void main(String[] args) throws ServerException, InterruptedException {
+        // Stand-in core module that supplies the stream data class getter to the handler's module manager.
+        ModuleDefineTesting coreDefine = new ModuleDefineTesting();
+        ModuleManagerTesting manager = new ModuleManagerTesting();
+        manager.put(CoreModule.NAME, coreDefine);
+        coreDefine.provider().registerServiceImplementation(
+            StreamDataClassGetter.class, new GRPCRemoteClientRealClient.TestClassGetter());
+
+        GRPCServer grpcServer = new GRPCServer(HOST, PORT);
+        grpcServer.initialize();
+        grpcServer.addHandler(new RemoteServiceHandler(manager));
+        grpcServer.start();
+
+        // No further work; sleep so the process does not return from main straight away.
+        TimeUnit.MINUTES.sleep(10);
+    }
+}
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java
new file mode 100644
index 0000000..e14f29d
--- /dev/null
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.remote.client;
+
+import io.grpc.testing.GrpcServerRule;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler;
+import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
+import org.apache.skywalking.oap.server.core.remote.data.StreamData;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.core.worker.*;
+import org.apache.skywalking.oap.server.testing.module.*;
+import org.junit.*;
+
+import static org.mockito.Mockito.*;
+
+/**
+ * Pushes stream data through a {@link GRPCRemoteClient} whose channel is replaced by the in-process channel of
+ * {@link GrpcServerRule}, with {@link RemoteServiceHandler} registered on the server side.
+ *
+ * @author peng-yongsheng
+ */
+public class GRPCRemoteClientTestCase {
+
+    private static final int NEXT_WORKER_ID = 1;
+
+    private ModuleManagerTesting moduleManager;
+    private StreamDataClassGetter classGetter;
+    private TestWorker worker;
+    @Rule public final GrpcServerRule grpcServerRule = new GrpcServerRule().directExecutor();
+
+    @Before
+    public void before() {
+        moduleManager = new ModuleManagerTesting();
+        ModuleDefineTesting moduleDefine = new ModuleDefineTesting();
+        moduleManager.put(CoreModule.NAME, moduleDefine);
+
+        classGetter = mock(StreamDataClassGetter.class);
+        moduleDefine.provider().registerServiceImplementation(StreamDataClassGetter.class, classGetter);
+
+        worker = new TestWorker(NEXT_WORKER_ID);
+        WorkerInstances.INSTANCES.put(NEXT_WORKER_ID, worker);
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testPush() throws InterruptedException {
+        grpcServerRule.getServiceRegistry().addService(new RemoteServiceHandler(moduleManager));
+
+        Address address = new Address("not-important", 11, false);
+        GRPCRemoteClient remoteClient = spy(new GRPCRemoteClient(classGetter, address, 1, 10));
+        remoteClient.connect();
+
+        doReturn(grpcServerRule.getChannel()).when(remoteClient).getChannel();
+
+        when(classGetter.findIdByClass(TestStreamData.class)).thenReturn(1);
+
+        Class<?> dataClass = TestStreamData.class;
+        when(classGetter.findClassById(1)).thenReturn((Class<StreamData>)dataClass);
+
+        for (int i = 0; i < 12; i++) {
+            remoteClient.push(NEXT_WORKER_ID, new TestStreamData());
+        }
+
+        TimeUnit.SECONDS.sleep(1);
+
+        // The worker is reached through the gRPC handler, so an assertion failing inside it may never reach
+        // JUnit on its own; surface whatever the worker recorded on the test thread.
+        Throwable failure = worker.failure.get();
+        if (failure != null) {
+            throw new AssertionError("The worker rejected a pushed message", failure);
+        }
+    }
+
+    public static class TestStreamData extends StreamData {
+
+        private long value;
+
+        @Override public int remoteHashCode() {
+            return 0;
+        }
+
+        @Override public void deserialize(RemoteData remoteData) {
+            this.value = remoteData.getDataLongs(0);
+        }
+
+        @Override public RemoteData.Builder serialize() {
+            RemoteData.Builder builder = RemoteData.newBuilder();
+            builder.addDataLongs(987);
+            return builder;
+        }
+    }
+
+    static class TestWorker extends AbstractWorker {
+
+        /** First failure raised while handling a message, or {@code null} when every message was accepted. */
+        private final AtomicReference<Throwable> failure = new AtomicReference<>();
+
+        public TestWorker(int workerId) {
+            super(workerId);
+        }
+
+        @Override public void in(Object o) {
+            try {
+                TestStreamData streamData = (TestStreamData)o;
+                Assert.assertEquals(987, streamData.value);
+            } catch (Throwable t) {
+                failure.compareAndSet(null, t);
+                throw t;
+            }
+        }
+    }
+}
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java
new file mode 100644
index 0000000..54b6cb7
--- /dev/null
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.remote.client;
+
+import java.util.*;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.cluster.*;
+import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
+import org.apache.skywalking.oap.server.testing.module.*;
+import org.junit.*;
+
+import static org.mockito.Mockito.*;
+
+/**
+ * Feeds {@link RemoteClientManager#refresh()} two successive node lists from a mocked {@link ClusterNodesQuery}
+ * and checks the host order and the client type at each position after every refresh.
+ *
+ * @author peng-yongsheng
+ */
+public class RemoteClientManagerTestCase {
+
+    private static final int PORT = 100;
+
+    @Test
+    public void refresh() {
+        ClusterNodesQuery nodesQuery = mock(ClusterNodesQuery.class);
+        StreamDataClassGetter classGetter = mock(StreamDataClassGetter.class);
+
+        ModuleManagerTesting manager = new ModuleManagerTesting();
+        ModuleDefineTesting clusterDefine = new ModuleDefineTesting();
+        manager.put(ClusterModule.NAME, clusterDefine);
+        ModuleDefineTesting coreDefine = new ModuleDefineTesting();
+        manager.put(CoreModule.NAME, coreDefine);
+
+        clusterDefine.provider().registerServiceImplementation(ClusterNodesQuery.class, nodesQuery);
+        coreDefine.provider().registerServiceImplementation(StreamDataClassGetter.class, classGetter);
+
+        RemoteClientManager clientManager = new RemoteClientManager(manager);
+
+        // First node list: three nodes, host2 being the one flagged as self.
+        when(nodesQuery.queryRemoteNodes()).thenReturn(groupOneInstances());
+        clientManager.refresh();
+        List<RemoteClient> clients = clientManager.getRemoteClient();
+        assertHosts(clients, "host1", "host2", "host3");
+        assertTypes(clients, GRPCRemoteClient.class, SelfRemoteClient.class, GRPCRemoteClient.class);
+
+        // Second node list: host3 is gone, host4 and host5 are new.
+        when(nodesQuery.queryRemoteNodes()).thenReturn(groupTwoInstances());
+        clientManager.refresh();
+        clients = clientManager.getRemoteClient();
+        assertHosts(clients, "host1", "host2", "host4", "host5");
+        assertTypes(clients,
+            GRPCRemoteClient.class, SelfRemoteClient.class, GRPCRemoteClient.class, GRPCRemoteClient.class);
+    }
+
+    private static void assertHosts(List<RemoteClient> clients, String... expectedHosts) {
+        for (int i = 0; i < expectedHosts.length; i++) {
+            Assert.assertEquals(expectedHosts[i], clients.get(i).getAddress().getHost());
+        }
+    }
+
+    private static void assertTypes(List<RemoteClient> clients, Class<?>... expectedTypes) {
+        for (int i = 0; i < expectedTypes.length; i++) {
+            Assert.assertTrue(expectedTypes[i].isInstance(clients.get(i)));
+        }
+    }
+
+    private List<RemoteInstance> groupOneInstances() {
+        List<RemoteInstance> nodes = new ArrayList<>();
+        Collections.addAll(nodes, node("host3", false), node("host1", false), node("host2", true));
+        return nodes;
+    }
+
+    private List<RemoteInstance> groupTwoInstances() {
+        List<RemoteInstance> nodes = new ArrayList<>();
+        Collections.addAll(nodes,
+            node("host5", false), node("host1", false), node("host2", true), node("host4", false));
+        return nodes;
+    }
+
+    private static RemoteInstance node(String host, boolean self) {
+        return new RemoteInstance(new Address(host, PORT, self));
+    }
+}
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/Client.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/Client.java
index 5e43021..95af9af 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/Client.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/Client.java
@@ -22,7 +22,8 @@ package org.apache.skywalking.oap.server.library.client;
  * @author peng-yongsheng
  */
 public interface Client {
-    void initialize() throws ClientException;
+    
+    void connect() throws ClientException;
 
     void shutdown();
 }
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
index 45df024..f7c4b5c 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
@@ -59,7 +59,7 @@ public class ElasticSearchClient implements Client {
         this.namespace = namespace;
     }
 
-    @Override public void initialize() {
+    @Override public void connect() {
         List<HttpHost> pairsList = parseClusterNodes(clusterNodes);
 
         client = new RestHighLevelClient(
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java
index 188fae3..485c447 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java
@@ -21,12 +21,15 @@ package org.apache.skywalking.oap.server.library.client.grpc;
 import io.grpc.*;
 import lombok.Getter;
 import org.apache.skywalking.oap.server.library.client.Client;
+import org.slf4j.*;
 
 /**
  * @author peng-yongsheng
  */
 public class GRPCClient implements Client {
 
+    private static final Logger logger = LoggerFactory.getLogger(GRPCClient.class);
+
     @Getter private final String host;
 
     @Getter private final int port;
@@ -38,12 +41,16 @@ public class GRPCClient implements Client {
         this.port = port;
     }
 
-    @Override public void initialize() {
+    @Override public void connect() {
         channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext(true).build();
     }
 
     @Override public void shutdown() {
-        channel.shutdownNow();
+        try {
+            channel.shutdownNow();
+        } catch (Throwable t) {
+            logger.error(t.getMessage(), t);
+        }
     }
 
     public ManagedChannel getChannel() {
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/jdbc/hikaricp/JDBCHikariCPClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/jdbc/hikaricp/JDBCHikariCPClient.java
index b09bbbf..65a6b18 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/jdbc/hikaricp/JDBCHikariCPClient.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/jdbc/hikaricp/JDBCHikariCPClient.java
@@ -18,19 +18,12 @@
 
 package org.apache.skywalking.oap.server.library.client.jdbc.hikaricp;
 
-import com.zaxxer.hikari.HikariConfig;
-import com.zaxxer.hikari.HikariDataSource;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
+import com.zaxxer.hikari.*;
+import java.sql.*;
 import java.util.Properties;
 import org.apache.skywalking.oap.server.library.client.Client;
-import org.apache.skywalking.oap.server.library.client.ClientException;
 import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
 
 /**
  * JDBC Client uses HikariCP connection management lib to execute SQL.
@@ -47,7 +40,7 @@ public class JDBCHikariCPClient implements Client {
         hikariConfig = new HikariConfig(properties);
     }
 
-    @Override public void initialize() throws ClientException {
+    @Override public void connect() {
         dataSource = new HikariDataSource(hikariConfig);
     }
 
diff --git a/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClientTestCase.java b/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClientTestCase.java
index f696202..28b86df 100644
--- a/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClientTestCase.java
+++ b/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClientTestCase.java
@@ -48,7 +48,7 @@ public class ElasticSearchClientTestCase {
         builder.endObject();
 
         ElasticSearchClient client = new ElasticSearchClient("localhost:9200", null);
-        client.initialize();
+        client.connect();
 
         String indexName = "test";
         client.createIndex(indexName, settings, builder);
diff --git a/oap-server/server-receiver-plugin/skywalking-istio-telemetry-receiver-plugin/src/main/proto/policy/v1beta1/value_type.proto b/oap-server/server-receiver-plugin/skywalking-istio-telemetry-receiver-plugin/src/main/proto/policy/v1beta1/value_type.proto
index 04d8675..4c929eb 100644
--- a/oap-server/server-receiver-plugin/skywalking-istio-telemetry-receiver-plugin/src/main/proto/policy/v1beta1/value_type.proto
+++ b/oap-server/server-receiver-plugin/skywalking-istio-telemetry-receiver-plugin/src/main/proto/policy/v1beta1/value_type.proto
@@ -43,10 +43,10 @@ enum ValueType {
     // A point in time.
     TIMESTAMP = 5;
 
-    // An IP address.
+    // An IP address.
     IP_ADDRESS = 6;
 
-    // An email address.
+    // An email address.
     EMAIL_ADDRESS = 7;
 
     // A URI.
diff --git a/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/rest/NetworkAddressRegisterServletHandler.java b/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/rest/NetworkAddressRegisterServletHandler.java
index 88dc510..d1a4632 100644
--- a/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/rest/NetworkAddressRegisterServletHandler.java
+++ b/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/rest/NetworkAddressRegisterServletHandler.java
@@ -59,7 +59,7 @@ public class NetworkAddressRegisterServletHandler extends JettyJsonHandler {
                 String networkAddress = networkAddresses.get(i).getAsString();
 
                 if (logger.isDebugEnabled()) {
-                    logger.debug("network address register, network address: {}", networkAddress);
+                    logger.debug("network getAddress register, network getAddress: {}", networkAddress);
                 }
 
                 int addressId = networkAddressInventoryRegister.getOrCreate(networkAddress);
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/ReferenceIdExchanger.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/ReferenceIdExchanger.java
index 8aaad19..8fc00a0 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/ReferenceIdExchanger.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/ReferenceIdExchanger.java
@@ -92,7 +92,7 @@ public class ReferenceIdExchanger implements IdExchanger<ReferenceDecorator> {
 
             if (networkAddressId == 0) {
                 if (logger.isDebugEnabled()) {
-                    logger.debug("network address: {} from service id: {} exchange failed", standardBuilder.getNetworkAddress(), serviceId);
+                    logger.debug("network getAddress: {} from service id: {} exchange failed", standardBuilder.getNetworkAddress(), serviceId);
                 }
                 return false;
             } else {
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SpringSleuthSegmentBuilderTest.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SpringSleuthSegmentBuilderTest.java
index b02254b..1fbdeb3 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SpringSleuthSegmentBuilderTest.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SpringSleuthSegmentBuilderTest.java
@@ -99,7 +99,7 @@ public class SpringSleuthSegmentBuilderTest implements SegmentListener {
             }
 
             @Override public int getOrCreate(int serviceId, int addressId, long registerTime) {
-                String key = "VitualAppCode:" + serviceId + ",address:" + addressId;
+                String key = "VitualAppCode:" + serviceId + ",getAddress:" + addressId;
                 if (applicationInstRegister.containsKey(key)) {
                     return applicationInstRegister.get(key);
                 } else {
diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index 16219bb..47f0463 100644
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -20,7 +20,7 @@ cluster:
    # library the oap-libs folder with your ZooKeeper 3.4.x library.
 #  zookeeper:
 #    hostPort: localhost:2181
-#    # Retry Policy
+#    #Retry Policy
 #    baseSleepTimeMs: 1000 # initial amount of time to wait between retries
 #    maxRetries: 3 # max number of times to retry
 #  kubernetes:
diff --git a/oap-server/server-starter/src/main/resources/log4j2.xml b/oap-server/server-starter/src/main/resources/log4j2.xml
index 0353842..e9a520b 100644
--- a/oap-server/server-starter/src/main/resources/log4j2.xml
+++ b/oap-server/server-starter/src/main/resources/log4j2.xml
@@ -34,6 +34,7 @@
         <logger name="org.apache.skywalking.oap.server.receiver.istio.telemetry" level="DEBUG"/>
         <logger name="org.apache.skywalking.oap.server.core.alarm.AlarmStandardPersistence" level="DEBUG"/>
         <logger name="org.apache.skywalking.oap.server.core" level="INFO"/>
+        <logger name="org.apache.skywalking.oap.server.core.remote.client" level="DEBUG"/>
         <logger name="org.apache.skywalking.oap.server.library.buffer" level="INFO"/>
         <Root level="DEBUG">
             <AppenderRef ref="Console"/>
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
index 40d5416..1099d73 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
@@ -88,7 +88,7 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
     public void start() throws ModuleStartException {
         try {
             nameSpace.setNameSpace(config.getNameSpace());
-            elasticSearchClient.initialize();
+            elasticSearchClient.connect();
 
             StorageEsInstaller installer = new StorageEsInstaller(getManager(), config.getIndexShardsNumber(), config.getIndexReplicasNumber());
             installer.install(elasticSearchClient);
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
index 8677afd..ac8cb36 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
@@ -19,43 +19,13 @@
 package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2;
 
 import java.util.Properties;
-import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
-import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
-import org.apache.skywalking.oap.server.core.storage.IRegisterLockDAO;
-import org.apache.skywalking.oap.server.core.storage.StorageDAO;
-import org.apache.skywalking.oap.server.core.storage.StorageException;
-import org.apache.skywalking.oap.server.core.storage.StorageModule;
-import org.apache.skywalking.oap.server.core.storage.cache.IEndpointInventoryCacheDAO;
-import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressInventoryCacheDAO;
-import org.apache.skywalking.oap.server.core.storage.cache.IServiceInstanceInventoryCacheDAO;
-import org.apache.skywalking.oap.server.core.storage.cache.IServiceInventoryCacheDAO;
-import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.IMetricQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
-import org.apache.skywalking.oap.server.library.client.ClientException;
+import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.cache.*;
+import org.apache.skywalking.oap.server.core.storage.query.*;
 import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
 import org.apache.skywalking.oap.server.library.module.*;
-import org.apache.skywalking.oap.server.library.module.ModuleDefine;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2AggregationQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2AlarmQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2BatchDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2EndpointInventoryCacheDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2HistoryDeleteDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2MetadataQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2MetricQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2NetworkAddressInventoryCacheDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2RegisterLockDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ServiceInstanceInventoryCacheDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ServiceInventoryCacheDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2StorageDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TableInstaller;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TopologyQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TraceQueryDAO;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.*;
+import org.slf4j.*;
 
 /**
  * H2 Storage provider is for demonstration and preview only.
@@ -117,7 +87,7 @@ public class H2StorageProvider extends ModuleProvider {
 
     @Override public void start() throws ServiceNotProvidedException, ModuleStartException {
         try {
-            h2Client.initialize();
+            h2Client.connect();
 
             H2TableInstaller installer = new H2TableInstaller(getManager());
             installer.install(h2Client);
@@ -125,8 +95,6 @@ public class H2StorageProvider extends ModuleProvider {
             new H2RegisterLockInstaller().install(h2Client);
         } catch (StorageException e) {
             throw new ModuleStartException(e.getMessage(), e);
-        } catch (ClientException e) {
-            throw new ModuleStartException(e.getMessage(), e);
         }
     }