You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2020/03/03 15:44:09 UTC

[skywalking] 01/01: Add UUID to DCS

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch dcs-upgrade
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit ad8bc1d3bc4d446e16494236b2a4b59bcd487df6
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Tue Mar 3 23:43:46 2020 +0800

    Add UUID to DCS
---
 .../configuration/api/ConfigWatcherRegister.java   | 62 +++++++++++++---------
 .../api/ConfigWatcherRegisterTest.java             |  5 +-
 .../apollo/ApolloConfigWatcherRegister.java        |  5 +-
 .../consul/ConsulConfigurationWatcherRegister.java |  4 +-
 .../etcd/EtcdConfigWatcherRegister.java            |  4 +-
 .../nacos/NacosConfigWatcherRegister.java          |  4 +-
 .../zookeeper/ZookeeperConfigWatcherRegister.java  |  5 +-
 .../ut/MockZookeeperConfigWatcherRegister.java     |  5 +-
 .../grpc/GRPCConfigWatcherRegister.java            | 28 +++++++---
 .../src/main/proto/configuration-service.proto     |  3 ++
 .../core/analysis/ApdexThresholdConfigTest.java    |  5 +-
 11 files changed, 81 insertions(+), 49 deletions(-)

diff --git a/oap-server/server-configuration/configuration-api/src/main/java/org/apache/skywalking/oap/server/configuration/api/ConfigWatcherRegister.java b/oap-server/server-configuration/configuration-api/src/main/java/org/apache/skywalking/oap/server/configuration/api/ConfigWatcherRegister.java
index 8460970..c4a436d 100644
--- a/oap-server/server-configuration/configuration-api/src/main/java/org/apache/skywalking/oap/server/configuration/api/ConfigWatcherRegister.java
+++ b/oap-server/server-configuration/configuration-api/src/main/java/org/apache/skywalking/oap/server/configuration/api/ConfigWatcherRegister.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.configuration.api;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -67,41 +68,52 @@ public abstract class ConfigWatcherRegister implements DynamicConfigurationServi
         logger.info("Current configurations after the bootstrap sync." + LINE_SEPARATOR + register.toString());
 
         Executors.newSingleThreadScheduledExecutor()
