You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@skywalking.apache.org by wu...@apache.org on 2018/07/24 02:17:27 UTC
[incubator-skywalking] branch 6.0 updated: Adjust zk cluster management.
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch 6.0
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/6.0 by this push:
new a94b542 Adjust zk cluster management.
a94b542 is described below
commit a94b542806ba6e7951a07011702d750b3b5ef9f2
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Tue Jul 24 10:17:20 2018 +0800
Adjust zk cluster management.
---
.../plugin/standalone/StandaloneManager.java | 1 +
.../zookeeper/ClusterModuleZookeeperProvider.java | 12 ++----
...NodeRegister.java => ZookeeperCoordinator.java} | 43 +++++++++++++++++----
.../plugin/zookeeper/ZookeeperModuleQuery.java | 45 ----------------------
.../oap/server/core/cluster/RemoteInstance.java | 26 +++++++++++++
5 files changed, 66 insertions(+), 61 deletions(-)
diff --git a/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManager.java b/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManager.java
index 3535e2b..6cba94f 100644
--- a/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManager.java
+++ b/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManager.java
@@ -35,6 +35,7 @@ public class StandaloneManager implements ClusterNodesQuery, ClusterRegister {
@Override public void registerRemote(RemoteInstance remoteInstance) {
this.remoteInstance = remoteInstance;
+ this.remoteInstance.setSelf(true);
}
@Override
diff --git a/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ClusterModuleZookeeperProvider.java b/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ClusterModuleZookeeperProvider.java
index 0cfa3d5..81ea673 100644
--- a/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ClusterModuleZookeeperProvider.java
+++ b/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ClusterModuleZookeeperProvider.java
@@ -22,7 +22,6 @@ import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.curator.x.discovery.ServiceCache;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
@@ -77,23 +76,18 @@ public class ClusterModuleZookeeperProvider extends ModuleProvider {
.watchInstances(true)
.serializer(new SWInstanceSerializer()).build();
- String remoteName = "remote";
- ServiceCache<RemoteInstance> serviceCache = serviceDiscovery.serviceCacheBuilder()
- .name(remoteName)
- .build();
try {
client.start();
client.blockUntilConnected();
serviceDiscovery.start();
-
- serviceCache.start();
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new ModuleStartException(e.getMessage(), e);
}
- this.registerServiceImplementation(ClusterRegister.class, new ZookeeperNodeRegister(serviceDiscovery, remoteName));
- this.registerServiceImplementation(ClusterNodesQuery.class, new ZookeeperModuleQuery(serviceCache));
+ ZookeeperCoordinator coordinator = new ZookeeperCoordinator(serviceDiscovery);
+ this.registerServiceImplementation(ClusterRegister.class, coordinator);
+ this.registerServiceImplementation(ClusterNodesQuery.class, coordinator);
}
@Override public void start() {
diff --git a/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ZookeeperNodeRegister.java b/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ZookeeperCoordinator.java
similarity index 59%
rename from oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ZookeeperNodeRegister.java
rename to oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ZookeeperCoordinator.java
index ba6b518..8cd8b59 100644
--- a/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ZookeeperNodeRegister.java
+++ b/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ZookeeperCoordinator.java
@@ -18,9 +18,13 @@
package org.apache.skywalking.oap.server.cluster.plugin.zookeeper;
+import java.util.ArrayList;
+import java.util.List;
import java.util.UUID;
+import org.apache.curator.x.discovery.ServiceCache;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceInstance;
+import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.cluster.ServiceRegisterException;
@@ -30,21 +34,23 @@ import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
-public class ZookeeperNodeRegister implements ClusterRegister {
- private static final Logger logger = LoggerFactory.getLogger(ZookeeperNodeRegister.class);
+public class ZookeeperCoordinator implements ClusterRegister, ClusterNodesQuery {
+ private static final Logger logger = LoggerFactory.getLogger(ZookeeperCoordinator.class);
private final ServiceDiscovery<RemoteInstance> serviceDiscovery;
- private final String nodeName;
+ private volatile ServiceCache<RemoteInstance> serviceCache;
+ private volatile RemoteInstance selfInstance;
- ZookeeperNodeRegister(ServiceDiscovery<RemoteInstance> serviceDiscovery, String nodeName) {
+ ZookeeperCoordinator(ServiceDiscovery<RemoteInstance> serviceDiscovery) {
this.serviceDiscovery = serviceDiscovery;
- this.nodeName = nodeName;
}
@Override public synchronized void registerRemote(RemoteInstance remoteInstance) throws ServiceRegisterException {
try {
+ String remoteNamePath = "remote";
+
ServiceInstance<RemoteInstance> thisInstance = ServiceInstance.<RemoteInstance>builder()
- .name(nodeName)
+ .name(remoteNamePath)
.id(UUID.randomUUID().toString())
.address(remoteInstance.getHost())
.port(remoteInstance.getPort())
@@ -52,9 +58,32 @@ public class ZookeeperNodeRegister implements ClusterRegister {
.build();
serviceDiscovery.registerService(thisInstance);
+
+ serviceCache = serviceDiscovery.serviceCacheBuilder()
+ .name(remoteNamePath)
+ .build();
+
+ serviceCache.start();
+
+ this.selfInstance = remoteInstance;
} catch (Exception e) {
- logger.error(e.getMessage(), e);
throw new ServiceRegisterException(e.getMessage());
}
}
+
+ @Override public List<RemoteInstance> queryRemoteNodes() {
+ List<ServiceInstance<RemoteInstance>> serviceInstances = serviceCache.getInstances();
+
+ List<RemoteInstance> remoteInstanceDetails = new ArrayList<>(serviceInstances.size());
+ serviceInstances.forEach(serviceInstance -> {
+ RemoteInstance instance = serviceInstance.getPayload();
+ if (instance.equals(selfInstance)) {
+ instance.setSelf(true);
+ } else {
+ instance.setSelf(false);
+ }
+ remoteInstanceDetails.add(instance);
+ });
+ return remoteInstanceDetails;
+ }
}
diff --git a/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ZookeeperModuleQuery.java b/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ZookeeperModuleQuery.java
deleted file mode 100644
index 7a41531..0000000
--- a/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ZookeeperModuleQuery.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.cluster.plugin.zookeeper;
-
-import java.util.*;
-import org.apache.curator.x.discovery.ServiceCache;
-import org.apache.curator.x.discovery.ServiceInstance;
-import org.apache.skywalking.oap.server.core.cluster.*;
-
-/**
- * @author peng-yongsheng, Wu Sheng
- */
-public class ZookeeperModuleQuery implements ClusterNodesQuery {
-
- private final ServiceCache<RemoteInstance> serviceCache;
-
- ZookeeperModuleQuery(ServiceCache<RemoteInstance> serviceCache) {
- this.serviceCache = serviceCache;
- }
-
- @Override
- public List<RemoteInstance> queryRemoteNodes() throws ServiceRegisterException {
- List<ServiceInstance<RemoteInstance>> serviceInstances = serviceCache.getInstances();
-
- List<RemoteInstance> remoteInstanceDetails = new ArrayList<>(serviceInstances.size());
- serviceInstances.forEach(serviceInstance -> remoteInstanceDetails.add(serviceInstance.getPayload()));
- return remoteInstanceDetails;
- }
-}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java
index 14018e1..2c8d761 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java
@@ -18,6 +18,8 @@
package org.apache.skywalking.oap.server.core.cluster;
+import java.util.Objects;
+
/**
* @author peng-yongsheng
*/
@@ -25,6 +27,7 @@ public class RemoteInstance {
private String host;
private int port;
+ private boolean self = false;
public String getHost() {
return host;
@@ -41,4 +44,27 @@ public class RemoteInstance {
public void setPort(int port) {
this.port = port;
}
+
+ public boolean isSelf() {
+ return self;
+ }
+
+ public void setSelf(boolean self) {
+ this.self = self;
+ }
+
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+ RemoteInstance instance = (RemoteInstance)o;
+ return port == instance.port &&
+ Objects.equals(host, instance.host);
+ }
+
+ @Override public int hashCode() {
+
+ return Objects.hash(host, port);
+ }
}