You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2021/09/07 14:54:39 UTC
[skywalking] branch master updated: Support etcd grouped dynamic
configurations. (#7672)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new e89493b Support etcd grouped dynamic configurations. (#7672)
e89493b is described below
commit e89493b2472211772e5f90d3e0ca327a1ba5736c
Author: wankai123 <wa...@foxmail.com>
AuthorDate: Tue Sep 7 22:54:05 2021 +0800
Support etcd grouped dynamic configurations. (#7672)
---
CHANGES.md | 1 +
docs/en/setup/backend/configuration-vocabulary.md | 2 +-
docs/en/setup/backend/dynamic-config-etcd.md | 47 +++++++++++++++++++-
.../etcd/EtcdConfigWatcherRegister.java | 36 +++++++++++++++-
.../etcd/EtcdConfigurationTestProvider.java | 46 +++++++++++++++-----
.../etcd/ITEtcdConfigurationTest.java | 50 ++++++++++++++++++++++
.../src/main/resources/application.yml | 2 +-
7 files changed, 168 insertions(+), 16 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index eaa0f2d..305e8b5 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -57,6 +57,7 @@ Release Notes.
* Fix `H2MetadataQueryDAO.searchService` doesn't support auto grouping.
* Rebuilt ElasticSearch client on top of their REST API.
* Fix ElasticSearch storage plugin doesn't work when hot reloading from `secretsManagementFile`.
+* Support etcd grouped dynamic configurations.
#### UI
diff --git a/docs/en/setup/backend/configuration-vocabulary.md b/docs/en/setup/backend/configuration-vocabulary.md
index 4a71670..d857d3e 100644
--- a/docs/en/setup/backend/configuration-vocabulary.md
+++ b/docs/en/setup/backend/configuration-vocabulary.md
@@ -241,7 +241,7 @@ core|default|role|Option values: `Mixed/Receiver/Aggregator`. **Receiver** mode
| - | - | baseSleepTimeMs|The period of Zookeeper client between two retries (in milliseconds). |SW_CONFIG_ZK_BASE_SLEEP_TIME_MS|1000|
| - | - | maxRetries| The maximum retry time. |SW_CONFIG_ZK_MAX_RETRIES|3|
| - | - | period | The period of data sync (in seconds). | SW_CONFIG_ZK_PERIOD | 60 |
-| - | etcd| endpoints | Hosts and ports for etcd cluster (separated by commas if multiple). | SW_CONFIG_ETCD_ENDPOINTS | localhost:2379 |
+| - | etcd| endpoints | Hosts and ports for etcd cluster (separated by commas if multiple). | SW_CONFIG_ETCD_ENDPOINTS | http://localhost:2379 |
| - | - | namespace | Namespace for SkyWalking cluster. |SW_CONFIG_ETCD_NAMESPACE | /skywalking |
| - | - | authentication | Indicates whether there is authentication. | SW_CONFIG_ETCD_AUTHENTICATION | false |
| - | - | user | Etcd auth username. | SW_CONFIG_ETCD_USER | |
diff --git a/docs/en/setup/backend/dynamic-config-etcd.md b/docs/en/setup/backend/dynamic-config-etcd.md
index 0ea84ed..f1d9478 100755
--- a/docs/en/setup/backend/dynamic-config-etcd.md
+++ b/docs/en/setup/backend/dynamic-config-etcd.md
@@ -7,11 +7,54 @@ configuration:
selector: ${SW_CONFIGURATION:etcd}
etcd:
period: ${SW_CONFIG_ETCD_PERIOD:60} # Unit seconds, sync period. Default fetch every 60 seconds.
- endpoints: ${SW_CONFIG_ETCD_ENDPOINTS:localhost:2379}
+ endpoints: ${SW_CONFIG_ETCD_ENDPOINTS:http://localhost:2379}
namespace: ${SW_CONFIG_ETCD_NAMESPACE:/skywalking}
authentication: ${SW_CONFIG_ETCD_AUTHENTICATION:false}
user: ${SW_CONFIG_ETCD_USER:}
password: ${SW_CONFIG_ETCD_password:}
```
-**NOTE**: Only the v3 protocol is supported since 8.7.0.
\ No newline at end of file
+**NOTE**: Only the v3 protocol is supported since 8.7.0.
+
+## Config Storage
+### Single Config
+Single configs in etcd are key/value pairs:
+
+| Key | Value |
+|-----|-----|
+| {namespace}/configKey | configValue |
+
+e.g. The config is:
+```
+{agent-analyzer.default.slowDBAccessThreshold}:{default:200,mongodb:50}
+```
+If `namespace = /skywalking` the config in etcd is:
+
+| Key | Value |
+|-----|-----|
+| /skywalking/agent-analyzer.default.slowDBAccessThreshold | default:200,mongodb:50 |
+| ... | ... |
+
+
+### Group Config
+Group configs in etcd are key/value pairs as well, and each key is composed of the configKey and the subItemKey joined with `/`.
+
+| Key | Value |
+|-----|-----|
+| {namespace}/configKey/subItemKey1 | subItemValue1 |
+| {namespace}/configKey/subItemKey2 | subItemValue2 |
+| ... | ... |
+
+e.g. The config is:
+```
+{core.default.endpoint-name-grouping-openapi}:|{customerAPI-v1}:{value of customerAPI-v1}
+ |{productAPI-v1}:{value of productAPI-v1}
+ |{productAPI-v2}:{value of productAPI-v2}
+```
+If `namespace = /skywalking` the config in etcd is:
+
+| Key | Value |
+|-----|-----|
+| /skywalking/core.default.endpoint-name-grouping-openapi/customerAPI-v1 | value of customerAPI-v1 |
+| /skywalking/core.default.endpoint-name-grouping-openapi/productAPI-v1 | value of productAPI-v1 |
+| /skywalking/core.default.endpoint-name-grouping-openapi/productAPI-v2 | value of productAPI-v2 |
\ No newline at end of file
diff --git a/oap-server/server-configuration/configuration-etcd/src/main/java/org/apache/skywalking/oap/server/configuration/etcd/EtcdConfigWatcherRegister.java b/oap-server/server-configuration/configuration-etcd/src/main/java/org/apache/skywalking/oap/server/configuration/etcd/EtcdConfigWatcherRegister.java
index e0341c7..1f911dc 100644
--- a/oap-server/server-configuration/configuration-etcd/src/main/java/org/apache/skywalking/oap/server/configuration/etcd/EtcdConfigWatcherRegister.java
+++ b/oap-server/server-configuration/configuration-etcd/src/main/java/org/apache/skywalking/oap/server/configuration/etcd/EtcdConfigWatcherRegister.java
@@ -21,8 +21,11 @@ import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.ClientBuilder;
import io.etcd.jetcd.KV;
+import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.kv.GetResponse;
+import io.etcd.jetcd.options.GetOption;
import java.nio.charset.Charset;
+import java.util.List;
import java.util.Optional;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
@@ -77,8 +80,37 @@ public class EtcdConfigWatcherRegister extends ConfigWatcherRegister {
@Override
public Optional<GroupConfigTable> readGroupConfig(final Set<String> keys) {
- // TODO: implement readGroupConfig
- return Optional.empty();
+ GroupConfigTable groupConfigTable = new GroupConfigTable();
+ keys.forEach(key -> {
+ GroupConfigTable.GroupConfigItems groupConfigItems = new GroupConfigTable.GroupConfigItems(key);
+ groupConfigTable.addGroupConfigItems(groupConfigItems);
+ String groupKey = key + "/";
+
+ GetOption option = GetOption.newBuilder()
+ .withPrefix(ByteSequence.from(groupKey, Charset.defaultCharset()))
+ .build();
+ try {
+ GetResponse response = client.get(ByteSequence.from(groupKey, Charset.defaultCharset()), option).get();
+ if (0 != response.getCount()) {
+ List<KeyValue> groupItemKeys = response.getKvs();
+ if (groupItemKeys != null) {
+ groupItemKeys.forEach(groupItem -> {
+ String groupItemKey = groupItem.getKey().toString(Charset.defaultCharset());
+ if (!groupKey.equals(groupItemKey)) {
+ String itemValue = groupItem.getValue().toString(Charset.defaultCharset());
+ String itemName = groupItemKey.substring(groupKey.length());
+ groupConfigItems.add(
+ new ConfigTable.ConfigItem(itemName, itemValue));
+ }
+ });
+ }
+ }
+ } catch (Exception exp) {
+ throw new EtcdConfigException("Failed to read configuration", exp);
+ }
+
+ });
+ return Optional.of(groupConfigTable);
}
}
diff --git a/oap-server/server-configuration/configuration-etcd/src/test/java/org/apache/skywalking/oap/server/configuration/etcd/EtcdConfigurationTestProvider.java b/oap-server/server-configuration/configuration-etcd/src/test/java/org/apache/skywalking/oap/server/configuration/etcd/EtcdConfigurationTestProvider.java
index b938c5d..21d7f6e 100644
--- a/oap-server/server-configuration/configuration-etcd/src/test/java/org/apache/skywalking/oap/server/configuration/etcd/EtcdConfigurationTestProvider.java
+++ b/oap-server/server-configuration/configuration-etcd/src/test/java/org/apache/skywalking/oap/server/configuration/etcd/EtcdConfigurationTestProvider.java
@@ -18,22 +18,22 @@
package org.apache.skywalking.oap.server.configuration.etcd;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.configuration.api.ConfigChangeWatcher;
import org.apache.skywalking.oap.server.configuration.api.ConfigurationModule;
import org.apache.skywalking.oap.server.configuration.api.DynamicConfigurationService;
+import org.apache.skywalking.oap.server.configuration.api.GroupConfigChangeWatcher;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
-import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+@Slf4j
public class EtcdConfigurationTestProvider extends ModuleProvider {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(EtcdConfigurationTestProvider.class);
-
ConfigChangeWatcher watcher;
+ GroupConfigChangeWatcher groupWatcher;
@Override
public String name() {
@@ -52,13 +52,13 @@ public class EtcdConfigurationTestProvider extends ModuleProvider {
}
@Override
- public void prepare() throws ServiceNotProvidedException, ModuleStartException {
+ public void prepare() throws ServiceNotProvidedException {
watcher = new ConfigChangeWatcher(EtcdConfigurationTestModule.NAME, this, "testKey") {
private volatile String testValue;
@Override
public void notify(ConfigChangeEvent value) {
- LOGGER.info("ConfigChangeWatcher.ConfigChangeEvent: {}", value);
+ log.info("ConfigChangeWatcher.ConfigChangeEvent: {}", value);
if (EventType.DELETE.equals(value.getEventType())) {
testValue = null;
} else {
@@ -71,18 +71,44 @@ public class EtcdConfigurationTestProvider extends ModuleProvider {
return testValue;
}
};
+
+ groupWatcher = new GroupConfigChangeWatcher(EtcdConfigurationTestModule.NAME, this, "testKeyGroup") {
+ private Map<String, String> config = new ConcurrentHashMap<>();
+
+ @Override
+ public void notifyGroup(Map<String, ConfigChangeEvent> groupItems) {
+ log.info("GroupConfigChangeWatcher.ConfigChangeEvents: {}", groupItems);
+ groupItems.forEach((groupItemName, event) -> {
+ if (EventType.DELETE.equals(event.getEventType())) {
+ config.remove(groupItemName);
+ } else {
+ config.put(groupItemName, event.getNewValue());
+ }
+ });
+ }
+
+ @Override
+ public Map<String, String> groupItems() {
+ return config;
+ }
+ };
}
@Override
- public void start() throws ServiceNotProvidedException, ModuleStartException {
+ public void start() throws ServiceNotProvidedException {
getManager().find(ConfigurationModule.NAME)
.provider()
.getService(DynamicConfigurationService.class)
.registerConfigChangeWatcher(watcher);
+
+ getManager().find(ConfigurationModule.NAME)
+ .provider()
+ .getService(DynamicConfigurationService.class)
+ .registerConfigChangeWatcher(groupWatcher);
}
@Override
- public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
+ public void notifyAfterCompleted() throws ServiceNotProvidedException {
}
diff --git a/oap-server/server-configuration/configuration-etcd/src/test/java/org/apache/skywalking/oap/server/configuration/etcd/ITEtcdConfigurationTest.java b/oap-server/server-configuration/configuration-etcd/src/test/java/org/apache/skywalking/oap/server/configuration/etcd/ITEtcdConfigurationTest.java
index acbc77a..a4ddbe4 100644
--- a/oap-server/server-configuration/configuration-etcd/src/test/java/org/apache/skywalking/oap/server/configuration/etcd/ITEtcdConfigurationTest.java
+++ b/oap-server/server-configuration/configuration-etcd/src/test/java/org/apache/skywalking/oap/server/configuration/etcd/ITEtcdConfigurationTest.java
@@ -108,6 +108,56 @@ public class ITEtcdConfigurationTest {
assertNull(provider.watcher.value());
}
+ @Test(timeout = 20000)
+ public void shouldReadUpdated4Group() throws Exception {
+ assertEquals("{}", provider.groupWatcher.groupItems().toString());
+
+ KV client = Client.builder()
+ .endpoints("http://localhost:" + container.getMappedPort(2379))
+ .namespace(ByteSequence.from("/skywalking/", Charset.defaultCharset()))
+ .build()
+ .getKVClient();
+
+ client.put(
+ ByteSequence.from("test-module.default.testKeyGroup/item1", Charset.defaultCharset()),
+ ByteSequence.from("100", Charset.defaultCharset())
+ ).get();
+ client.put(
+ ByteSequence.from("test-module.default.testKeyGroup/item2", Charset.defaultCharset()),
+ ByteSequence.from("200", Charset.defaultCharset())
+ ).get();
+
+ for (String v = provider.groupWatcher.groupItems().get("item1"); v == null; v = provider.groupWatcher.groupItems().get("item1")) {
+ log.info("value is : {}", provider.groupWatcher.groupItems().get("item1"));
+ TimeUnit.MILLISECONDS.sleep(200L);
+ }
+ for (String v = provider.groupWatcher.groupItems().get("item2"); v == null; v = provider.groupWatcher.groupItems().get("item2")) {
+ log.info("value is : {}", provider.groupWatcher.groupItems().get("item2"));
+ TimeUnit.MILLISECONDS.sleep(200L);
+ }
+ assertEquals("100", provider.groupWatcher.groupItems().get("item1"));
+ assertEquals("200", provider.groupWatcher.groupItems().get("item2"));
+
+ //test remove item1
+ client.delete(ByteSequence.from("test-module.default.testKeyGroup/item1", Charset.defaultCharset())).get();
+ for (String v = provider.groupWatcher.groupItems().get("item1"); v != null; v = provider.groupWatcher.groupItems().get("item1")) {
+ log.info("value is : {}", provider.groupWatcher.groupItems().get("item1"));
+ TimeUnit.MILLISECONDS.sleep(200L);
+ }
+ assertNull(provider.groupWatcher.groupItems().get("item1"));
+
+ //test modify item2
+ client.put(
+ ByteSequence.from("test-module.default.testKeyGroup/item2", Charset.defaultCharset()),
+ ByteSequence.from("300", Charset.defaultCharset())
+ ).get();
+ for (String v = provider.groupWatcher.groupItems().get("item2"); v.equals("200"); v = provider.groupWatcher.groupItems().get("item2")) {
+ log.info("value is : {}", provider.groupWatcher.groupItems().get("item2"));
+ TimeUnit.MILLISECONDS.sleep(200L);
+ }
+ assertEquals("300", provider.groupWatcher.groupItems().get("item2"));
+ }
+
@SuppressWarnings("unchecked")
private static void loadConfig(ApplicationConfiguration configuration) throws FileNotFoundException {
final Yaml yaml = new Yaml();
diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index 17b69d8..e71db21 100755
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -432,7 +432,7 @@ configuration:
maxRetries: ${SW_CONFIG_ZK_MAX_RETRIES:3} # max number of times to retry
etcd:
period: ${SW_CONFIG_ETCD_PERIOD:60} # Unit seconds, sync period. Default fetch every 60 seconds.
- endpoints: ${SW_CONFIG_ETCD_ENDPOINTS:localhost:2379}
+ endpoints: ${SW_CONFIG_ETCD_ENDPOINTS:http://localhost:2379}
namespace: ${SW_CONFIG_ETCD_NAMESPACE:/skywalking}
authentication: ${SW_CONFIG_ETCD_AUTHENTICATION:false}
user: ${SW_CONFIG_ETCD_USER:}