-                 .scheduleAtFixedRate(new RunnableWithExceptionProtection(this::configSync, t -> logger.error("Sync config center error.", t)), syncPeriod, syncPeriod, TimeUnit.SECONDS);
+                 .scheduleAtFixedRate(
+                     new RunnableWithExceptionProtection(
+                         this::configSync,
+                         t -> logger.error("Sync config center error.", t)
+                     ), syncPeriod, syncPeriod, TimeUnit.SECONDS);
     }
 
     void configSync() {
-        ConfigTable configTable = readConfig(register.keys());
-
-        configTable.getItems().forEach(item -> {
-            String itemName = item.getName();
-            WatcherHolder holder = register.get(itemName);
-            if (holder != null) {
-                ConfigChangeWatcher watcher = holder.getWatcher();
-                String newItemValue = item.getValue();
-                if (newItemValue == null) {
-                    if (watcher.value() != null) {
-                        // Notify watcher, the new value is null with delete event type.
-                        watcher.notify(new ConfigChangeWatcher.ConfigChangeEvent(null, ConfigChangeWatcher.EventType.DELETE));
+        Optional<ConfigTable> configTable = readConfig(register.keys());
+
+        // The Optional is empty when the implementation detected no config change.
+        configTable.ifPresent(config -> {
+            config.getItems().forEach(item -> {
+                String itemName = item.getName();
+                WatcherHolder holder = register.get(itemName);
+                if (holder != null) {
+                    ConfigChangeWatcher watcher = holder.getWatcher();
+                    String newItemValue = item.getValue();
+                    if (newItemValue == null) {
+                        if (watcher.value() != null) {
+                            // Notify watcher, the new value is null with delete event type.
+                            watcher.notify(
+                                new ConfigChangeWatcher.ConfigChangeEvent(null, ConfigChangeWatcher.EventType.DELETE));
+                        } else {
+                            // Don't need to notify, stay in null.
+                        }
                     } else {
-                        // Don't need to notify, stay in null.
+                        if (!newItemValue.equals(watcher.value())) {
+                            watcher.notify(new ConfigChangeWatcher.ConfigChangeEvent(
+                                newItemValue,
+                                ConfigChangeWatcher.EventType.MODIFY
+                            ));
+                        } else {
+                            // Don't need to notify, stay in the same config value.
+                        }
                     }
                 } else {
-                    if (!newItemValue.equals(watcher.value())) {
-                        watcher.notify(new ConfigChangeWatcher.ConfigChangeEvent(newItemValue, ConfigChangeWatcher.EventType.MODIFY));
-                    } else {
-                        // Don't need to notify, stay in the same config value.
-                    }
+                    logger.warn("Config {} from configuration center, doesn't match any watcher, ignore.", itemName);
                 }
-            } else {
-                logger.warn("Config {} from configuration center, doesn't match any watcher, ignore.", itemName);
-            }
-        });
+            });
 
-        logger.trace("Current configurations after the sync." + LINE_SEPARATOR + register.toString());
+            logger.trace("Current configurations after the sync." + LINE_SEPARATOR + register.toString());
+        });
     }
 
-    public abstract ConfigTable readConfig(Set<String> keys);
+    public abstract Optional<ConfigTable> readConfig(Set<String> keys);
 
     public class Register {
         private Map<String, WatcherHolder> register = new HashMap<>();
diff --git a/oap-server/server-configuration/configuration-api/src/test/java/org/apache/skywalking/oap/server/configuration/api/ConfigWatcherRegisterTest.java b/oap-server/server-configuration/configuration-api/src/test/java/org/apache/skywalking/oap/server/configuration/api/ConfigWatcherRegisterTest.java
index 7020151..944ab3c 100644
--- a/oap-server/server-configuration/configuration-api/src/test/java/org/apache/skywalking/oap/server/configuration/api/ConfigWatcherRegisterTest.java
+++ b/oap-server/server-configuration/configuration-api/src/test/java/org/apache/skywalking/oap/server/configuration/api/ConfigWatcherRegisterTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.skywalking.oap.server.configuration.api;
 
+import java.util.Optional;
 import java.util.Set;
 import org.apache.skywalking.oap.server.library.module.ModuleConfig;
 import org.apache.skywalking.oap.server.library.module.ModuleDefine;
@@ -88,14 +89,14 @@ public class ConfigWatcherRegisterTest {
     public static class MockConfigWatcherRegister extends ConfigWatcherRegister {
 
         @Override
-        public ConfigTable readConfig(Set<String> keys) {
+        public Optional<ConfigTable> readConfig(Set<String> keys) {
             ConfigTable.ConfigItem item1 = new ConfigTable.ConfigItem("module.provider.prop1", "abc");
             ConfigTable.ConfigItem item2 = new ConfigTable.ConfigItem("MockModule.provider.prop2", "abc2");
 
             ConfigTable table = new ConfigTable();
             table.add(item1);
             table.add(item2);
-            return table;
+            return Optional.of(table);
         }
     }
 
diff --git a/oap-server/server-configuration/configuration-apollo/src/main/java/org/apache/skywalking/oap/server/configuration/apollo/ApolloConfigWatcherRegister.java b/oap-server/server-configuration/configuration-apollo/src/main/java/org/apache/skywalking/oap/server/configuration/apollo/ApolloConfigWatcherRegister.java
index 6600a69..60d9319 100644
--- a/oap-server/server-configuration/configuration-apollo/src/main/java/org/apache/skywalking/oap/server/configuration/apollo/ApolloConfigWatcherRegister.java
+++ b/oap-server/server-configuration/configuration-apollo/src/main/java/org/apache/skywalking/oap/server/configuration/apollo/ApolloConfigWatcherRegister.java
@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.configuration.apollo;
 import com.ctrip.framework.apollo.Config;
 import com.ctrip.framework.apollo.ConfigService;
 import com.google.common.base.Strings;
+import java.util.Optional;
 import java.util.Set;
 import org.apache.skywalking.oap.server.configuration.api.ConfigTable;
 import org.apache.skywalking.oap.server.configuration.api.ConfigWatcherRegister;
@@ -53,7 +54,7 @@ public class ApolloConfigWatcherRegister extends ConfigWatcherRegister {
     }
 
     @Override
-    public ConfigTable readConfig(Set<String> keys) {
+    public Optional<ConfigTable> readConfig(Set<String> keys) {
         final ConfigTable configTable = new ConfigTable();
 
         for (final String name : keys) {
@@ -61,6 +62,6 @@ public class ApolloConfigWatcherRegister extends ConfigWatcherRegister {
             configTable.add(new ConfigTable.ConfigItem(name, value));
         }
 
-        return configTable;
+        return Optional.of(configTable);
     }
 }
diff --git a/oap-server/server-configuration/configuration-consul/src/main/java/org/apache/skywalking/oap/server/configuration/consul/ConsulConfigurationWatcherRegister.java b/oap-server/server-configuration/configuration-consul/src/main/java/org/apache/skywalking/oap/server/configuration/consul/ConsulConfigurationWatcherRegister.java
index 5df1531..73c3d55 100644
--- a/oap-server/server-configuration/configuration-consul/src/main/java/org/apache/skywalking/oap/server/configuration/consul/ConsulConfigurationWatcherRegister.java
+++ b/oap-server/server-configuration/configuration-consul/src/main/java/org/apache/skywalking/oap/server/configuration/consul/ConsulConfigurationWatcherRegister.java
@@ -76,7 +76,7 @@ public class ConsulConfigurationWatcherRegister extends ConfigWatcherRegister {
     }
 
     @Override
-    public ConfigTable readConfig(Set<String> keys) {
+    public Optional<ConfigTable> readConfig(Set<String> keys) {
         removeUninterestedKeys(keys);
 
         registerKeyListeners(keys);
@@ -91,7 +91,7 @@ public class ConsulConfigurationWatcherRegister extends ConfigWatcherRegister {
             }
         });
 
-        return table;
+        return Optional.of(table);
     }
 
     private void registerKeyListeners(final Set<String> keys) {
diff --git a/oap-server/server-configuration/configuration-etcd/src/main/java/org/apache/skywalking/oap/server/configuration/etcd/EtcdConfigWatcherRegister.java b/oap-server/server-configuration/configuration-etcd/src/main/java/org/apache/skywalking/oap/server/configuration/etcd/EtcdConfigWatcherRegister.java
index 1d70158..aedb375 100644
--- a/oap-server/server-configuration/configuration-etcd/src/main/java/org/apache/skywalking/oap/server/configuration/etcd/EtcdConfigWatcherRegister.java
+++ b/oap-server/server-configuration/configuration-etcd/src/main/java/org/apache/skywalking/oap/server/configuration/etcd/EtcdConfigWatcherRegister.java
@@ -65,7 +65,7 @@ public class EtcdConfigWatcherRegister extends ConfigWatcherRegister {
     }
 
     @Override
-    public ConfigTable readConfig(Set<String> keys) {
+    public Optional<ConfigTable> readConfig(Set<String> keys) {
         removeUninterestedKeys(keys);
         registerKeyListeners(keys);
         final ConfigTable table = new ConfigTable();
@@ -81,7 +81,7 @@ public class EtcdConfigWatcherRegister extends ConfigWatcherRegister {
             }
         }
 
-        return table;
+        return Optional.of(table);
     }
 
     private void registerKeyListeners(final Set<String> keys) {
diff --git a/oap-server/server-configuration/configuration-nacos/src/main/java/org/apache/skywalking/oap/server/configuration/nacos/NacosConfigWatcherRegister.java b/oap-server/server-configuration/configuration-nacos/src/main/java/org/apache/skywalking/oap/server/configuration/nacos/NacosConfigWatcherRegister.java
index 3a3f2cb..0101cdf 100644
--- a/oap-server/server-configuration/configuration-nacos/src/main/java/org/apache/skywalking/oap/server/configuration/nacos/NacosConfigWatcherRegister.java
+++ b/oap-server/server-configuration/configuration-nacos/src/main/java/org/apache/skywalking/oap/server/configuration/nacos/NacosConfigWatcherRegister.java
@@ -60,7 +60,7 @@ public class NacosConfigWatcherRegister extends ConfigWatcherRegister {
     }
 
     @Override
-    public ConfigTable readConfig(Set<String> keys) {
+    public Optional<ConfigTable> readConfig(Set<String> keys) {
         removeUninterestedKeys(keys);
         registerKeyListeners(keys);
 
@@ -77,7 +77,7 @@ public class NacosConfigWatcherRegister extends ConfigWatcherRegister {
             }
         }
 
-        return table;
+        return Optional.of(table);
     }
 
     private void registerKeyListeners(final Set<String> keys) {
diff --git a/oap-server/server-configuration/configuration-zookeeper/src/main/java/org/apache/skywalking/oap/server/configuration/zookeeper/ZookeeperConfigWatcherRegister.java b/oap-server/server-configuration/configuration-zookeeper/src/main/java/org/apache/skywalking/oap/server/configuration/zookeeper/ZookeeperConfigWatcherRegister.java
index b0ca079..b8f0d1c 100644
--- a/oap-server/server-configuration/configuration-zookeeper/src/main/java/org/apache/skywalking/oap/server/configuration/zookeeper/ZookeeperConfigWatcherRegister.java
+++ b/oap-server/server-configuration/configuration-zookeeper/src/main/java/org/apache/skywalking/oap/server/configuration/zookeeper/ZookeeperConfigWatcherRegister.java
@@ -18,6 +18,7 @@
 
 package org.apache.skywalking.oap.server.configuration.zookeeper;
 
+import java.util.Optional;
 import java.util.Set;
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
@@ -43,12 +44,12 @@ public class ZookeeperConfigWatcherRegister extends ConfigWatcherRegister {
     }
 
     @Override
-    public ConfigTable readConfig(Set<String> keys) {
+    public Optional<ConfigTable> readConfig(Set<String> keys) {
         ConfigTable table = new ConfigTable();
         keys.forEach(s -> {
             ChildData data = this.childrenCache.getCurrentData(this.prefix + s);
             table.add(new ConfigTable.ConfigItem(s, data == null ? null : new String(data.getData())));
         });
-        return table;
+        return Optional.of(table);
     }
 }
diff --git a/oap-server/server-configuration/configuration-zookeeper/src/test/java/org/apache/skywalking/oap/server/configuration/zookeeper/ut/MockZookeeperConfigWatcherRegister.java b/oap-server/server-configuration/configuration-zookeeper/src/test/java/org/apache/skywalking/oap/server/configuration/zookeeper/ut/MockZookeeperConfigWatcherRegister.java
index 4b424af..3cf4197 100644
--- a/oap-server/server-configuration/configuration-zookeeper/src/test/java/org/apache/skywalking/oap/server/configuration/zookeeper/ut/MockZookeeperConfigWatcherRegister.java
+++ b/oap-server/server-configuration/configuration-zookeeper/src/test/java/org/apache/skywalking/oap/server/configuration/zookeeper/ut/MockZookeeperConfigWatcherRegister.java
@@ -18,6 +18,7 @@
 
 package org.apache.skywalking.oap.server.configuration.zookeeper.ut;
 
+import java.util.Optional;
 import java.util.Set;
 import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
@@ -35,12 +36,12 @@ public class MockZookeeperConfigWatcherRegister extends ConfigWatcherRegister {
     }
 
     @Override
-    public ConfigTable readConfig(Set<String> keys) {
+    public Optional<ConfigTable> readConfig(Set<String> keys) {
         ConfigTable table = new ConfigTable();
         keys.forEach(s -> {
             ChildData data = this.childrenCache.getCurrentData(this.prefix + s);
             table.add(new ConfigTable.ConfigItem(s, data == null ? null : new String(data.getData())));
         });
-        return table;
+        return Optional.of(table);
     }
 }
\ No newline at end of file
diff --git a/oap-server/server-configuration/grpc-configuration-sync/src/main/java/org/apache/skywalking/oap/server/configuration/grpc/GRPCConfigWatcherRegister.java b/oap-server/server-configuration/grpc-configuration-sync/src/main/java/org/apache/skywalking/oap/server/configuration/grpc/GRPCConfigWatcherRegister.java
index dd92b24..a4df9af 100644
--- a/oap-server/server-configuration/grpc-configuration-sync/src/main/java/org/apache/skywalking/oap/server/configuration/grpc/GRPCConfigWatcherRegister.java
+++ b/oap-server/server-configuration/grpc-configuration-sync/src/main/java/org/apache/skywalking/oap/server/configuration/grpc/GRPCConfigWatcherRegister.java
@@ -19,6 +19,8 @@
 package org.apache.skywalking.oap.server.configuration.grpc;
 
 import io.grpc.netty.NettyChannelBuilder;
+import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import org.apache.skywalking.oap.server.configuration.api.ConfigTable;
 import org.apache.skywalking.oap.server.configuration.api.ConfigWatcherRegister;
@@ -33,22 +35,32 @@ public class GRPCConfigWatcherRegister extends ConfigWatcherRegister {
 
     private RemoteEndpointSettings settings;
     private ConfigurationServiceGrpc.ConfigurationServiceBlockingStub stub;
+    private String uuid = null;
 
     public GRPCConfigWatcherRegister(RemoteEndpointSettings settings) {
         super(settings.getPeriod());
         this.settings = settings;
-        stub = ConfigurationServiceGrpc.newBlockingStub(NettyChannelBuilder.forAddress(settings.getHost(), settings.getPort())
-                                                                           .usePlaintext()
-                                                                           .build());
+        stub = ConfigurationServiceGrpc.newBlockingStub(
+            NettyChannelBuilder.forAddress(settings.getHost(), settings.getPort())
+                               .usePlaintext()
+                               .build());
     }
 
     @Override
-    public ConfigTable readConfig(Set<String> keys) {
+    public Optional<ConfigTable> readConfig(Set<String> keys) {
         ConfigTable table = new ConfigTable();
         try {
-            ConfigurationResponse response = stub.call(ConfigurationRequest.newBuilder()
-                                                                           .setClusterName(settings.getClusterName())
-                                                                           .build());
+            ConfigurationRequest.Builder builder = ConfigurationRequest.newBuilder()
+                                                                       .setClusterName(settings.getClusterName());
+            if (uuid != null) {
+                builder.setUuid(uuid);
+            }
+            ConfigurationResponse response = stub.call(builder.build());
+            String response_uuid = response.getUuid();
+            if (Objects.equals(uuid, response_uuid)) {
+                // If the UUID matches, the config table is expected to be empty.
+                return Optional.empty();
+            }
             response.getConfigTableList().forEach(config -> {
                 final String name = config.getName();
                 if (keys.contains(name)) {
@@ -58,6 +70,6 @@ public class GRPCConfigWatcherRegister extends ConfigWatcherRegister {
         } catch (Exception e) {
             logger.error("Remote config center [" + settings + "] is not available.", e);
         }
-        return table;
+        return Optional.of(table);
     }
 }
diff --git a/oap-server/server-configuration/grpc-configuration-sync/src/main/proto/configuration-service.proto b/oap-server/server-configuration/grpc-configuration-sync/src/main/proto/configuration-service.proto
index be15aef..4610106 100644
--- a/oap-server/server-configuration/grpc-configuration-sync/src/main/proto/configuration-service.proto
+++ b/oap-server/server-configuration/grpc-configuration-sync/src/main/proto/configuration-service.proto
@@ -31,6 +31,8 @@ message ConfigurationRequest {
     // in case the remote configuration center implementation support
     // configuration management for multiple clusters.
     string clusterName = 1;
+    // The config UUID received in the response from the config server side.
+    string uuid = 2;
 }
 
 message ConfigurationResponse {
@@ -42,6 +44,7 @@ message ConfigurationResponse {
     // If the config center wants to set the value to NULL or empty,
     // must set the name with empty value explicitly.
     repeated Config configTable = 1;
+    string uuid = 2;
 }
 
 message Config {
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/ApdexThresholdConfigTest.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/ApdexThresholdConfigTest.java
index 8489e8b..4ba736f 100644
--- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/ApdexThresholdConfigTest.java
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/ApdexThresholdConfigTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.skywalking.oap.server.core.analysis;
 
+import java.util.Optional;
 import java.util.Set;
 import org.apache.skywalking.oap.server.configuration.api.ConfigTable;
 import org.apache.skywalking.oap.server.configuration.api.ConfigWatcherRegister;
@@ -68,10 +69,10 @@ public class ApdexThresholdConfigTest {
         }
 
         @Override
-        public ConfigTable readConfig(Set<String> keys) {
+        public Optional<ConfigTable> readConfig(Set<String> keys) {
             ConfigTable table = new ConfigTable();
             table.add(new ConfigTable.ConfigItem("core.default.apdexThreshold", "default: 1000 \nfoo: 200"));
-            return table;
+            return Optional.of(table);
         }
     }
 }
\ No newline at end of file