You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2018/09/09 09:08:32 UTC

[incubator-dubbo] branch dev-metadata updated (a5695af -> 507f5bf)

This is an automated email from the ASF dual-hosted git repository.

liujun pushed a change to branch dev-metadata
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git.


    from a5695af  Fix problem when rule need to run at runtime.
     new ca057cb  Refactor dynamic config, 1. Extract common method to AbstractDynamicConfiguration 2. Unify strategy when config server cannot be reached at startup: start using local snapshot and try to connect in background.
     new 507f5bf  Update config connecting status when ZK connection changes.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../dynamic/AbstractDynamicConfiguration.java      | 35 ++++++++++++++-----
 .../dubbo/config/dynamic/DynamicConfiguration.java |  4 +--
 .../support/apollo/ApolloDynamicConfiguration.java | 34 ++++++++++--------
 .../archaius/ArchaiusDynamicConfiguration.java     | 40 +++++++++-------------
 .../sources/ZooKeeperConfigurationSource.java      | 27 +++++++++++----
 5 files changed, 87 insertions(+), 53 deletions(-)


[incubator-dubbo] 01/02: Refactor dynamic config, 1. Extract common method to AbstractDynamicConfiguration 2. Unify strategy when config server cannot be reached at startup: start using local snapshot and try to connect in background.

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch dev-metadata
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git

commit ca057cb003bd6ee89bf4022bbf678463dd8c6419
Author: ken.lj <ke...@gmail.com>
AuthorDate: Sun Sep 9 17:04:26 2018 +0800

    Refactor dynamic config,
    1. Extract common method to AbstractDynamicConfiguration
    2. Unify strategy when config server cannot be reached at startup: start using local snapshot and try to connect in background.
---
 .../dynamic/AbstractDynamicConfiguration.java      | 35 ++++++++++++++-----
 .../dubbo/config/dynamic/DynamicConfiguration.java |  4 +--
 .../support/apollo/ApolloDynamicConfiguration.java | 34 ++++++++++--------
 .../archaius/ArchaiusDynamicConfiguration.java     | 40 +++++++++-------------
 .../sources/ZooKeeperConfigurationSource.java      | 26 ++++++++++----
 5 files changed, 85 insertions(+), 54 deletions(-)

diff --git a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/AbstractDynamicConfiguration.java b/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/AbstractDynamicConfiguration.java
index 84129d6..e459bcc 100644
--- a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/AbstractDynamicConfiguration.java
+++ b/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/AbstractDynamicConfiguration.java
@@ -18,36 +18,51 @@ package org.apache.dubbo.config.dynamic;
 
 import org.apache.dubbo.common.URL;
 
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
 /**
  *
  */
