You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@skywalking.apache.org by wu...@apache.org on 2018/07/24 02:17:27 UTC

[incubator-skywalking] branch 6.0 updated: Adjust zk cluster management.

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch 6.0
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/6.0 by this push:
     new a94b542  Adjust zk cluster management.
a94b542 is described below

commit a94b542806ba6e7951a07011702d750b3b5ef9f2
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Tue Jul 24 10:17:20 2018 +0800

    Adjust zk cluster management.
---
 .../plugin/standalone/StandaloneManager.java       |  1 +
 .../zookeeper/ClusterModuleZookeeperProvider.java  | 12 ++----
 ...NodeRegister.java => ZookeeperCoordinator.java} | 43 +++++++++++++++++----
 .../plugin/zookeeper/ZookeeperModuleQuery.java     | 45 ----------------------
 .../oap/server/core/cluster/RemoteInstance.java    | 26 +++++++++++++
 5 files changed, 66 insertions(+), 61 deletions(-)

diff --git a/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManager.java b/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManager.java
index 3535e2b..6cba94f 100644
--- a/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManager.java
+++ b/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManager.java
@@ -35,6 +35,7 @@ public class StandaloneManager implements ClusterNodesQuery, ClusterRegister {
 
     @Override public void registerRemote(RemoteInstance remoteInstance) {
         this.remoteInstance = remoteInstance;
+        this.remoteInstance.setSelf(true);
     }
 
     @Override
diff --git a/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ClusterModuleZookeeperProvider.java b/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ClusterModuleZookeeperProvider.java
index 0cfa3d5..81ea673 100644
--- a/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ClusterModuleZookeeperProvider.java
+++ b/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ClusterModuleZookeeperProvider.java
@@ -22,7 +22,6 @@ import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.curator.x.discovery.ServiceCache;
 import org.apache.curator.x.discovery.ServiceDiscovery;
 import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
 import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
@@ -77,23 +76,18 @@ public class ClusterModuleZookeeperProvider extends ModuleProvider {
             .watchInstances(true)
             .serializer(new SWInstanceSerializer()).build();
 
-        String remoteName = "remote";
-        ServiceCache<RemoteInstance> serviceCache = serviceDiscovery.serviceCacheBuilder()
-            .name(remoteName)
-            .build();
         try {
             client.start();
             client.blockUntilConnected();
             serviceDiscovery.start();
-
-            serviceCache.start();
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
             throw new ModuleStartException(e.getMessage(), e);
         }
 
-        this.registerServiceImplementation(ClusterRegister.class, new ZookeeperNodeRegister(serviceDiscovery, remoteName));
-        this.registerServiceImplementation(ClusterNodesQuery.class, new ZookeeperModuleQuery(serviceCache));
+        ZookeeperCoordinator coordinator = new ZookeeperCoordinator(serviceDiscovery);
+        this.registerServiceImplementation(ClusterRegister.class, coordinator);
+        this.registerServiceImplementation(ClusterNodesQuery.class, coordinator);
     }
 
     @Override public void start() {
diff --git a/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ZookeeperNodeRegister.java b/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ZookeeperCoordinator.java
similarity index 59%
rename from oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ZookeeperNodeRegister.java
rename to oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ZookeeperCoordinator.java
index ba6b518..8cd8b59 100644
--- a/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ZookeeperNodeRegister.java
+++ b/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ZookeeperCoordinator.java
@@ -18,9 +18,13 @@
 
 package org.apache.skywalking.oap.server.cluster.plugin.zookeeper;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.UUID;
+import org.apache.curator.x.discovery.ServiceCache;
 import org.apache.curator.x.discovery.ServiceDiscovery;
 import org.apache.curator.x.discovery.ServiceInstance;
+import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
 import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
 import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
 import org.apache.skywalking.oap.server.core.cluster.ServiceRegisterException;
@@ -30,21 +34,23 @@ import org.slf4j.LoggerFactory;
 /**
  * @author peng-yongsheng
  */
-public class ZookeeperNodeRegister implements ClusterRegister {
-    private static final Logger logger = LoggerFactory.getLogger(ZookeeperNodeRegister.class);
+public class ZookeeperCoordinator implements ClusterRegister, ClusterNodesQuery {
+    private static final Logger logger = LoggerFactory.getLogger(ZookeeperCoordinator.class);
 
     private final ServiceDiscovery<RemoteInstance> serviceDiscovery;
-    private final String nodeName;
+    private volatile ServiceCache<RemoteInstance> serviceCache;
+    private volatile RemoteInstance selfInstance;
 
-    ZookeeperNodeRegister(ServiceDiscovery<RemoteInstance> serviceDiscovery, String nodeName) {
+    ZookeeperCoordinator(ServiceDiscovery<RemoteInstance> serviceDiscovery) {
         this.serviceDiscovery = serviceDiscovery;
-        this.nodeName = nodeName;
     }
 
     @Override public synchronized void registerRemote(RemoteInstance remoteInstance) throws ServiceRegisterException {
         try {
+            String remoteNamePath = "remote";
+
             ServiceInstance<RemoteInstance> thisInstance = ServiceInstance.<RemoteInstance>builder()
-                .name(nodeName)
+                .name(remoteNamePath)
                 .id(UUID.randomUUID().toString())
                 .address(remoteInstance.getHost())
                 .port(remoteInstance.getPort())
@@ -52,9 +58,32 @@ public class ZookeeperNodeRegister implements ClusterRegister {
                 .build();
 
             serviceDiscovery.registerService(thisInstance);
+
+            serviceCache = serviceDiscovery.serviceCacheBuilder()
+                .name(remoteNamePath)
+                .build();
+
+            serviceCache.start();
+
+            this.selfInstance = remoteInstance;
         } catch (Exception e) {
-            logger.error(e.getMessage(), e);
             throw new ServiceRegisterException(e.getMessage());
         }
     }
+
+    @Override public List<RemoteInstance> queryRemoteNodes() {
+        List<ServiceInstance<RemoteInstance>> serviceInstances = serviceCache.getInstances();
+
+        List<RemoteInstance> remoteInstanceDetails = new ArrayList<>(serviceInstances.size());
+        serviceInstances.forEach(serviceInstance -> {
+            RemoteInstance instance = serviceInstance.getPayload();
+            if (instance.equals(selfInstance)) {
+                instance.setSelf(true);
+            } else {
+                instance.setSelf(false);
+            }
+            remoteInstanceDetails.add(instance);
+        });
+        return remoteInstanceDetails;
+    }
 }
diff --git a/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ZookeeperModuleQuery.java b/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ZookeeperModuleQuery.java
deleted file mode 100644
index 7a41531..0000000
--- a/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ZookeeperModuleQuery.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.cluster.plugin.zookeeper;
-
-import java.util.*;
-import org.apache.curator.x.discovery.ServiceCache;
-import org.apache.curator.x.discovery.ServiceInstance;
-import org.apache.skywalking.oap.server.core.cluster.*;
-
-/**
- * @author peng-yongsheng, Wu Sheng
- */
-public class ZookeeperModuleQuery implements ClusterNodesQuery {
-
-    private final ServiceCache<RemoteInstance> serviceCache;
-
-    ZookeeperModuleQuery(ServiceCache<RemoteInstance> serviceCache) {
-        this.serviceCache = serviceCache;
-    }
-
-    @Override
-    public List<RemoteInstance> queryRemoteNodes() throws ServiceRegisterException {
-        List<ServiceInstance<RemoteInstance>> serviceInstances = serviceCache.getInstances();
-
-        List<RemoteInstance> remoteInstanceDetails = new ArrayList<>(serviceInstances.size());
-        serviceInstances.forEach(serviceInstance -> remoteInstanceDetails.add(serviceInstance.getPayload()));
-        return remoteInstanceDetails;
-    }
-}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java
index 14018e1..2c8d761 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java
@@ -18,6 +18,8 @@
 
 package org.apache.skywalking.oap.server.core.cluster;
 
+import java.util.Objects;
+
 /**
  * @author peng-yongsheng
  */
@@ -25,6 +27,7 @@ public class RemoteInstance {
 
     private String host;
     private int port;
+    private boolean self = false;
 
     public String getHost() {
         return host;
@@ -41,4 +44,27 @@ public class RemoteInstance {
     public void setPort(int port) {
         this.port = port;
     }
+
+    public boolean isSelf() {
+        return self;
+    }
+
+    public void setSelf(boolean self) {
+        this.self = self;
+    }
+
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+        RemoteInstance instance = (RemoteInstance)o;
+        return port == instance.port &&
+            Objects.equals(host, instance.host);
+    }
+
+    @Override public int hashCode() {
+
+        return Objects.hash(host, port);
+    }
 }