You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by pe...@apache.org on 2018/11/22 12:50:33 UTC
[incubator-skywalking] branch master updated: gRPC client usage
improve (#1946)
This is an automated email from the ASF dual-hosted git repository.
pengys pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 7c8a683 gRPC client usage improve (#1946)
7c8a683 is described below
commit 7c8a683c2f52353669ac1317e958d28400f8054c
Author: 彭勇升 pengys <80...@qq.com>
AuthorDate: Thu Nov 22 20:50:27 2018 +0800
gRPC client usage improve (#1946)
* Close the clients which are unreachable.
Remote client manager test case and comments.
* Test the GRPCRemote client.
* 1. Catch the throwable for onComplete method cause of it will throw exception when connection lost.
2. Check the gRPC channel state, send message when state is ready, wait 5 second when state is not ready.
Notice: gRPC channel getState with true parameter will trigger reconnect operation.
* gRPC client will reconnect to the server when network recorvered.
* Recovery application.yml
* Recovery proto module commit id.
* no message
* Fixed compile error.
---
.../core/remote/TraceSegmentServiceClient.java | 32 ++---
oap-server/pom.xml | 21 ++-
.../plugin/kubernetes/KubernetesCoordinator.java | 27 ++--
.../kubernetes/KubernetesCoordinatorTest.java | 18 +--
.../plugin/standalone/StandaloneManager.java | 9 +-
.../plugin/standalone/StandaloneManagerTest.java | 5 +-
.../plugin/zookeeper/ZookeeperCoordinator.java | 15 ++-
.../ClusterModuleZookeeperProviderTestCase.java | 7 +-
oap-server/server-core/pom.xml | 14 +-
.../oap/server/core/CoreModuleProvider.java | 53 ++------
.../core/analysis/data/NonMergeDataCollection.java | 4 +-
.../oap/server/core/cluster/RemoteInstance.java | 36 ++----
.../service/NetworkAddressInventoryRegister.java | 2 +-
.../service/ServiceInstanceInventoryRegister.java | 2 +-
.../server/core/remote/RemoteServiceHandler.java | 23 +++-
.../client/Address.java} | 45 +++----
.../core/remote/client/GRPCRemoteClient.java | 120 +++++++++++++----
.../server/core/remote/client/RemoteClient.java | 8 +-
.../core/remote/client/RemoteClientManager.java | 113 ++++++++++++----
.../core/remote/client/SelfRemoteClient.java | 24 ++--
.../core/storage/ttl/DataTTLKeeperTimer.java | 25 ++--
.../core/remote/RemoteServiceHandlerTestCase.java | 142 +++++++++++++++++++++
.../remote/client/GRPCRemoteClientRealClient.java | 90 +++++++++++++
.../remote/client/GRPCRemoteClientRealServer.java | 50 ++++++++
.../remote/client/GRPCRemoteClientTestCase.java | 109 ++++++++++++++++
.../remote/client/RemoteClientManagerTestCase.java | 95 ++++++++++++++
.../oap/server/library/client/Client.java | 3 +-
.../client/elasticsearch/ElasticSearchClient.java | 2 +-
.../oap/server/library/client/grpc/GRPCClient.java | 11 +-
.../client/jdbc/hikaricp/JDBCHikariCPClient.java | 15 +--
.../elasticsearch/ElasticSearchClientTestCase.java | 2 +-
.../src/main/proto/policy/v1beta1/value_type.proto | 4 +-
.../rest/NetworkAddressRegisterServletHandler.java | 2 +-
.../standardization/ReferenceIdExchanger.java | 2 +-
.../transform/SpringSleuthSegmentBuilderTest.java | 2 +-
.../src/main/resources/application.yml | 2 +-
.../server-starter/src/main/resources/log4j2.xml | 1 +
.../StorageModuleElasticsearchProvider.java | 2 +-
.../storage/plugin/jdbc/h2/H2StorageProvider.java | 44 +------
39 files changed, 869 insertions(+), 312 deletions(-)
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
index 8524142..0422cca 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
@@ -19,17 +19,12 @@
package org.apache.skywalking.apm.agent.core.remote;
import io.grpc.Channel;
-import java.util.List;
-
import io.grpc.stub.StreamObserver;
-import org.apache.skywalking.apm.agent.core.boot.BootService;
-import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
-import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
-import org.apache.skywalking.apm.agent.core.context.TracingContext;
-import org.apache.skywalking.apm.agent.core.context.TracingContextListener;
+import java.util.List;
+import org.apache.skywalking.apm.agent.core.boot.*;
+import org.apache.skywalking.apm.agent.core.context.*;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
-import org.apache.skywalking.apm.agent.core.logging.api.ILog;
-import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
+import org.apache.skywalking.apm.agent.core.logging.api.*;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
@@ -37,8 +32,7 @@ import org.apache.skywalking.apm.network.common.Commands;
import org.apache.skywalking.apm.network.language.agent.*;
import org.apache.skywalking.apm.network.language.agent.v2.TraceSegmentReportServiceGrpc;
-import static org.apache.skywalking.apm.agent.core.conf.Config.Buffer.BUFFER_SIZE;
-import static org.apache.skywalking.apm.agent.core.conf.Config.Buffer.CHANNEL_SIZE;
+import static org.apache.skywalking.apm.agent.core.conf.Config.Buffer.*;
import static org.apache.skywalking.apm.agent.core.remote.GRPCChannelStatus.CONNECTED;
/**
@@ -111,18 +105,18 @@ public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSe
}
});
- for (TraceSegment segment : data) {
- try {
+ try {
+ for (TraceSegment segment : data) {
UpstreamSegment upstreamSegment = segment.transform();
upstreamSegmentStreamObserver.onNext(upstreamSegment);
- } catch (Throwable t) {
- logger.error(t, "Transform and send UpstreamSegment to collector fail.");
}
- }
- upstreamSegmentStreamObserver.onCompleted();
+ upstreamSegmentStreamObserver.onCompleted();
- status.wait4Finish();
- segmentUplinkedCounter += data.size();
+ status.wait4Finish();
+ segmentUplinkedCounter += data.size();
+ } catch (Throwable t) {
+ logger.error(t, "Transform and send UpstreamSegment to collector fail.");
+ }
} else {
segmentAbandonedCounter += data.size();
}
diff --git a/oap-server/pom.xml b/oap-server/pom.xml
index d5ce3c7..0f08ed0 100644
--- a/oap-server/pom.xml
+++ b/oap-server/pom.xml
@@ -17,7 +17,8 @@
~
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm</artifactId>
<groupId>org.apache.skywalking</groupId>
@@ -237,6 +238,24 @@
<artifactId>grpc-testing</artifactId>
<version>${grpc.version}</version>
<scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-stub</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
index 5c3e72f..8fdc5f8 100644
--- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
+++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
@@ -19,25 +19,14 @@
package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
+import com.google.common.util.concurrent.*;
+import java.util.*;
+import java.util.concurrent.*;
import java.util.function.Supplier;
import javax.annotation.Nullable;
-import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
-import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
-import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
-import org.apache.skywalking.oap.server.core.cluster.ServiceRegisterException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.skywalking.oap.server.core.cluster.*;
+import org.apache.skywalking.oap.server.core.remote.client.Address;
+import org.slf4j.*;
/**
* Read collector pod info from api-server of kubernetes, then using all containerIp list to
@@ -63,7 +52,7 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery
}
@Override public void registerRemote(RemoteInstance remoteInstance) throws ServiceRegisterException {
- this.port = remoteInstance.getPort();
+ this.port = remoteInstance.getAddress().getPort();
submitTask(MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
.setDaemon(true).setNameFormat("Kubernetes-ApiServer-%s").build())));
}
@@ -99,7 +88,7 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery
switch (event.getType()) {
case "ADDED":
case "MODIFIED":
- cache.put(event.getUid(), new RemoteInstance(event.getHost(), port, event.getUid().equals(this.uid)));
+ cache.put(event.getUid(), new RemoteInstance(new Address(event.getHost(), port, event.getUid().equals(this.uid))));
break;
case "DELETED":
cache.remove(event.getUid());
diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java
index 35c57f8..4fd2699 100644
--- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java
+++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
import org.apache.skywalking.oap.server.cluster.plugin.kubernetes.fixture.PlainWatch;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
+import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.junit.Test;
import static org.hamcrest.core.Is.is;
@@ -29,44 +30,43 @@ public class KubernetesCoordinatorTest {
private KubernetesCoordinator coordinator;
-
@Test
public void assertAdded() throws InterruptedException {
PlainWatch watch = PlainWatch.create(2, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2");
coordinator = new KubernetesCoordinator(watch, () -> "1");
- coordinator.registerRemote(new RemoteInstance("0.0.0.0", 8454, true));
+ coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true)));
watch.await();
assertThat(coordinator.queryRemoteNodes().size(), is(2));
- assertThat(coordinator.queryRemoteNodes().stream().filter(RemoteInstance::isSelf).findFirst().get().getHost(), is("10.0.0.1"));
+ assertThat(coordinator.queryRemoteNodes().stream().filter(instance -> instance.getAddress().isSelf()).findFirst().get().getAddress().getHost(), is("10.0.0.1"));
}
@Test
public void assertModified() throws InterruptedException {
PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2", "MODIFIED", "1", "10.0.0.3");
coordinator = new KubernetesCoordinator(watch, () -> "1");
- coordinator.registerRemote(new RemoteInstance("0.0.0.0", 8454, true));
+ coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true)));
watch.await();
assertThat(coordinator.queryRemoteNodes().size(), is(2));
- assertThat(coordinator.queryRemoteNodes().stream().filter(RemoteInstance::isSelf).findFirst().get().getHost(), is("10.0.0.3"));
+ assertThat(coordinator.queryRemoteNodes().stream().filter(instance -> instance.getAddress().isSelf()).findFirst().get().getAddress().getHost(), is("10.0.0.3"));
}
@Test
public void assertDeleted() throws InterruptedException {
PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2", "DELETED", "2", "10.0.0.2");
coordinator = new KubernetesCoordinator(watch, () -> "1");
- coordinator.registerRemote(new RemoteInstance("0.0.0.0", 8454, true));
+ coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true)));
watch.await();
assertThat(coordinator.queryRemoteNodes().size(), is(1));
- assertThat(coordinator.queryRemoteNodes().stream().filter(RemoteInstance::isSelf).findFirst().get().getHost(), is("10.0.0.1"));
+ assertThat(coordinator.queryRemoteNodes().stream().filter(instance -> instance.getAddress().isSelf()).findFirst().get().getAddress().getHost(), is("10.0.0.1"));
}
@Test
public void assertError() throws InterruptedException {
PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ERROR", "X", "10.0.0.2", "ADDED", "2", "10.0.0.2");
coordinator = new KubernetesCoordinator(watch, () -> "1");
- coordinator.registerRemote(new RemoteInstance("0.0.0.0", 8454, true));
+ coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true)));
watch.await();
assertThat(coordinator.queryRemoteNodes().size(), is(2));
- assertThat(coordinator.queryRemoteNodes().stream().filter(RemoteInstance::isSelf).findFirst().get().getHost(), is("10.0.0.1"));
+ assertThat(coordinator.queryRemoteNodes().stream().filter(instance -> instance.getAddress().isSelf()).findFirst().get().getAddress().getHost(), is("10.0.0.1"));
}
}
\ No newline at end of file
diff --git a/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManager.java b/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManager.java
index 6cba94f..ef30870 100644
--- a/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManager.java
+++ b/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManager.java
@@ -18,11 +18,8 @@
package org.apache.skywalking.oap.server.cluster.plugin.standalone;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
-import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
-import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
+import java.util.*;
+import org.apache.skywalking.oap.server.core.cluster.*;
/**
* A cluster manager simulator. Work in memory only. Also return the current instance.
@@ -35,7 +32,7 @@ public class StandaloneManager implements ClusterNodesQuery, ClusterRegister {
@Override public void registerRemote(RemoteInstance remoteInstance) {
this.remoteInstance = remoteInstance;
- this.remoteInstance.setSelf(true);
+ this.remoteInstance.getAddress().setSelf(true);
}
@Override
diff --git a/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManagerTest.java b/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManagerTest.java
index 55671bc..77ede74 100644
--- a/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManagerTest.java
+++ b/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManagerTest.java
@@ -19,14 +19,15 @@
package org.apache.skywalking.oap.server.cluster.plugin.standalone;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
+import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.junit.*;
public class StandaloneManagerTest {
@Test
public void test() {
StandaloneManager standaloneManager = new StandaloneManager();
- RemoteInstance remote1 = new RemoteInstance("A", 100, true);
- RemoteInstance remote2 = new RemoteInstance("B", 100, false);
+ RemoteInstance remote1 = new RemoteInstance(new Address("A", 100, true));
+ RemoteInstance remote2 = new RemoteInstance(new Address("B", 100, false));
standaloneManager.registerRemote(remote1);
Assert.assertEquals(remote1, standaloneManager.queryRemoteNodes().get(0));
diff --git a/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ZookeeperCoordinator.java b/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ZookeeperCoordinator.java
index 07f53a6..826bdbb 100644
--- a/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ZookeeperCoordinator.java
+++ b/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ZookeeperCoordinator.java
@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.cluster.plugin.zookeeper;
import java.util.*;
import org.apache.curator.x.discovery.*;
import org.apache.skywalking.oap.server.core.cluster.*;
+import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.slf4j.*;
/**
@@ -31,7 +32,7 @@ public class ZookeeperCoordinator implements ClusterRegister, ClusterNodesQuery
private final ServiceDiscovery<RemoteInstance> serviceDiscovery;
private volatile ServiceCache<RemoteInstance> serviceCache;
- private volatile RemoteInstance selfInstance;
+ private volatile Address selfAddress;
ZookeeperCoordinator(ServiceDiscovery<RemoteInstance> serviceDiscovery) {
this.serviceDiscovery = serviceDiscovery;
@@ -44,8 +45,8 @@ public class ZookeeperCoordinator implements ClusterRegister, ClusterNodesQuery
ServiceInstance<RemoteInstance> thisInstance = ServiceInstance.<RemoteInstance>builder()
.name(remoteNamePath)
.id(UUID.randomUUID().toString())
- .address(remoteInstance.getHost())
- .port(remoteInstance.getPort())
+ .address(remoteInstance.getAddress().getHost())
+ .port(remoteInstance.getAddress().getPort())
.payload(remoteInstance)
.build();
@@ -57,7 +58,7 @@ public class ZookeeperCoordinator implements ClusterRegister, ClusterNodesQuery
serviceCache.start();
- this.selfInstance = remoteInstance;
+ this.selfAddress = remoteInstance.getAddress();
} catch (Exception e) {
throw new ServiceRegisterException(e.getMessage());
}
@@ -70,10 +71,10 @@ public class ZookeeperCoordinator implements ClusterRegister, ClusterNodesQuery
serviceInstances.forEach(serviceInstance -> {
RemoteInstance instance = serviceInstance.getPayload();
- if (instance.equals(selfInstance)) {
- instance.setSelf(true);
+ if (instance.getAddress().equals(selfAddress)) {
+ instance.getAddress().setSelf(true);
} else {
- instance.setSelf(false);
+ instance.getAddress().setSelf(false);
}
remoteInstanceDetails.add(instance);
});
diff --git a/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ClusterModuleZookeeperProviderTestCase.java b/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ClusterModuleZookeeperProviderTestCase.java
index a318481..c8f6446 100644
--- a/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ClusterModuleZookeeperProviderTestCase.java
+++ b/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ClusterModuleZookeeperProviderTestCase.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.List;
import org.apache.curator.test.TestingServer;
import org.apache.skywalking.oap.server.core.cluster.*;
+import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.apache.skywalking.oap.server.library.module.*;
import org.junit.*;
@@ -52,7 +53,7 @@ public class ClusterModuleZookeeperProviderTestCase {
ClusterRegister moduleRegister = provider.getService(ClusterRegister.class);
ClusterNodesQuery clusterNodesQuery = provider.getService(ClusterNodesQuery.class);
- RemoteInstance remoteInstance = new RemoteInstance("ProviderAHost", 1000, true);
+ RemoteInstance remoteInstance = new RemoteInstance(new Address("ProviderAHost", 1000, true));
moduleRegister.registerRemote(remoteInstance);
@@ -63,8 +64,8 @@ public class ClusterModuleZookeeperProviderTestCase {
continue;
}
Assert.assertEquals(1, detailsList.size());
- Assert.assertEquals("ProviderAHost", detailsList.get(0).getHost());
- Assert.assertEquals(1000, detailsList.get(0).getPort());
+ Assert.assertEquals("ProviderAHost", detailsList.get(0).getAddress().getHost());
+ Assert.assertEquals(1000, detailsList.get(0).getAddress().getPort());
}
}
diff --git a/oap-server/server-core/pom.xml b/oap-server/server-core/pom.xml
index 9669ca0..37a8f68 100644
--- a/oap-server/server-core/pom.xml
+++ b/oap-server/server-core/pom.xml
@@ -17,7 +17,8 @@
~
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>oap-server</artifactId>
<groupId>org.apache.skywalking</groupId>
@@ -34,6 +35,11 @@
<artifactId>snakeyaml</artifactId>
</dependency>
<dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-testing</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>library-module</artifactId>
<version>${project.version}</version>
@@ -63,6 +69,12 @@
<artifactId>apm-network</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.skywalking</groupId>
+ <artifactId>server-testing</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index b1181da..f0e9084 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -22,55 +22,26 @@ import java.io.IOException;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorTypeListener;
import org.apache.skywalking.oap.server.core.analysis.record.annotation.RecordTypeListener;
import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
-import org.apache.skywalking.oap.server.core.cache.CacheUpdateTimer;
-import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache;
-import org.apache.skywalking.oap.server.core.cache.NetworkAddressInventoryCache;
-import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
-import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
-import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
-import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
-import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
-import org.apache.skywalking.oap.server.core.config.ComponentLibraryCatalogService;
-import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
-import org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService;
-import org.apache.skywalking.oap.server.core.query.AggregationQueryService;
-import org.apache.skywalking.oap.server.core.query.AlarmQueryService;
-import org.apache.skywalking.oap.server.core.query.MetadataQueryService;
-import org.apache.skywalking.oap.server.core.query.MetricQueryService;
-import org.apache.skywalking.oap.server.core.query.TopologyQueryService;
-import org.apache.skywalking.oap.server.core.query.TraceQueryService;
+import org.apache.skywalking.oap.server.core.cache.*;
+import org.apache.skywalking.oap.server.core.cluster.*;
+import org.apache.skywalking.oap.server.core.config.*;
+import org.apache.skywalking.oap.server.core.query.*;
import org.apache.skywalking.oap.server.core.register.annotation.InventoryTypeListener;
-import org.apache.skywalking.oap.server.core.register.service.EndpointInventoryRegister;
-import org.apache.skywalking.oap.server.core.register.service.IEndpointInventoryRegister;
-import org.apache.skywalking.oap.server.core.register.service.INetworkAddressInventoryRegister;
-import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister;
-import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
-import org.apache.skywalking.oap.server.core.register.service.NetworkAddressInventoryRegister;
-import org.apache.skywalking.oap.server.core.register.service.ServiceInstanceInventoryRegister;
-import org.apache.skywalking.oap.server.core.register.service.ServiceInventoryRegister;
-import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
-import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler;
-import org.apache.skywalking.oap.server.core.remote.annotation.StreamAnnotationListener;
-import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataAnnotationContainer;
-import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
-import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
-import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
-import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegisterImpl;
-import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister;
-import org.apache.skywalking.oap.server.core.server.JettyHandlerRegisterImpl;
-import org.apache.skywalking.oap.server.core.source.SourceReceiver;
-import org.apache.skywalking.oap.server.core.source.SourceReceiverImpl;
+import org.apache.skywalking.oap.server.core.register.service.*;
+import org.apache.skywalking.oap.server.core.remote.*;
+import org.apache.skywalking.oap.server.core.remote.annotation.*;
+import org.apache.skywalking.oap.server.core.remote.client.*;
+import org.apache.skywalking.oap.server.core.server.*;
+import org.apache.skywalking.oap.server.core.source.*;
import org.apache.skywalking.oap.server.core.storage.PersistenceTimer;
import org.apache.skywalking.oap.server.core.storage.annotation.StorageAnnotationListener;
import org.apache.skywalking.oap.server.core.storage.model.IModelGetter;
import org.apache.skywalking.oap.server.core.storage.ttl.DataTTLKeeperTimer;
import org.apache.skywalking.oap.server.library.module.*;
-import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.server.ServerException;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
import org.apache.skywalking.oap.server.library.server.jetty.JettyServer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
/**
* @author peng-yongsheng
@@ -186,7 +157,7 @@ public class CoreModuleProvider extends ModuleProvider {
throw new ModuleStartException(e.getMessage(), e);
}
- RemoteInstance gRPCServerInstance = new RemoteInstance(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(), true);
+ RemoteInstance gRPCServerInstance = new RemoteInstance(new Address(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(), true));
this.getManager().find(ClusterModule.NAME).provider().getService(ClusterRegister.class).registerRemote(gRPCServerInstance);
PersistenceTimer.INSTANCE.start(getManager());
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCollection.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCollection.java
index 2a4ac3a..1d7a53c 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCollection.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCollection.java
@@ -69,11 +69,11 @@ public class NonMergeDataCollection<STORAGE_DATA extends StorageData> implements
}
@Override public boolean containsKey(STORAGE_DATA key) {
- throw new UnsupportedOperationException("None merge data collection not support containsKey operation.");
+ throw new UnsupportedOperationException("Close merge data collection not support containsKey operation.");
}
@Override public STORAGE_DATA get(STORAGE_DATA key) {
- throw new UnsupportedOperationException("None merge data collection not support get operation.");
+ throw new UnsupportedOperationException("Close merge data collection not support get operation.");
}
@Override public void put(STORAGE_DATA value) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java
index 7b0d48f..44a4855 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java
@@ -18,44 +18,26 @@
package org.apache.skywalking.oap.server.core.cluster;
-import java.util.Objects;
-import lombok.*;
+import lombok.Getter;
+import org.apache.skywalking.oap.server.core.remote.client.Address;
/**
* @author peng-yongsheng
*/
+@Getter
public class RemoteInstance implements Comparable<RemoteInstance> {
- @Getter private final String host;
- @Getter private final int port;
- @Getter @Setter private boolean isSelf = false;
+ private final Address address;
- public RemoteInstance(String host, int port, boolean isSelf) {
- this.host = host;
- this.port = port;
- this.isSelf = isSelf;
- }
-
- @Override public int compareTo(RemoteInstance o) {
- return toString().compareTo(o.toString());
+ public RemoteInstance(Address address) {
+ this.address = address;
}
@Override public String toString() {
- return host + ":" + String.valueOf(port);
+ return address.toString();
}
- @Override public boolean equals(Object o) {
- if (this == o)
- return true;
- if (o == null || getClass() != o.getClass())
- return false;
- RemoteInstance instance = (RemoteInstance)o;
- return port == instance.port &&
- Objects.equals(host, instance.host);
- }
-
- @Override public int hashCode() {
-
- return Objects.hash(host, port);
+ @Override public int compareTo(RemoteInstance o) {
+ return this.address.compareTo(o.getAddress());
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/NetworkAddressInventoryRegister.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/NetworkAddressInventoryRegister.java
index cccb7bc..cf83344 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/NetworkAddressInventoryRegister.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/NetworkAddressInventoryRegister.java
@@ -103,7 +103,7 @@ public class NetworkAddressInventoryRegister implements INetworkAddressInventory
InventoryProcess.INSTANCE.in(networkAddress);
} else {
- logger.warn("Network address {} heartbeat, but not found in storage.");
+ logger.warn("Network getAddress {} heartbeat, but not found in storage.");
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/ServiceInstanceInventoryRegister.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/ServiceInstanceInventoryRegister.java
index cf57cb7..9a0dc76 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/ServiceInstanceInventoryRegister.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/ServiceInstanceInventoryRegister.java
@@ -81,7 +81,7 @@ public class ServiceInstanceInventoryRegister implements IServiceInstanceInvento
@Override public int getOrCreate(int serviceId, int addressId, long registerTime) {
if (logger.isDebugEnabled()) {
- logger.debug("get or create service instance by address id, service id: {}, address id: {}, registerTime: {}", serviceId, addressId, registerTime);
+ logger.debug("get or create service instance by getAddress id, service id: {}, getAddress id: {}, registerTime: {}", serviceId, addressId, registerTime);
}
int serviceInstanceId = getServiceInstanceInventoryCache().getServiceInstanceId(serviceId, addressId);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
index 92b2a87..de71610 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
@@ -25,27 +25,36 @@ import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGe
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
import org.apache.skywalking.oap.server.core.worker.WorkerInstances;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
import org.slf4j.*;
/**
+ * This class is Server-side streaming RPC implementation. It's a common service for OAP servers
+ * to receive message from each others.
+ * The stream data id is used to find the object to deserialize message.
+ * The next worker id is used to find the worker to process message.
+ *
* @author peng-yongsheng
*/
public class RemoteServiceHandler extends RemoteServiceGrpc.RemoteServiceImplBase implements GRPCHandler {
private static final Logger logger = LoggerFactory.getLogger(RemoteServiceHandler.class);
- private final ModuleManager moduleManager;
+ private final ModuleDefineHolder moduleDefineHolder;
private StreamDataClassGetter streamDataClassGetter;
- public RemoteServiceHandler(ModuleManager moduleManager) {
- this.moduleManager = moduleManager;
+ public RemoteServiceHandler(ModuleDefineHolder moduleDefineHolder) {
+ this.moduleDefineHolder = moduleDefineHolder;
}
@Override public StreamObserver<RemoteMessage> call(StreamObserver<Empty> responseObserver) {
if (Objects.isNull(streamDataClassGetter)) {
- streamDataClassGetter = moduleManager.find(CoreModule.NAME).provider().getService(StreamDataClassGetter.class);
+ synchronized (RemoteServiceHandler.class) {
+ if (Objects.isNull(streamDataClassGetter)) {
+ streamDataClassGetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataClassGetter.class);
+ }
+ }
}
return new StreamObserver<RemoteMessage>() {
@@ -59,8 +68,8 @@ public class RemoteServiceHandler extends RemoteServiceGrpc.RemoteServiceImplBas
StreamData streamData = streamDataClass.newInstance();
streamData.deserialize(remoteData);
WorkerInstances.INSTANCES.get(nextWorkerId).in(streamData);
- } catch (InstantiationException | IllegalAccessException e) {
- logger.warn(e.getMessage());
+ } catch (Throwable t) {
+ logger.error(t.getMessage(), t);
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/Address.java
similarity index 57%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/Address.java
index 7b0d48f..4e5d0ce 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/Address.java
@@ -16,46 +16,47 @@
*
*/
-package org.apache.skywalking.oap.server.core.cluster;
+package org.apache.skywalking.oap.server.core.remote.client;
-import java.util.Objects;
import lombok.*;
+import org.apache.skywalking.oap.server.core.Const;
/**
* @author peng-yongsheng
*/
-public class RemoteInstance implements Comparable<RemoteInstance> {
+@Getter
+public class Address implements Comparable<Address> {
+ private final String host;
+ private final int port;
+ @Setter private boolean isSelf;
- @Getter private final String host;
- @Getter private final int port;
- @Getter @Setter private boolean isSelf = false;
-
- public RemoteInstance(String host, int port, boolean isSelf) {
+ public Address(String host, int port, boolean isSelf) {
this.host = host;
this.port = port;
this.isSelf = isSelf;
}
- @Override public int compareTo(RemoteInstance o) {
- return toString().compareTo(o.toString());
- }
-
- @Override public String toString() {
- return host + ":" + String.valueOf(port);
+ @Override public int hashCode() {
+ return toString().hashCode();
}
- @Override public boolean equals(Object o) {
- if (this == o)
+ @Override public boolean equals(Object obj) {
+ if (this == obj)
return true;
- if (o == null || getClass() != o.getClass())
+ if (obj == null)
return false;
- RemoteInstance instance = (RemoteInstance)o;
- return port == instance.port &&
- Objects.equals(host, instance.host);
+ if (getClass() != obj.getClass())
+ return false;
+
+ Address address = (Address)obj;
+ return host.equals(address.host) && port == address.port;
}
- @Override public int hashCode() {
+ @Override public String toString() {
+ return host + Const.ID_SPLIT + port;
+ }
- return Objects.hash(host, port);
+ @Override public int compareTo(Address o) {
+ return this.toString().compareTo(o.toString());
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
index e367f4d..b061ca6 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
@@ -18,13 +18,13 @@
package org.apache.skywalking.oap.server.core.remote.client;
+import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
-import java.util.List;
+import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
@@ -32,27 +32,82 @@ import org.apache.skywalking.oap.server.library.client.grpc.GRPCClient;
import org.slf4j.*;
/**
+ * This is a wrapper of the gRPC client for sending message to each other OAP server.
+ * It contains a block queue to buffering the message and sending the message by batch.
+ *
* @author peng-yongsheng
*/
-public class GRPCRemoteClient implements RemoteClient, Comparable<GRPCRemoteClient> {
+public class GRPCRemoteClient implements RemoteClient {
private static final Logger logger = LoggerFactory.getLogger(GRPCRemoteClient.class);
- private final GRPCClient client;
- private final DataCarrier<RemoteMessage> carrier;
+ private final int channelSize;
+ private final int bufferSize;
+ private final Address address;
private final StreamDataClassGetter streamDataClassGetter;
private final AtomicInteger concurrentStreamObserverNumber = new AtomicInteger(0);
+ private GRPCClient client;
+ private DataCarrier<RemoteMessage> carrier;
+ private boolean isConnect;
- public GRPCRemoteClient(StreamDataClassGetter streamDataClassGetter, RemoteInstance remoteInstance, int channelSize,
+ public GRPCRemoteClient(StreamDataClassGetter streamDataClassGetter, Address address, int channelSize,
int bufferSize) {
this.streamDataClassGetter = streamDataClassGetter;
- this.client = new GRPCClient(remoteInstance.getHost(), remoteInstance.getPort());
- this.client.initialize();
- this.carrier = new DataCarrier<>("GRPCRemoteClient", channelSize, bufferSize);
- this.carrier.setBufferStrategy(BufferStrategy.BLOCKING);
- this.carrier.consume(new RemoteMessageConsumer(), 1);
+ this.address = address;
+ this.channelSize = channelSize;
+ this.bufferSize = bufferSize;
+ }
+
+ @Override public void connect() {
+ if (!isConnect) {
+ this.getClient().connect();
+ this.getDataCarrier().consume(new RemoteMessageConsumer(), 1);
+ this.isConnect = true;
+ }
+ }
+
+ /**
+ * Get channel state by the true value of request connection.
+ *
+ * @return a channel when the state to be ready
+ */
+ ManagedChannel getChannel() {
+ return getClient().getChannel();
}
+ GRPCClient getClient() {
+ if (Objects.isNull(client)) {
+ synchronized (GRPCRemoteClient.class) {
+ if (Objects.isNull(client)) {
+ this.client = new GRPCClient(address.getHost(), address.getPort());
+ }
+ }
+ }
+ return client;
+ }
+
+ RemoteServiceGrpc.RemoteServiceStub getStub() {
+ return RemoteServiceGrpc.newStub(getChannel());
+ }
+
+ DataCarrier<RemoteMessage> getDataCarrier() {
+ if (Objects.isNull(this.carrier)) {
+ synchronized (GRPCRemoteClient.class) {
+ if (Objects.isNull(this.carrier)) {
+ this.carrier = new DataCarrier<>("GRPCRemoteClient", channelSize, bufferSize);
+ this.carrier.setBufferStrategy(BufferStrategy.BLOCKING);
+ }
+ }
+ }
+ return this.carrier;
+ }
+
+ /**
+ * Push stream data which need to send to another OAP server.
+ *
+ * @param nextWorkerId the id of a worker which will process this stream data.
+ * @param streamData the entity contains the values.
+ */
@Override public void push(int nextWorkerId, StreamData streamData) {
int streamDataId = streamDataClassGetter.findIdByClass(streamData.getClass());
RemoteMessage.Builder builder = RemoteMessage.newBuilder();
@@ -60,7 +115,7 @@ public class GRPCRemoteClient implements RemoteClient, Comparable<GRPCRemoteClie
builder.setStreamDataId(streamDataId);
builder.setRemoteData(streamData.serialize());
- this.carrier.produce(builder.build());
+ this.getDataCarrier().produce(builder.build());
}
class RemoteMessageConsumer implements IConsumer<RemoteMessage> {
@@ -68,12 +123,15 @@ public class GRPCRemoteClient implements RemoteClient, Comparable<GRPCRemoteClie
}
@Override public void consume(List<RemoteMessage> remoteMessages) {
- StreamObserver<RemoteMessage> streamObserver = createStreamObserver();
-
- for (RemoteMessage remoteMessage : remoteMessages) {
- streamObserver.onNext(remoteMessage);
+ try {
+ StreamObserver<RemoteMessage> streamObserver = createStreamObserver();
+ for (RemoteMessage remoteMessage : remoteMessages) {
+ streamObserver.onNext(remoteMessage);
+ }
+ streamObserver.onCompleted();
+ } catch (Throwable t) {
+ logger.error(t.getMessage(), t);
}
- streamObserver.onCompleted();
}
@Override public void onError(List<RemoteMessage> remoteMessages, Throwable t) {
@@ -84,9 +142,14 @@ public class GRPCRemoteClient implements RemoteClient, Comparable<GRPCRemoteClie
}
}
+ /**
+ * Create a gRPC stream observer to sending stream data, one stream observer
+ * could send multiple stream data by a single consume.
+ * The max number of concurrency allowed at the same time is 10.
+ *
+ * @return stream observer
+ */
private StreamObserver<RemoteMessage> createStreamObserver() {
- RemoteServiceGrpc.RemoteServiceStub stub = RemoteServiceGrpc.newStub(client.getChannel());
-
int sleepTotalMillis = 0;
int sleepMillis = 10;
while (concurrentStreamObserverNumber.incrementAndGet() > 10) {
@@ -105,7 +168,7 @@ public class GRPCRemoteClient implements RemoteClient, Comparable<GRPCRemoteClie
}
}
- return stub.call(new StreamObserver<Empty>() {
+ return getStub().call(new StreamObserver<Empty>() {
@Override public void onNext(Empty empty) {
}
@@ -120,15 +183,20 @@ public class GRPCRemoteClient implements RemoteClient, Comparable<GRPCRemoteClie
});
}
- @Override public int compareTo(GRPCRemoteClient o) {
- return this.client.toString().compareTo(o.client.toString());
+ @Override public void close() {
+ if (Objects.nonNull(this.carrier)) {
+ this.carrier.shutdownConsumers();
+ }
+ if (Objects.nonNull(this.client)) {
+ this.client.shutdown();
+ }
}
- public String getHost() {
- return client.getHost();
+ @Override public Address getAddress() {
+ return address;
}
- public int getPort() {
- return client.getPort();
+ @Override public int compareTo(RemoteClient o) {
+ return address.compareTo(o.getAddress());
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClient.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClient.java
index 8172a28..ccc216f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClient.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClient.java
@@ -23,11 +23,13 @@ import org.apache.skywalking.oap.server.core.remote.data.StreamData;
/**
* @author peng-yongsheng
*/
-public interface RemoteClient {
+public interface RemoteClient extends Comparable<RemoteClient> {
- String getHost();
+ Address getAddress();
- int getPort();
+ void connect();
+
+ void close();
void push(int nextWorkerId, StreamData streamData);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
index 129b517..7d78be0 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
@@ -27,37 +27,61 @@ import org.apache.skywalking.oap.server.library.module.*;
import org.slf4j.*;
/**
+ * This class manages the connections between OAP servers. There is a task schedule that will
+ * automatically query a server list from the cluster module. Such as Zookeeper cluster module
+ * or Kubernetes cluster module.
+ *
* @author peng-yongsheng
*/
public class RemoteClientManager implements Service {
private static final Logger logger = LoggerFactory.getLogger(RemoteClientManager.class);
- private final ModuleManager moduleManager;
+ private final ModuleDefineHolder moduleDefineHolder;
private StreamDataClassGetter streamDataClassGetter;
private ClusterNodesQuery clusterNodesQuery;
private final List<RemoteClient> clientsA;
private final List<RemoteClient> clientsB;
private volatile List<RemoteClient> usingClients;
- public RemoteClientManager(ModuleManager moduleManager) {
- this.moduleManager = moduleManager;
+ public RemoteClientManager(ModuleDefineHolder moduleDefineHolder) {
+ this.moduleDefineHolder = moduleDefineHolder;
this.clientsA = new LinkedList<>();
this.clientsB = new LinkedList<>();
this.usingClients = clientsA;
}
public void start() {
- this.clusterNodesQuery = moduleManager.find(ClusterModule.NAME).provider().getService(ClusterNodesQuery.class);
- this.streamDataClassGetter = moduleManager.find(CoreModule.NAME).provider().getService(StreamDataClassGetter.class);
- Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::refresh, 5, 5, TimeUnit.SECONDS);
+ Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::refresh, 1, 5, TimeUnit.SECONDS);
}
- private void refresh() {
- if (logger.isDebugEnabled()) {
- logger.debug("Refresh remote nodes collection.");
- }
+ /**
+ * Query OAP server list from the cluster module and create a new connection
+ * for the new node. Make the OAP server orderly because of each of the server
+ * will send stream data to each other by hash code.
+ */
+ void refresh() {
try {
+ if (Objects.isNull(clusterNodesQuery)) {
+ synchronized (RemoteClientManager.class) {
+ if (Objects.isNull(clusterNodesQuery)) {
+ this.clusterNodesQuery = moduleDefineHolder.find(ClusterModule.NAME).provider().getService(ClusterNodesQuery.class);
+ }
+ }
+ }
+
+ if (Objects.isNull(streamDataClassGetter)) {
+ synchronized (RemoteClientManager.class) {
+ if (Objects.isNull(streamDataClassGetter)) {
+ this.streamDataClassGetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataClassGetter.class);
+ }
+ }
+ }
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Refresh remote nodes collection.");
+ }
+
List<RemoteInstance> instanceList = clusterNodesQuery.queryRemoteNodes();
Collections.sort(instanceList);
@@ -66,12 +90,11 @@ public class RemoteClientManager implements Service {
}
if (!compare(instanceList)) {
- buildNewClients(instanceList);
+ reBuildRemoteClients(instanceList);
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
-
}
public List<RemoteClient> getRemoteClient() {
@@ -94,34 +117,68 @@ public class RemoteClientManager implements Service {
}
}
- private void buildNewClients(List<RemoteInstance> remoteInstances) {
+ /**
+ * Compare clients between exist clients and remote instance collection. Move
+ * the clients into new client collection which are alive to avoid create a
+ * new channel. Shutdown the clients which could not find in cluster config.
+ *
+ * Create a gRPC client for remote instance except for self-instance.
+ *
+ * @param remoteInstances Remote instance collection by query cluster config.
+ */
+ private synchronized void reBuildRemoteClients(List<RemoteInstance> remoteInstances) {
getFreeClients().clear();
- Map<String, RemoteClient> currentClientsMap = new HashMap<>();
- this.usingClients.forEach(remoteClient -> currentClientsMap.put(address(remoteClient.getHost(), remoteClient.getPort()), remoteClient));
+ Map<Address, RemoteClient> remoteClients = new HashMap<>();
+ getRemoteClient().forEach(client -> remoteClients.put(client.getAddress(), client));
+
+ Map<Address, Action> tempRemoteClients = new HashMap<>();
+ getRemoteClient().forEach(client -> tempRemoteClients.put(client.getAddress(), Action.Close));
remoteInstances.forEach(remoteInstance -> {
- String address = address(remoteInstance.getHost(), remoteInstance.getPort());
- RemoteClient client;
- if (currentClientsMap.containsKey(address)) {
- client = currentClientsMap.get(address);
+ if (tempRemoteClients.containsKey(remoteInstance.getAddress())) {
+ tempRemoteClients.put(remoteInstance.getAddress(), Action.Leave);
} else {
- if (remoteInstance.isSelf()) {
- client = new SelfRemoteClient(remoteInstance.getHost(), remoteInstance.getPort());
- } else {
- client = new GRPCRemoteClient(streamDataClassGetter, remoteInstance, 1, 3000);
- }
+ tempRemoteClients.put(remoteInstance.getAddress(), Action.Create);
+ }
+ });
+
+ tempRemoteClients.forEach((address, action) -> {
+ switch (action) {
+ case Leave:
+ if (remoteClients.containsKey(address)) {
+ getFreeClients().add(remoteClients.get(address));
+ }
+ break;
+ case Create:
+ if (address.isSelf()) {
+ RemoteClient client = new SelfRemoteClient(address);
+ getFreeClients().add(client);
+ } else {
+ RemoteClient client = new GRPCRemoteClient(streamDataClassGetter, address, 1, 3000);
+ client.connect();
+ getFreeClients().add(client);
+ }
+ break;
}
- getFreeClients().add(client);
});
+ Collections.sort(getFreeClients());
switchCurrentClients();
+
+ tempRemoteClients.forEach((address, action) -> {
+ if (Action.Close.equals(action) && remoteClients.containsKey(address)) {
+ remoteClients.get(address).close();
+ }
+ });
+
+ getFreeClients().clear();
}
private boolean compare(List<RemoteInstance> remoteInstances) {
if (usingClients.size() == remoteInstances.size()) {
for (int i = 0; i < usingClients.size(); i++) {
- if (!address(usingClients.get(i).getHost(), usingClients.get(i).getPort()).equals(address(remoteInstances.get(i).getHost(), remoteInstances.get(i).getPort()))) {
+ if (!usingClients.get(i).getAddress().equals(remoteInstances.get(i).getAddress())) {
return false;
}
}
@@ -131,7 +188,7 @@ public class RemoteClientManager implements Service {
}
}
- private String address(String host, int port) {
- return host + String.valueOf(port);
+ enum Action {
+ Close, Leave, Create
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java
index 9b4f6bd..ab25f5c 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java
@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.core.remote.client;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.worker.WorkerInstances;
@@ -26,23 +27,28 @@ import org.apache.skywalking.oap.server.core.worker.WorkerInstances;
*/
public class SelfRemoteClient implements RemoteClient {
- private final String host;
- private final int port;
+ private final Address address;
- public SelfRemoteClient(String host, int port) {
- this.host = host;
- this.port = port;
+ public SelfRemoteClient(Address address) {
+ this.address = address;
}
- @Override public String getHost() {
- return host;
+ @Override public Address getAddress() {
+ return address;
}
- @Override public int getPort() {
- return port;
+ @Override public void connect() {
+ }
+
+ @Override public void close() {
+ throw new UnexpectedException("Self remote client invoked to close.");
}
@Override public void push(int nextWorkerId, StreamData streamData) {
WorkerInstances.INSTANCES.get(nextWorkerId).in(streamData);
}
+
+ @Override public int compareTo(RemoteClient o) {
+ return address.compareTo(o.getAddress());
+ }
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
index 861dd7b..ba3de7a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
@@ -20,29 +20,20 @@ package org.apache.skywalking.oap.server.core.storage.ttl;
import java.io.IOException;
import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
import lombok.Setter;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
-import org.apache.skywalking.oap.server.core.Const;
-import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.DataTTL;
+import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
-import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
-import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
-import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
+import org.apache.skywalking.oap.server.core.cluster.*;
import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
-import org.apache.skywalking.oap.server.core.storage.Downsampling;
-import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
-import org.apache.skywalking.oap.server.core.storage.StorageModule;
-import org.apache.skywalking.oap.server.core.storage.model.IModelGetter;
-import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.model.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
/**
* @author peng-yongsheng
@@ -67,8 +58,8 @@ public enum DataTTLKeeperTimer {
private void delete() {
List<RemoteInstance> remoteInstances = clusterNodesQuery.queryRemoteNodes();
- if (CollectionUtils.isNotEmpty(remoteInstances) && !remoteInstances.get(0).isSelf()) {
- logger.info("The selected first address is {}. Skip.", remoteInstances.get(0).toString());
+ if (CollectionUtils.isNotEmpty(remoteInstances) && !remoteInstances.get(0).getAddress().isSelf()) {
+ logger.info("The selected first getAddress is {}. Skip.", remoteInstances.get(0).toString());
return;
}
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerTestCase.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerTestCase.java
new file mode 100644
index 0000000..b2e1f03
--- /dev/null
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerTestCase.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.remote;
+
+import io.grpc.inprocess.*;
+import io.grpc.stub.StreamObserver;
+import io.grpc.testing.GrpcCleanupRule;
+import java.io.IOException;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
+import org.apache.skywalking.oap.server.core.remote.data.StreamData;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
+import org.apache.skywalking.oap.server.core.worker.*;
+import org.apache.skywalking.oap.server.library.module.*;
+import org.apache.skywalking.oap.server.testing.module.*;
+import org.junit.*;
+
+import static org.mockito.Mockito.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class RemoteServiceHandlerTestCase {
+
+ @Rule
+ public final GrpcCleanupRule gRPCCleanup = new GrpcCleanupRule();
+
+ @Test
+ public void callTest() throws DuplicateProviderException, ProviderNotFoundException, IOException {
+ final int streamDataClassId = 1;
+ final int testWorkerId = 1;
+
+ ModuleManagerTesting moduleManager = new ModuleManagerTesting();
+ ModuleDefineTesting moduleDefine = new ModuleDefineTesting();
+ moduleManager.put(CoreModule.NAME, moduleDefine);
+
+ StreamDataClassGetter classGetter = mock(StreamDataClassGetter.class);
+ Class<?> dataClass = TestRemoteData.class;
+ when(classGetter.findClassById(streamDataClassId)).thenReturn((Class<StreamData>)dataClass);
+
+ moduleDefine.provider().registerServiceImplementation(StreamDataClassGetter.class, classGetter);
+
+ WorkerInstances.INSTANCES.put(testWorkerId, new TestWorker());
+
+ String serverName = InProcessServerBuilder.generateName();
+
+ gRPCCleanup.register(InProcessServerBuilder
+ .forName(serverName).directExecutor().addService(new RemoteServiceHandler(moduleManager)).build().start());
+
+ RemoteServiceGrpc.RemoteServiceStub remoteServiceStub = RemoteServiceGrpc.newStub(
+ gRPCCleanup.register(InProcessChannelBuilder.forName(serverName).directExecutor().build()));
+
+ StreamObserver<RemoteMessage> streamObserver = remoteServiceStub.call(new StreamObserver<Empty>() {
+ @Override public void onNext(Empty empty) {
+
+ }
+
+ @Override public void onError(Throwable throwable) {
+
+ }
+
+ @Override public void onCompleted() {
+
+ }
+ });
+
+ RemoteMessage.Builder remoteMessage = RemoteMessage.newBuilder();
+ remoteMessage.setStreamDataId(streamDataClassId);
+ remoteMessage.setNextWorkerId(testWorkerId);
+
+ RemoteData.Builder remoteData = RemoteData.newBuilder();
+ remoteData.addDataStrings("test1");
+ remoteData.addDataStrings("test2");
+
+ remoteData.addDataLongs(10);
+ remoteData.addDataLongs(20);
+ remoteMessage.setRemoteData(remoteData);
+
+ streamObserver.onNext(remoteMessage.build());
+ streamObserver.onCompleted();
+ }
+
+ static class TestRemoteData extends StreamData {
+
+ private String str1;
+ private String str2;
+ private long long1;
+ private long long2;
+
+ @Override public int remoteHashCode() {
+ return 10;
+ }
+
+ @Override public void deserialize(RemoteData remoteData) {
+ str1 = remoteData.getDataStrings(0);
+ str2 = remoteData.getDataStrings(1);
+ long1 = remoteData.getDataLongs(0);
+ long2 = remoteData.getDataLongs(1);
+
+ Assert.assertEquals("test1", str1);
+ Assert.assertEquals("test2", str2);
+ Assert.assertEquals(10, long1);
+ Assert.assertEquals(20, long2);
+ }
+
+ @Override public RemoteData.Builder serialize() {
+ return null;
+ }
+ }
+
+ static class TestWorker extends AbstractWorker {
+
+ public TestWorker() {
+ super(1);
+ }
+
+ @Override public void in(Object o) {
+ TestRemoteData data = (TestRemoteData)o;
+
+ Assert.assertEquals("test1", data.str1);
+ Assert.assertEquals("test2", data.str2);
+ Assert.assertEquals(10, data.long1);
+ Assert.assertEquals(20, data.long2);
+ }
+ }
+}
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java
new file mode 100644
index 0000000..41953fb
--- /dev/null
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.remote.client;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
+import org.apache.skywalking.oap.server.core.remote.data.StreamData;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
+import org.junit.Assert;
+
+import static org.mockito.Mockito.spy;
+
+/**
+ * @author peng-yongsheng
+ */
+public class GRPCRemoteClientRealClient {
+
+ public static void main(String[] args) throws InterruptedException {
+ Address address = new Address("localhost", 10000, false);
+ GRPCRemoteClient remoteClient = spy(new GRPCRemoteClient(new TestClassGetter(), address, 1, 10));
+ remoteClient.connect();
+
+ for (int i = 0; i < 10000; i++) {
+ remoteClient.push(1, new TestStreamData());
+ TimeUnit.SECONDS.sleep(1);
+ }
+
+ TimeUnit.MINUTES.sleep(10);
+ }
+
+ public static class TestClassGetter implements StreamDataClassGetter {
+
+ @Override public int findIdByClass(Class streamDataClass) {
+ return 1;
+ }
+
+ @Override public Class<StreamData> findClassById(int id) {
+ Class<?> clazz = TestStreamData.class;
+ return (Class<StreamData>)clazz;
+ }
+ }
+
+ public static class TestStreamData extends StreamData {
+
+ private long value;
+
+ @Override public int remoteHashCode() {
+ return 0;
+ }
+
+ @Override public void deserialize(RemoteData remoteData) {
+ this.value = remoteData.getDataLongs(0);
+ }
+
+ @Override public RemoteData.Builder serialize() {
+ RemoteData.Builder builder = RemoteData.newBuilder();
+ builder.addDataLongs(987);
+ return builder;
+ }
+ }
+
+ static class TestWorker extends AbstractWorker {
+
+ public TestWorker(int workerId) {
+ super(workerId);
+ }
+
+ @Override public void in(Object o) {
+ TestStreamData streamData = (TestStreamData)o;
+ Assert.assertEquals(987, streamData.value);
+ }
+ }
+}
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealServer.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealServer.java
new file mode 100644
index 0000000..ae5809c
--- /dev/null
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealServer.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.remote.client;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler;
+import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
+import org.apache.skywalking.oap.server.library.server.ServerException;
+import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
+import org.apache.skywalking.oap.server.testing.module.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class GRPCRemoteClientRealServer {
+
+ public static void main(String[] args) throws ServerException, InterruptedException {
+ ModuleManagerTesting moduleManager = new ModuleManagerTesting();
+ ModuleDefineTesting moduleDefine = new ModuleDefineTesting();
+ moduleManager.put(CoreModule.NAME, moduleDefine);
+
+ moduleDefine.provider().registerServiceImplementation(StreamDataClassGetter.class, new GRPCRemoteClientRealClient.TestClassGetter());
+
+ GRPCServer server = new GRPCServer("localhost", 10000);
+ server.initialize();
+
+ server.addHandler(new RemoteServiceHandler(moduleManager));
+
+ server.start();
+
+ TimeUnit.MINUTES.sleep(10);
+ }
+}
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java
new file mode 100644
index 0000000..e14f29d
--- /dev/null
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.remote.client;
+
+import io.grpc.testing.GrpcServerRule;
+import java.util.concurrent.TimeUnit;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler;
+import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
+import org.apache.skywalking.oap.server.core.remote.data.StreamData;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.core.worker.*;
+import org.apache.skywalking.oap.server.testing.module.*;
+import org.junit.*;
+
+import static org.mockito.Mockito.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class GRPCRemoteClientTestCase {
+
+ private final int nextWorkerId = 1;
+ private ModuleManagerTesting moduleManager;
+ private StreamDataClassGetter classGetter;
+ @Rule public final GrpcServerRule grpcServerRule = new GrpcServerRule().directExecutor();
+
+ @Before
+ public void before() {
+ moduleManager = new ModuleManagerTesting();
+ ModuleDefineTesting moduleDefine = new ModuleDefineTesting();
+ moduleManager.put(CoreModule.NAME, moduleDefine);
+
+ classGetter = mock(StreamDataClassGetter.class);
+ moduleDefine.provider().registerServiceImplementation(StreamDataClassGetter.class, classGetter);
+
+ TestWorker worker = new TestWorker(nextWorkerId);
+ WorkerInstances.INSTANCES.put(nextWorkerId, worker);
+ }
+
+ @Test
+ public void testPush() throws InterruptedException {
+ grpcServerRule.getServiceRegistry().addService(new RemoteServiceHandler(moduleManager));
+
+ Address address = new Address("not-important", 11, false);
+ GRPCRemoteClient remoteClient = spy(new GRPCRemoteClient(classGetter, address, 1, 10));
+ remoteClient.connect();
+
+ doReturn(grpcServerRule.getChannel()).when(remoteClient).getChannel();
+
+ when(classGetter.findIdByClass(TestStreamData.class)).thenReturn(1);
+
+ Class<?> dataClass = TestStreamData.class;
+ when(classGetter.findClassById(1)).thenReturn((Class<StreamData>)dataClass);
+
+ for (int i = 0; i < 12; i++) {
+ remoteClient.push(nextWorkerId, new TestStreamData());
+ }
+
+ TimeUnit.SECONDS.sleep(1);
+ }
+
+ public static class TestStreamData extends StreamData {
+
+ private long value;
+
+ @Override public int remoteHashCode() {
+ return 0;
+ }
+
+ @Override public void deserialize(RemoteData remoteData) {
+ this.value = remoteData.getDataLongs(0);
+ }
+
+ @Override public RemoteData.Builder serialize() {
+ RemoteData.Builder builder = RemoteData.newBuilder();
+ builder.addDataLongs(987);
+ return builder;
+ }
+ }
+
+ class TestWorker extends AbstractWorker {
+
+ public TestWorker(int workerId) {
+ super(workerId);
+ }
+
+ @Override public void in(Object o) {
+ TestStreamData streamData = (TestStreamData)o;
+ Assert.assertEquals(987, streamData.value);
+ }
+ }
+}
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java
new file mode 100644
index 0000000..54b6cb7
--- /dev/null
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.remote.client;
+
+import java.util.*;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.cluster.*;
+import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
+import org.apache.skywalking.oap.server.testing.module.*;
+import org.junit.*;
+
+import static org.mockito.Mockito.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class RemoteClientManagerTestCase {
+
+ @Test
+ public void refresh() {
+ ModuleManagerTesting moduleManager = new ModuleManagerTesting();
+ ModuleDefineTesting clusterModuleDefine = new ModuleDefineTesting();
+ moduleManager.put(ClusterModule.NAME, clusterModuleDefine);
+
+ ModuleDefineTesting coreModuleDefine = new ModuleDefineTesting();
+ moduleManager.put(CoreModule.NAME, coreModuleDefine);
+
+ ClusterNodesQuery clusterNodesQuery = mock(ClusterNodesQuery.class);
+ clusterModuleDefine.provider().registerServiceImplementation(ClusterNodesQuery.class, clusterNodesQuery);
+
+ StreamDataClassGetter streamDataClassGetter = mock(StreamDataClassGetter.class);
+ coreModuleDefine.provider().registerServiceImplementation(StreamDataClassGetter.class, streamDataClassGetter);
+
+ RemoteClientManager clientManager = new RemoteClientManager(moduleManager);
+
+ when(clusterNodesQuery.queryRemoteNodes()).thenReturn(groupOneInstances());
+ clientManager.refresh();
+
+ List<RemoteClient> remoteClients = clientManager.getRemoteClient();
+ Assert.assertEquals("host1", remoteClients.get(0).getAddress().getHost());
+ Assert.assertEquals("host2", remoteClients.get(1).getAddress().getHost());
+ Assert.assertEquals("host3", remoteClients.get(2).getAddress().getHost());
+
+ Assert.assertTrue(remoteClients.get(0) instanceof GRPCRemoteClient);
+ Assert.assertTrue(remoteClients.get(1) instanceof SelfRemoteClient);
+ Assert.assertTrue(remoteClients.get(2) instanceof GRPCRemoteClient);
+
+ when(clusterNodesQuery.queryRemoteNodes()).thenReturn(groupTwoInstances());
+ clientManager.refresh();
+
+ remoteClients = clientManager.getRemoteClient();
+ Assert.assertEquals("host1", remoteClients.get(0).getAddress().getHost());
+ Assert.assertEquals("host2", remoteClients.get(1).getAddress().getHost());
+ Assert.assertEquals("host4", remoteClients.get(2).getAddress().getHost());
+ Assert.assertEquals("host5", remoteClients.get(3).getAddress().getHost());
+
+ Assert.assertTrue(remoteClients.get(0) instanceof GRPCRemoteClient);
+ Assert.assertTrue(remoteClients.get(1) instanceof SelfRemoteClient);
+ Assert.assertTrue(remoteClients.get(2) instanceof GRPCRemoteClient);
+ Assert.assertTrue(remoteClients.get(3) instanceof GRPCRemoteClient);
+ }
+
+ private List<RemoteInstance> groupOneInstances() {
+ List<RemoteInstance> instances = new ArrayList<>();
+ instances.add(new RemoteInstance(new Address("host3", 100, false)));
+ instances.add(new RemoteInstance(new Address("host1", 100, false)));
+ instances.add(new RemoteInstance(new Address("host2", 100, true)));
+ return instances;
+ }
+
+ private List<RemoteInstance> groupTwoInstances() {
+ List<RemoteInstance> instances = new ArrayList<>();
+ instances.add(new RemoteInstance(new Address("host5", 100, false)));
+ instances.add(new RemoteInstance(new Address("host1", 100, false)));
+ instances.add(new RemoteInstance(new Address("host2", 100, true)));
+ instances.add(new RemoteInstance(new Address("host4", 100, false)));
+ return instances;
+ }
+}
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/Client.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/Client.java
index 5e43021..95af9af 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/Client.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/Client.java
@@ -22,7 +22,8 @@ package org.apache.skywalking.oap.server.library.client;
* @author peng-yongsheng
*/
public interface Client {
- void initialize() throws ClientException;
+
+ void connect() throws ClientException;
void shutdown();
}
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
index 45df024..f7c4b5c 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
@@ -59,7 +59,7 @@ public class ElasticSearchClient implements Client {
this.namespace = namespace;
}
- @Override public void initialize() {
+ @Override public void connect() {
List<HttpHost> pairsList = parseClusterNodes(clusterNodes);
client = new RestHighLevelClient(
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java
index 188fae3..485c447 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java
@@ -21,12 +21,15 @@ package org.apache.skywalking.oap.server.library.client.grpc;
import io.grpc.*;
import lombok.Getter;
import org.apache.skywalking.oap.server.library.client.Client;
+import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public class GRPCClient implements Client {
+ private static final Logger logger = LoggerFactory.getLogger(GRPCClient.class);
+
@Getter private final String host;
@Getter private final int port;
@@ -38,12 +41,16 @@ public class GRPCClient implements Client {
this.port = port;
}
- @Override public void initialize() {
+ @Override public void connect() {
channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext(true).build();
}
@Override public void shutdown() {
- channel.shutdownNow();
+ try {
+ channel.shutdownNow();
+ } catch (Throwable t) {
+ logger.error(t.getMessage(), t);
+ }
}
public ManagedChannel getChannel() {
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/jdbc/hikaricp/JDBCHikariCPClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/jdbc/hikaricp/JDBCHikariCPClient.java
index b09bbbf..65a6b18 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/jdbc/hikaricp/JDBCHikariCPClient.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/jdbc/hikaricp/JDBCHikariCPClient.java
@@ -18,19 +18,12 @@
package org.apache.skywalking.oap.server.library.client.jdbc.hikaricp;
-import com.zaxxer.hikari.HikariConfig;
-import com.zaxxer.hikari.HikariDataSource;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
+import com.zaxxer.hikari.*;
+import java.sql.*;
import java.util.Properties;
import org.apache.skywalking.oap.server.library.client.Client;
-import org.apache.skywalking.oap.server.library.client.ClientException;
import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
/**
* JDBC Client uses HikariCP connection management lib to execute SQL.
@@ -47,7 +40,7 @@ public class JDBCHikariCPClient implements Client {
hikariConfig = new HikariConfig(properties);
}
- @Override public void initialize() throws ClientException {
+ @Override public void connect() {
dataSource = new HikariDataSource(hikariConfig);
}
diff --git a/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClientTestCase.java b/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClientTestCase.java
index f696202..28b86df 100644
--- a/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClientTestCase.java
+++ b/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClientTestCase.java
@@ -48,7 +48,7 @@ public class ElasticSearchClientTestCase {
builder.endObject();
ElasticSearchClient client = new ElasticSearchClient("localhost:9200", null);
- client.initialize();
+ client.connect();
String indexName = "test";
client.createIndex(indexName, settings, builder);
diff --git a/oap-server/server-receiver-plugin/skywalking-istio-telemetry-receiver-plugin/src/main/proto/policy/v1beta1/value_type.proto b/oap-server/server-receiver-plugin/skywalking-istio-telemetry-receiver-plugin/src/main/proto/policy/v1beta1/value_type.proto
index 04d8675..4c929eb 100644
--- a/oap-server/server-receiver-plugin/skywalking-istio-telemetry-receiver-plugin/src/main/proto/policy/v1beta1/value_type.proto
+++ b/oap-server/server-receiver-plugin/skywalking-istio-telemetry-receiver-plugin/src/main/proto/policy/v1beta1/value_type.proto
@@ -43,10 +43,10 @@ enum ValueType {
// A point in time.
TIMESTAMP = 5;
- // An IP address.
+ // An IP getAddress.
IP_ADDRESS = 6;
- // An email address.
+ // An email getAddress.
EMAIL_ADDRESS = 7;
// A URI.
diff --git a/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/rest/NetworkAddressRegisterServletHandler.java b/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/rest/NetworkAddressRegisterServletHandler.java
index 88dc510..d1a4632 100644
--- a/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/rest/NetworkAddressRegisterServletHandler.java
+++ b/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/rest/NetworkAddressRegisterServletHandler.java
@@ -59,7 +59,7 @@ public class NetworkAddressRegisterServletHandler extends JettyJsonHandler {
String networkAddress = networkAddresses.get(i).getAsString();
if (logger.isDebugEnabled()) {
- logger.debug("network address register, network address: {}", networkAddress);
+ logger.debug("network getAddress register, network getAddress: {}", networkAddress);
}
int addressId = networkAddressInventoryRegister.getOrCreate(networkAddress);
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/ReferenceIdExchanger.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/ReferenceIdExchanger.java
index 8aaad19..8fc00a0 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/ReferenceIdExchanger.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/ReferenceIdExchanger.java
@@ -92,7 +92,7 @@ public class ReferenceIdExchanger implements IdExchanger<ReferenceDecorator> {
if (networkAddressId == 0) {
if (logger.isDebugEnabled()) {
- logger.debug("network address: {} from service id: {} exchange failed", standardBuilder.getNetworkAddress(), serviceId);
+ logger.debug("network getAddress: {} from service id: {} exchange failed", standardBuilder.getNetworkAddress(), serviceId);
}
return false;
} else {
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SpringSleuthSegmentBuilderTest.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SpringSleuthSegmentBuilderTest.java
index b02254b..1fbdeb3 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SpringSleuthSegmentBuilderTest.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SpringSleuthSegmentBuilderTest.java
@@ -99,7 +99,7 @@ public class SpringSleuthSegmentBuilderTest implements SegmentListener {
}
@Override public int getOrCreate(int serviceId, int addressId, long registerTime) {
- String key = "VitualAppCode:" + serviceId + ",address:" + addressId;
+ String key = "VitualAppCode:" + serviceId + ",getAddress:" + addressId;
if (applicationInstRegister.containsKey(key)) {
return applicationInstRegister.get(key);
} else {
diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index 16219bb..47f0463 100644
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -20,7 +20,7 @@ cluster:
# library the oap-libs folder with your ZooKeeper 3.4.x library.
# zookeeper:
# hostPort: localhost:2181
-# # Retry Policy
+# #Retry Policy
# baseSleepTimeMs: 1000 # initial amount of time to wait between retries
# maxRetries: 3 # max number of times to retry
# kubernetes:
diff --git a/oap-server/server-starter/src/main/resources/log4j2.xml b/oap-server/server-starter/src/main/resources/log4j2.xml
index 0353842..e9a520b 100644
--- a/oap-server/server-starter/src/main/resources/log4j2.xml
+++ b/oap-server/server-starter/src/main/resources/log4j2.xml
@@ -34,6 +34,7 @@
<logger name="org.apache.skywalking.oap.server.receiver.istio.telemetry" level="DEBUG"/>
<logger name="org.apache.skywalking.oap.server.core.alarm.AlarmStandardPersistence" level="DEBUG"/>
<logger name="org.apache.skywalking.oap.server.core" level="INFO"/>
+ <logger name="org.apache.skywalking.oap.server.core.remote.client" level="DEBUG"/>
<logger name="org.apache.skywalking.oap.server.library.buffer" level="INFO"/>
<Root level="DEBUG">
<AppenderRef ref="Console"/>
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
index 40d5416..1099d73 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
@@ -88,7 +88,7 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
public void start() throws ModuleStartException {
try {
nameSpace.setNameSpace(config.getNameSpace());
- elasticSearchClient.initialize();
+ elasticSearchClient.connect();
StorageEsInstaller installer = new StorageEsInstaller(getManager(), config.getIndexShardsNumber(), config.getIndexReplicasNumber());
installer.install(elasticSearchClient);
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
index 8677afd..ac8cb36 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
@@ -19,43 +19,13 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2;
import java.util.Properties;
-import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
-import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
-import org.apache.skywalking.oap.server.core.storage.IRegisterLockDAO;
-import org.apache.skywalking.oap.server.core.storage.StorageDAO;
-import org.apache.skywalking.oap.server.core.storage.StorageException;
-import org.apache.skywalking.oap.server.core.storage.StorageModule;
-import org.apache.skywalking.oap.server.core.storage.cache.IEndpointInventoryCacheDAO;
-import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressInventoryCacheDAO;
-import org.apache.skywalking.oap.server.core.storage.cache.IServiceInstanceInventoryCacheDAO;
-import org.apache.skywalking.oap.server.core.storage.cache.IServiceInventoryCacheDAO;
-import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.IMetricQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
-import org.apache.skywalking.oap.server.library.client.ClientException;
+import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.cache.*;
+import org.apache.skywalking.oap.server.core.storage.query.*;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.library.module.*;
-import org.apache.skywalking.oap.server.library.module.ModuleDefine;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2AggregationQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2AlarmQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2BatchDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2EndpointInventoryCacheDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2HistoryDeleteDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2MetadataQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2MetricQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2NetworkAddressInventoryCacheDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2RegisterLockDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ServiceInstanceInventoryCacheDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ServiceInventoryCacheDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2StorageDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TableInstaller;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TopologyQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TraceQueryDAO;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.*;
+import org.slf4j.*;
/**
* H2 Storage provider is for demonstration and preview only.
@@ -117,7 +87,7 @@ public class H2StorageProvider extends ModuleProvider {
@Override public void start() throws ServiceNotProvidedException, ModuleStartException {
try {
- h2Client.initialize();
+ h2Client.connect();
H2TableInstaller installer = new H2TableInstaller(getManager());
installer.install(h2Client);
@@ -125,8 +95,6 @@ public class H2StorageProvider extends ModuleProvider {
new H2RegisterLockInstaller().install(h2Client);
} catch (StorageException e) {
throw new ModuleStartException(e.getMessage(), e);
- } catch (ClientException e) {
- throw new ModuleStartException(e.getMessage(), e);
}
}