You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2021/09/07 14:54:39 UTC

[skywalking] branch master updated: Support etcd grouped dynamic configurations. (#7672)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new e89493b  Support etcd grouped dynamic configurations. (#7672)
e89493b is described below

commit e89493b2472211772e5f90d3e0ca327a1ba5736c
Author: wankai123 <wa...@foxmail.com>
AuthorDate: Tue Sep 7 22:54:05 2021 +0800

    Support etcd grouped dynamic configurations. (#7672)
---
 CHANGES.md                                         |  1 +
 docs/en/setup/backend/configuration-vocabulary.md  |  2 +-
 docs/en/setup/backend/dynamic-config-etcd.md       | 47 +++++++++++++++++++-
 .../etcd/EtcdConfigWatcherRegister.java            | 36 +++++++++++++++-
 .../etcd/EtcdConfigurationTestProvider.java        | 46 +++++++++++++++-----
 .../etcd/ITEtcdConfigurationTest.java              | 50 ++++++++++++++++++++++
 .../src/main/resources/application.yml             |  2 +-
 7 files changed, 168 insertions(+), 16 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index eaa0f2d..305e8b5 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -57,6 +57,7 @@ Release Notes.
 * Fix `H2MetadataQueryDAO.searchService` doesn't support auto grouping.
 * Rebuilt ElasticSearch client on top of their REST API.
 * Fix ElasticSearch storage plugin doesn't work when hot reloading from `secretsManagementFile`.
+* Support etcd grouped dynamic configurations.
 
 #### UI
 
diff --git a/docs/en/setup/backend/configuration-vocabulary.md b/docs/en/setup/backend/configuration-vocabulary.md
index 4a71670..d857d3e 100644
--- a/docs/en/setup/backend/configuration-vocabulary.md
+++ b/docs/en/setup/backend/configuration-vocabulary.md
@@ -241,7 +241,7 @@ core|default|role|Option values: `Mixed/Receiver/Aggregator`. **Receiver** mode
 | - | - | baseSleepTimeMs|The period of Zookeeper client between two retries (in milliseconds). |SW_CONFIG_ZK_BASE_SLEEP_TIME_MS|1000|
 | - | - | maxRetries| The maximum retry time. |SW_CONFIG_ZK_MAX_RETRIES|3|
 | - | - | period | The period of data sync (in seconds). | SW_CONFIG_ZK_PERIOD | 60 |
-| - | etcd| endpoints | Hosts and ports for etcd cluster (separated by commas if multiple). | SW_CONFIG_ETCD_ENDPOINTS | localhost:2379 | 
+| - | etcd| endpoints | Hosts and ports for etcd cluster (separated by commas if multiple). | SW_CONFIG_ETCD_ENDPOINTS | http://localhost:2379 | 
 | - | - | namespace | Namespace for SkyWalking cluster. |SW_CONFIG_ETCD_NAMESPACE | /skywalking |
 | - | - | authentication | Indicates whether there is authentication. | SW_CONFIG_ETCD_AUTHENTICATION | false |
 | - | - | user | Etcd auth username. | SW_CONFIG_ETCD_USER | |
diff --git a/docs/en/setup/backend/dynamic-config-etcd.md b/docs/en/setup/backend/dynamic-config-etcd.md
index 0ea84ed..f1d9478 100755
--- a/docs/en/setup/backend/dynamic-config-etcd.md
+++ b/docs/en/setup/backend/dynamic-config-etcd.md
@@ -7,11 +7,54 @@ configuration:
   selector: ${SW_CONFIGURATION:etcd}
   etcd:
     period: ${SW_CONFIG_ETCD_PERIOD:60} # Unit seconds, sync period. Default fetch every 60 seconds.
-    endpoints: ${SW_CONFIG_ETCD_ENDPOINTS:localhost:2379}
+    endpoints: ${SW_CONFIG_ETCD_ENDPOINTS:http://localhost:2379}
     namespace: ${SW_CONFIG_ETCD_NAMESPACE:/skywalking}
     authentication: ${SW_CONFIG_ETCD_AUTHENTICATION:false}
     user: ${SW_CONFIG_ETCD_USER:}
     password: ${SW_CONFIG_ETCD_password:}
 ```
 
-**NOTE**: Only the v3 protocol is supported since 8.7.0.
\ No newline at end of file
+**NOTE**: Only the v3 protocol is supported since 8.7.0.
+
+## Config Storage
+### Single Config
+Single configs in etcd are key/value pairs:
+
+| Key | Value |
+|-----|-----|
+| {namespace}/configKey | configValue |
+
+e.g. The config is:
+```
+{agent-analyzer.default.slowDBAccessThreshold}:{default:200,mongodb:50}
+```
+If `namespace = /skywalking`, the config in etcd is:
+
+| Key | Value |
+|-----|-----|
+| /skywalking/agent-analyzer.default.slowDBAccessThreshold | default:200,mongodb:50 |
+| ... | ... |
+
+
+### Group Config
+Group configs in etcd are key/value pairs as well; each key is composed of the configKey and the subItemKey, joined with `/`.
+
+| Key | Value |
+|-----|-----|
+| {namespace}/configKey/subItemKey1 | subItemValue1 |
+| {namespace}/configKey/subItemKey2 | subItemValue2 |
+| ... | ... |
+
+e.g. The config is:
+```
+{core.default.endpoint-name-grouping-openapi}:|{customerAPI-v1}:{value of customerAPI-v1}
+                                              |{productAPI-v1}:{value of productAPI-v1}
+                                              |{productAPI-v2}:{value of productAPI-v2}
+```
+If `namespace = /skywalking`, the config in etcd is:
+
+| Key | Value |
+|-----|-----|
+| /skywalking/core.default.endpoint-name-grouping-openapi/customerAPI-v1 | value of customerAPI-v1 |
+| /skywalking/core.default.endpoint-name-grouping-openapi/productAPI-v1 | value of productAPI-v1 |
+| /skywalking/core.default.endpoint-name-grouping-openapi/productAPI-v2 | value of productAPI-v2 |
\ No newline at end of file
diff --git a/oap-server/server-configuration/configuration-etcd/src/main/java/org/apache/skywalking/oap/server/configuration/etcd/EtcdConfigWatcherRegister.java b/oap-server/server-configuration/configuration-etcd/src/main/java/org/apache/skywalking/oap/server/configuration/etcd/EtcdConfigWatcherRegister.java
index e0341c7..1f911dc 100644
--- a/oap-server/server-configuration/configuration-etcd/src/main/java/org/apache/skywalking/oap/server/configuration/etcd/EtcdConfigWatcherRegister.java
+++ b/oap-server/server-configuration/configuration-etcd/src/main/java/org/apache/skywalking/oap/server/configuration/etcd/EtcdConfigWatcherRegister.java
@@ -21,8 +21,11 @@ import io.etcd.jetcd.ByteSequence;
 import io.etcd.jetcd.Client;
 import io.etcd.jetcd.ClientBuilder;
 import io.etcd.jetcd.KV;
+import io.etcd.jetcd.KeyValue;
 import io.etcd.jetcd.kv.GetResponse;
+import io.etcd.jetcd.options.GetOption;
 import java.nio.charset.Charset;
+import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import lombok.extern.slf4j.Slf4j;
@@ -77,8 +80,37 @@ public class EtcdConfigWatcherRegister extends ConfigWatcherRegister {
 
     @Override
     public Optional<GroupConfigTable> readGroupConfig(final Set<String> keys) {
-        // TODO: implement readGroupConfig
-        return Optional.empty();
+        GroupConfigTable groupConfigTable = new GroupConfigTable();
+        keys.forEach(key -> {
+            GroupConfigTable.GroupConfigItems groupConfigItems = new GroupConfigTable.GroupConfigItems(key);
+            groupConfigTable.addGroupConfigItems(groupConfigItems);
+            String groupKey = key + "/";
+
+            GetOption option = GetOption.newBuilder()
+                                        .withPrefix(ByteSequence.from(groupKey, Charset.defaultCharset()))
+                                        .build();
+            try {
+                GetResponse response = client.get(ByteSequence.from(groupKey, Charset.defaultCharset()), option).get();
+                if (0 != response.getCount()) {
+                    List<KeyValue> groupItemKeys = response.getKvs();
+                    if (groupItemKeys != null) {
+                        groupItemKeys.forEach(groupItem -> {
+                            String groupItemKey = groupItem.getKey().toString(Charset.defaultCharset());
+                            if (!groupKey.equals(groupItemKey)) {
+                                String itemValue = groupItem.getValue().toString(Charset.defaultCharset());
+                                String itemName = groupItemKey.substring(groupKey.length());
+                                groupConfigItems.add(
+                                    new ConfigTable.ConfigItem(itemName, itemValue));
+                            }
+                        });
+                    }
+                }
+            } catch (Exception exp) {
+                throw new EtcdConfigException("Failed to read configuration", exp);
+            }
+
+        });
+        return Optional.of(groupConfigTable);
     }
 
 }
diff --git a/oap-server/server-configuration/configuration-etcd/src/test/java/org/apache/skywalking/oap/server/configuration/etcd/EtcdConfigurationTestProvider.java b/oap-server/server-configuration/configuration-etcd/src/test/java/org/apache/skywalking/oap/server/configuration/etcd/EtcdConfigurationTestProvider.java
index b938c5d..21d7f6e 100644
--- a/oap-server/server-configuration/configuration-etcd/src/test/java/org/apache/skywalking/oap/server/configuration/etcd/EtcdConfigurationTestProvider.java
+++ b/oap-server/server-configuration/configuration-etcd/src/test/java/org/apache/skywalking/oap/server/configuration/etcd/EtcdConfigurationTestProvider.java
@@ -18,22 +18,22 @@
 
 package org.apache.skywalking.oap.server.configuration.etcd;
 
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.oap.server.configuration.api.ConfigChangeWatcher;
 import org.apache.skywalking.oap.server.configuration.api.ConfigurationModule;
 import org.apache.skywalking.oap.server.configuration.api.DynamicConfigurationService;
+import org.apache.skywalking.oap.server.configuration.api.GroupConfigChangeWatcher;
 import org.apache.skywalking.oap.server.library.module.ModuleConfig;
 import org.apache.skywalking.oap.server.library.module.ModuleDefine;
 import org.apache.skywalking.oap.server.library.module.ModuleProvider;
-import org.apache.skywalking.oap.server.library.module.ModuleStartException;
 import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
+@Slf4j
 public class EtcdConfigurationTestProvider extends ModuleProvider {
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(EtcdConfigurationTestProvider.class);
-
     ConfigChangeWatcher watcher;
+    GroupConfigChangeWatcher groupWatcher;
 
     @Override
     public String name() {
@@ -52,13 +52,13 @@ public class EtcdConfigurationTestProvider extends ModuleProvider {
     }
 
     @Override
-    public void prepare() throws ServiceNotProvidedException, ModuleStartException {
+    public void prepare() throws ServiceNotProvidedException {
         watcher = new ConfigChangeWatcher(EtcdConfigurationTestModule.NAME, this, "testKey") {
             private volatile String testValue;
 
             @Override
             public void notify(ConfigChangeEvent value) {
-                LOGGER.info("ConfigChangeWatcher.ConfigChangeEvent: {}", value);
+                log.info("ConfigChangeWatcher.ConfigChangeEvent: {}", value);
                 if (EventType.DELETE.equals(value.getEventType())) {
                     testValue = null;
                 } else {
@@ -71,18 +71,44 @@ public class EtcdConfigurationTestProvider extends ModuleProvider {
                 return testValue;
             }
         };
+
+        groupWatcher = new GroupConfigChangeWatcher(EtcdConfigurationTestModule.NAME, this, "testKeyGroup") {
+            private Map<String, String> config = new ConcurrentHashMap<>();
+
+            @Override
+            public void notifyGroup(Map<String, ConfigChangeEvent> groupItems) {
+                log.info("GroupConfigChangeWatcher.ConfigChangeEvents: {}", groupItems);
+                groupItems.forEach((groupItemName, event) -> {
+                    if (EventType.DELETE.equals(event.getEventType())) {
+                        config.remove(groupItemName);
+                    } else {
+                        config.put(groupItemName, event.getNewValue());
+                    }
+                });
+            }
+
+            @Override
+            public Map<String, String> groupItems() {
+                return config;
+            }
+        };
     }
 
     @Override
-    public void start() throws ServiceNotProvidedException, ModuleStartException {
+    public void start() throws ServiceNotProvidedException {
         getManager().find(ConfigurationModule.NAME)
                     .provider()
                     .getService(DynamicConfigurationService.class)
                     .registerConfigChangeWatcher(watcher);
+
+        getManager().find(ConfigurationModule.NAME)
+                    .provider()
+                    .getService(DynamicConfigurationService.class)
+                    .registerConfigChangeWatcher(groupWatcher);
     }
 
     @Override
-    public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
+    public void notifyAfterCompleted() throws ServiceNotProvidedException {
 
     }
 
diff --git a/oap-server/server-configuration/configuration-etcd/src/test/java/org/apache/skywalking/oap/server/configuration/etcd/ITEtcdConfigurationTest.java b/oap-server/server-configuration/configuration-etcd/src/test/java/org/apache/skywalking/oap/server/configuration/etcd/ITEtcdConfigurationTest.java
index acbc77a..a4ddbe4 100644
--- a/oap-server/server-configuration/configuration-etcd/src/test/java/org/apache/skywalking/oap/server/configuration/etcd/ITEtcdConfigurationTest.java
+++ b/oap-server/server-configuration/configuration-etcd/src/test/java/org/apache/skywalking/oap/server/configuration/etcd/ITEtcdConfigurationTest.java
@@ -108,6 +108,56 @@ public class ITEtcdConfigurationTest {
         assertNull(provider.watcher.value());
     }
 
+    @Test(timeout = 20000)
+    public void shouldReadUpdated4Group() throws Exception {
+        assertEquals("{}", provider.groupWatcher.groupItems().toString());
+
+        KV client = Client.builder()
+                          .endpoints("http://localhost:" + container.getMappedPort(2379))
+                          .namespace(ByteSequence.from("/skywalking/", Charset.defaultCharset()))
+                          .build()
+                          .getKVClient();
+
+        client.put(
+            ByteSequence.from("test-module.default.testKeyGroup/item1", Charset.defaultCharset()),
+            ByteSequence.from("100", Charset.defaultCharset())
+        ).get();
+        client.put(
+            ByteSequence.from("test-module.default.testKeyGroup/item2", Charset.defaultCharset()),
+            ByteSequence.from("200", Charset.defaultCharset())
+        ).get();
+
+        for (String v = provider.groupWatcher.groupItems().get("item1"); v == null; v = provider.groupWatcher.groupItems().get("item1")) {
+            log.info("value is : {}", provider.groupWatcher.groupItems().get("item1"));
+            TimeUnit.MILLISECONDS.sleep(200L);
+        }
+        for (String v = provider.groupWatcher.groupItems().get("item2"); v == null; v = provider.groupWatcher.groupItems().get("item2")) {
+            log.info("value is : {}", provider.groupWatcher.groupItems().get("item2"));
+            TimeUnit.MILLISECONDS.sleep(200L);
+        }
+        assertEquals("100", provider.groupWatcher.groupItems().get("item1"));
+        assertEquals("200", provider.groupWatcher.groupItems().get("item2"));
+
+        //test remove item1
+        client.delete(ByteSequence.from("test-module.default.testKeyGroup/item1", Charset.defaultCharset())).get();
+        for (String v = provider.groupWatcher.groupItems().get("item1"); v != null; v = provider.groupWatcher.groupItems().get("item1")) {
+            log.info("value is : {}", provider.groupWatcher.groupItems().get("item1"));
+            TimeUnit.MILLISECONDS.sleep(200L);
+        }
+        assertNull(provider.groupWatcher.groupItems().get("item1"));
+
+        //test modify item2
+        client.put(
+            ByteSequence.from("test-module.default.testKeyGroup/item2", Charset.defaultCharset()),
+            ByteSequence.from("300", Charset.defaultCharset())
+        ).get();
+        for (String v = provider.groupWatcher.groupItems().get("item2"); v.equals("200"); v = provider.groupWatcher.groupItems().get("item2")) {
+            log.info("value is : {}", provider.groupWatcher.groupItems().get("item2"));
+            TimeUnit.MILLISECONDS.sleep(200L);
+        }
+        assertEquals("300", provider.groupWatcher.groupItems().get("item2"));
+    }
+
     @SuppressWarnings("unchecked")
     private static void loadConfig(ApplicationConfiguration configuration) throws FileNotFoundException {
         final Yaml yaml = new Yaml();
diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index 17b69d8..e71db21 100755
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -432,7 +432,7 @@ configuration:
     maxRetries: ${SW_CONFIG_ZK_MAX_RETRIES:3} # max number of times to retry
   etcd:
     period: ${SW_CONFIG_ETCD_PERIOD:60} # Unit seconds, sync period. Default fetch every 60 seconds.
-    endpoints: ${SW_CONFIG_ETCD_ENDPOINTS:localhost:2379}
+    endpoints: ${SW_CONFIG_ETCD_ENDPOINTS:http://localhost:2379}
     namespace: ${SW_CONFIG_ETCD_NAMESPACE:/skywalking}
     authentication: ${SW_CONFIG_ETCD_AUTHENTICATION:false}
     user: ${SW_CONFIG_ETCD_USER:}