-public abstract class AbstractDynamicConfiguration implements DynamicConfiguration {
+public abstract class AbstractDynamicConfiguration<TargetConfigListener> implements DynamicConfiguration {
     protected URL url;
+    /**
+     * One key can register multiple target listeners, but one target listener only maps to one configuration listener
+     */
+    private ConcurrentMap<String, ConcurrentMap<ConfigurationListener, TargetConfigListener>> listenerToTargetListenerMap;
 
     public AbstractDynamicConfiguration() {
     }
 
     @Override
     public void addListener(String key, ConfigurationListener listener) {
-
+        ConcurrentMap<ConfigurationListener, TargetConfigListener> listeners = listenerToTargetListenerMap.computeIfAbsent(key, k -> new ConcurrentHashMap<>());
+        TargetConfigListener targetListener = listeners.computeIfAbsent(listener, k -> createTargetConfigListener(key, listener));
+        addTargetListener(key, targetListener);
     }
 
     @Override
     public String getConfig(String key, String group) {
-        return null;
+        return getConfig(key, group, null);
     }
 
     @Override
     public String getConfig(String key, String group, ConfigurationListener listener) {
-        return null;
+        return getConfig(key, group, 0l, listener);
     }
 
     @Override
-    public String getConfig(String key, String group, long timeout) {
-        return null;
+    public String getConfig(String key, String group, long timeout, ConfigurationListener listener) {
+        try {
+            if (listener != null) {
+                this.addListener(key, listener);
+            }
+            return getInternalProperty(key, group, timeout, listener);
+        } catch (Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
     }
 
-
     public URL getUrl() {
         return url;
     }
@@ -56,6 +71,10 @@ public abstract class AbstractDynamicConfiguration implements DynamicConfigurati
         this.url = url;
     }
 
-    protected abstract String getInternalProperty(String key, String group, long timeout);
+    protected abstract String getInternalProperty(String key, String group, long timeout, ConfigurationListener listener);
+
+    protected abstract void addTargetListener(String key, TargetConfigListener listener);
+
+    protected abstract TargetConfigListener createTargetConfigListener(String key, ConfigurationListener listener);
 
 }
diff --git a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/DynamicConfiguration.java b/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/DynamicConfiguration.java
index 8d05aca..799f670 100644
--- a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/DynamicConfiguration.java
+++ b/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/DynamicConfiguration.java
@@ -25,7 +25,7 @@ import org.apache.dubbo.common.extension.SPI;
 @SPI("zookeeper")
 public interface DynamicConfiguration {
 
-    public void init();
+    void init();
 
     URL getUrl();
 
@@ -35,7 +35,7 @@ public interface DynamicConfiguration {
 
     String getConfig(String key, String group);
 
-    String getConfig(String key, String group, long timeout);
+    String getConfig(String key, String group, long timeout, ConfigurationListener listener);
 
     String getConfig(String key, String group, ConfigurationListener listener);
 
diff --git a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/apollo/ApolloDynamicConfiguration.java b/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/apollo/ApolloDynamicConfiguration.java
index 7b718f4..489bf47 100644
--- a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/apollo/ApolloDynamicConfiguration.java
+++ b/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/apollo/ApolloDynamicConfiguration.java
@@ -36,10 +36,11 @@ import java.util.Set;
 /**
  *
  */
-public class ApolloDynamicConfiguration extends AbstractDynamicConfiguration {
+public class ApolloDynamicConfiguration extends AbstractDynamicConfiguration<ConfigChangeListener> {
     private static final String APOLLO_ENV_KEY = "env";
     private static final String APOLLO_ADDR_KEY = "apollo.meta";
     private static final String APOLLO_CLUSTER_KEY = "apollo.cluster";
+    private static final String APPLO_DEFAULT_NAMESPACE = "dubbo";
     /**
      * support two namespaces: application -> dubbo
      */
@@ -70,7 +71,7 @@ public class ApolloDynamicConfiguration extends AbstractDynamicConfiguration {
             System.setProperty(APOLLO_CLUSTER_KEY, configCluster);
         }
 
-        dubboConfig = ConfigService.getConfig("dubbo");
+        dubboConfig = ConfigService.getConfig(url.getParameter(Constants.CONFIG_NAMESPACE_KEY, APPLO_DEFAULT_NAMESPACE));
         appConfig = ConfigService.getAppConfig();
     }
 
@@ -83,16 +84,8 @@ public class ApolloDynamicConfiguration extends AbstractDynamicConfiguration {
     }
 
     @Override
-    public String getConfig(String key, String group, ConfigurationListener listener) {
-        Set<String> keys = new HashSet<>(1);
-        keys.add(key);
-        this.appConfig.addChangeListener(new ApolloListener(listener), keys);
-        this.dubboConfig.addChangeListener(new ApolloListener(listener), keys);
-        return getInternalProperty(key, group, 0L);
-    }
-
-    @Override
-    protected String getInternalProperty(String key, String group, long timeout) {
+    protected String getInternalProperty(String key, String group, long timeout, ConfigurationListener listener) {
+        // FIXME According to Apollo, if it fails to get a value from one namespace, it will keep logging warning msg. They are working to improve it.
         String value = appConfig.getProperty(key, null);
         if (value == null) {
             value = dubboConfig.getProperty(key, null);
@@ -101,6 +94,19 @@ public class ApolloDynamicConfiguration extends AbstractDynamicConfiguration {
         return value;
     }
 
+    @Override
+    protected void addTargetListener(String key, ConfigChangeListener listener) {
+        Set<String> keys = new HashSet<>(1);
+        keys.add(key);
+        this.appConfig.addChangeListener(listener, keys);
+        this.dubboConfig.addChangeListener(listener, keys);
+    }
+
+    @Override
+    protected ConfigChangeListener createTargetConfigListener(String key, ConfigurationListener listener) {
+        return new ApolloListener(listener);
+    }
+
     public ConfigChangeType getChangeType(PropertyChangeType changeType) {
         if (changeType.equals(PropertyChangeType.DELETED)) {
             return ConfigChangeType.DELETED;
@@ -121,12 +127,12 @@ public class ApolloDynamicConfiguration extends AbstractDynamicConfiguration {
             this.listener = listener;
         }
 
-        // FIXME will Apollo consider an empty value ("") as deleted?
+        // FIXME will Apollo consider an empty value "" as deleted?
         @Override
         public void onChange(ConfigChangeEvent changeEvent) {
             for (String key : changeEvent.changedKeys()) {
                 ConfigChange change = changeEvent.getChange(key);
-                // Maybe we no longer need to identify the type of change. Because there's no scenario that a callback will subscribe for both configurators and routers
+                // TODO Maybe we no longer need to identify the type of change, because there is no scenario in which one callback subscribes to both configurators and routers
                 if (change.getPropertyName().endsWith(Constants.CONFIGURATORS_SUFFIX)) {
                     listener.process(new org.apache.dubbo.config.dynamic.ConfigChangeEvent(key, change.getNewValue(), ConfigType.CONFIGURATORS, getChangeType(change.getChangeType())));
                 } else {
diff --git a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/archaius/ArchaiusDynamicConfiguration.java b/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/archaius/ArchaiusDynamicConfiguration.java
index 979e884..feb177a 100644
--- a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/archaius/ArchaiusDynamicConfiguration.java
+++ b/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/archaius/ArchaiusDynamicConfiguration.java
@@ -33,7 +33,7 @@ import org.apache.dubbo.config.dynamic.support.archaius.sources.ZooKeeperConfigu
 /**
  * Archaius supports various sources and it's extensible: JDBC, ZK, Properties, ..., so should we make it extensible?
  */
-public class ArchaiusDynamicConfiguration extends AbstractDynamicConfiguration {
+public class ArchaiusDynamicConfiguration extends AbstractDynamicConfiguration<Runnable> {
 
     public ArchaiusDynamicConfiguration() {
     }
@@ -43,51 +43,45 @@ public class ArchaiusDynamicConfiguration extends AbstractDynamicConfiguration {
         //  String address = env.getCompositeConf().getString(ADDRESS_KEY);
         //  String app = env.getCompositeConf().getString(APP_KEY);
 
-        String address = url.getAddress();
+        String address = url.getParameter(Constants.CONFIG_ADDRESS_KEY, url.getAddress());
         if (!address.equals(Constants.ANYHOST_VALUE)) {
             System.setProperty(ZooKeeperConfigurationSource.ARCHAIUS_SOURCE_ADDRESS_KEY, address);
         }
-        System.setProperty(ZooKeeperConfigurationSource.ARCHAIUS_CONFIG_ROOT_PATH_KEY, ZooKeeperConfigurationSource.DEFAULT_CONFIG_ROOT_PATH);
+        System.setProperty(ZooKeeperConfigurationSource.ARCHAIUS_CONFIG_ROOT_PATH_KEY, url.getParameter(Constants.CONFIG_NAMESPACE_KEY, ZooKeeperConfigurationSource.DEFAULT_CONFIG_ROOT_PATH));
 
         try {
             ZooKeeperConfigurationSource zkConfigSource = new ZooKeeperConfigurationSource();
             zkConfigSource.start();
+            /*if (!zkConfigSource.isConnected()) {
+                // we can check the status of config center here, and decide to fail or continue if we cannot reach the config server.
+            }*/
             DynamicWatchedConfiguration zkDynamicConfig = new DynamicWatchedConfiguration(zkConfigSource);
             ConfigurationManager.install(zkDynamicConfig);
         } catch (Exception e) {
-            e.printStackTrace();
+            throw new IllegalStateException(e.getMessage(), e);
         }
     }
 
     @Override
-    public void addListener(String key, ConfigurationListener listener) {
-        DynamicStringProperty prop = DynamicPropertyFactory.getInstance()
-                .getStringProperty(key, null);
-        prop.addCallback(new ArchaiusListener(key, listener));
-    }
-
-    @Override
-    public String getConfig(String key, String group) {
-        return getConfig(key, group, null);
+    protected String getInternalProperty(String key, String group, long timeout, ConfigurationListener listener) {
+        return DynamicPropertyFactory.getInstance()
+                .getStringProperty(key, null)
+                .get();
     }
 
     @Override
-    public String getConfig(String key, String group, ConfigurationListener listener) {
+    protected void addTargetListener(String key, Runnable runnable) {
         DynamicStringProperty prop = DynamicPropertyFactory.getInstance()
                 .getStringProperty(key, null);
-        if (listener != null) {
-            prop.addCallback(new ArchaiusListener(key, listener));
-        }
-        return prop.get();
+        prop.addCallback(runnable);
     }
 
     @Override
-    protected String getInternalProperty(String key, String group, long timeout) {
-        return DynamicPropertyFactory.getInstance()
-                .getStringProperty(key, null)
-                .get();
+    protected Runnable createTargetConfigListener(String key, ConfigurationListener listener) {
+        return new ArchaiusListener(key, listener);
     }
 
+
     private class ArchaiusListener implements Runnable {
         private ConfigurationListener listener;
         private URL url;
@@ -103,7 +97,7 @@ public class ArchaiusDynamicConfiguration extends AbstractDynamicConfiguration {
                 type = ConfigType.CONFIGURATORS;
             } else {
                 /**
-                 * Works for any router rules:
+                 * used for all router rules:
                  * {@link Constants.ROUTERS_SUFFIX}
                  * {@link org.apache.dubbo.rpc.cluster.router.tag.TagRouter.TAGRULE_DATAID}
                  */
diff --git a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/archaius/sources/ZooKeeperConfigurationSource.java b/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/archaius/sources/ZooKeeperConfigurationSource.java
index c95033b..6e91624 100644
--- a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/archaius/sources/ZooKeeperConfigurationSource.java
+++ b/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/archaius/sources/ZooKeeperConfigurationSource.java
@@ -38,7 +38,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -57,6 +56,7 @@ public class ZooKeeperConfigurationSource implements WatchedConfigurationSource,
 
     private final String configRootPath;
     private final TreeCache treeCache;
+    private boolean connected = false;
 
     private final Charset charset = Charset.forName("UTF-8");
 
@@ -75,10 +75,18 @@ public class ZooKeeperConfigurationSource implements WatchedConfigurationSource,
         if (connectString == null) {
             throw new IllegalArgumentException("connectString==null, must specify the address to connect for zookeeper archaius source.");
         }
+
         CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, sessionTimeout, connectTimeout,
                 new ExponentialBackoffRetry(1000, 3));
         client.start();
-
+        try {
+            connected = client.blockUntilConnected(connectTimeout * 4, TimeUnit.MILLISECONDS);
+            if (!connected) {
+                logger.warn("Cannot connect to ConfigCenter at zookeeper " + connectString + " in " + connectTimeout * 4 + "ms");
+            }
+        } catch (InterruptedException e) {
+            logger.error("The thread was interrupted unexpectedly when try connecting to zookeeper " + connectString + " as ConfigCenter, ", e);
+        }
         this.client = client;
         this.configRootPath = configRootPath;
         this.treeCache = new TreeCache(client, configRootPath);
@@ -103,16 +111,12 @@ public class ZooKeeperConfigurationSource implements WatchedConfigurationSource,
      */
     public void start() throws Exception {
         // create the watcher for future configuration updates
-        CountDownLatch latch = new CountDownLatch(1);
         treeCache.getListenable().addListener(new TreeCacheListener() {
             public void childEvent(CuratorFramework aClient, TreeCacheEvent event)
                     throws Exception {
 
                 TreeCacheEvent.Type type = event.getType();
                 ChildData data = event.getData();
-                if (type == TreeCacheEvent.Type.INITIALIZED) {
-                    latch.countDown();
-                }
 
                 // TODO, ignore other event types
                 if (data == null) {
@@ -152,7 +156,6 @@ public class ZooKeeperConfigurationSource implements WatchedConfigurationSource,
 
         // passing true to trigger an initial rebuild upon starting.  (blocking call)
         treeCache.start();
-        latch.await(60 * 1000, TimeUnit.MILLISECONDS);
     }
 
     /**
@@ -174,6 +177,11 @@ public class ZooKeeperConfigurationSource implements WatchedConfigurationSource,
 
         Map<String, Object> all = new HashMap<>();
 
+        if (!connected) {
+            logger.warn("ConfigServer is not connected yet, zookeeper don't support local snapshot yet, so there's no old data to use!");
+            return all;
+        }
+
         Map<String, ChildData> dataMap = treeCache.getCurrentChildren(configRootPath);
         if (dataMap != null && dataMap.size() > 0) {
             dataMap.forEach((childPath, v) -> {
@@ -220,4 +228,8 @@ public class ZooKeeperConfigurationSource implements WatchedConfigurationSource,
             logger.error("IOException should not have been thrown.", exc);
         }
     }
+
+    public boolean isConnected() {
+        return connected;
+    }
 }


[incubator-dubbo] 02/02: Update config connecting status when ZK connection changes.

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch dev-metadata
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git

commit 507f5bfc2967442a5b40b2f535c9f17cd5519ef0
Author: ken.lj <ke...@gmail.com>
AuthorDate: Sun Sep 9 17:08:04 2018 +0800

    Update config connecting status when ZK connection changes.
---
 .../dynamic/support/archaius/sources/ZooKeeperConfigurationSource.java | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/archaius/sources/ZooKeeperConfigurationSource.java b/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/archaius/sources/ZooKeeperConfigurationSource.java
index 6e91624..41d6861 100644
--- a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/archaius/sources/ZooKeeperConfigurationSource.java
+++ b/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/archaius/sources/ZooKeeperConfigurationSource.java
@@ -117,6 +117,9 @@ public class ZooKeeperConfigurationSource implements WatchedConfigurationSource,
 
                 TreeCacheEvent.Type type = event.getType();
                 ChildData data = event.getData();
+                if (type == TreeCacheEvent.Type.INITIALIZED || type == TreeCacheEvent.Type.CONNECTION_RECONNECTED) {
+                    connected = true;
+                }
 
                 // TODO, ignore other event types
                 if (data == null) {