You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by qi...@apache.org on 2021/01/14 06:03:37 UTC

[shardingsphere] branch master updated: Refactor SchemaChangedListener (#9022)

This is an automated email from the ASF dual-hosted git repository.

qiulu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 83cebbf  Refactor SchemaChangedListener (#9022)
83cebbf is described below

commit 83cebbf04b4ff225ce88f6d205994d5e59660a63
Author: Haoran Meng <me...@gmail.com>
AuthorDate: Thu Jan 14 14:03:15 2021 +0800

    Refactor SchemaChangedListener (#9022)
    
    * Refactor SchemaChangedListener
    
    * Refactor SchemaChangedListener
---
 .../governance/core/config/ConfigCenter.java       | 12 +++---
 .../governance/core/config/ConfigCenterNode.java   | 47 ++++++++++++++++++----
 .../config/listener/SchemaChangedListener.java     |  2 +-
 .../core/config/ConfigCenterNodeTest.java          |  2 +-
 .../governance/core/config/ConfigCenterTest.java   | 22 +++++-----
 .../config/listener/SchemaChangedListenerTest.java |  6 +--
 .../zookeeper/CuratorZookeeperRepository.java      | 15 +++++++
 .../impl/GovernanceBootstrapInitializerTest.java   |  2 +-
 8 files changed, 77 insertions(+), 31 deletions(-)

diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/config/ConfigCenter.java b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/config/ConfigCenter.java
index 6ace568..fa37bc9 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/config/ConfigCenter.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/config/ConfigCenter.java
@@ -145,14 +145,14 @@ public final class ConfigCenter {
      */
     @Subscribe
     public synchronized void renew(final SchemaNamePersistEvent event) {
-        String schemaNames = repository.get(node.getSchemasNodePath());
+        String schemaNames = repository.get(node.getMetadataNodePath());
         Collection<String> schemas = Strings.isNullOrEmpty(schemaNames) ? new LinkedHashSet<>() : new LinkedHashSet<>(Splitter.on(",").splitToList(schemaNames));
         if (event.isDrop()) {
             schemas.remove(event.getSchemaName());
         } else if (!schemas.contains(event.getSchemaName())) {
             schemas.add(event.getSchemaName());
         }
-        repository.persist(node.getSchemasNodePath(), Joiner.on(",").join(schemas));
+        repository.persist(node.getMetadataNodePath(), Joiner.on(",").join(schemas));
     }
     
     /**
@@ -269,9 +269,9 @@ public final class ConfigCenter {
     }
     
     private void persistSchemaName(final String schemaName) {
-        String schemaNames = repository.get(node.getSchemasNodePath());
+        String schemaNames = repository.get(node.getMetadataNodePath());
         if (Strings.isNullOrEmpty(schemaNames)) {
-            repository.persist(node.getSchemasNodePath(), schemaName);
+            repository.persist(node.getMetadataNodePath(), schemaName);
             return;
         }
         List<String> schemaNameList = Splitter.on(",").splitToList(schemaNames);
@@ -280,7 +280,7 @@ public final class ConfigCenter {
         }
         List<String> newArrayList = new ArrayList<>(schemaNameList);
         newArrayList.add(schemaName);
-        repository.persist(node.getSchemasNodePath(), Joiner.on(",").join(newArrayList));
+        repository.persist(node.getMetadataNodePath(), Joiner.on(",").join(newArrayList));
     }
     
     /**
@@ -335,7 +335,7 @@ public final class ConfigCenter {
      * @return all schema names
      */
     public Collection<String> getAllSchemaNames() {
-        String schemaNames = repository.get(node.getSchemasNodePath());
+        String schemaNames = repository.get(node.getMetadataNodePath());
         return Strings.isNullOrEmpty(schemaNames) ? new LinkedList<>() : node.splitSchemaName(schemaNames);
     }
     
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/config/ConfigCenterNode.java b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/config/ConfigCenterNode.java
index dc94fda..3426274 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/config/ConfigCenterNode.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/config/ConfigCenterNode.java
@@ -36,8 +36,6 @@ public final class ConfigCenterNode {
     
     private static final String METADATA_NODE = "metadata";
     
-    private static final String SCHEMAS_NODE = "schemas";
-    
     private static final String DATA_SOURCE_NODE = "datasource";
     
     private static final String RULE_NODE = "rule";
@@ -158,7 +156,7 @@ public final class ConfigCenterNode {
      * @return config paths list.
      */
     public Collection<String> getAllSchemaConfigPaths(final Collection<String> schemaNames) {
-        Collection<String> result = new ArrayList<>(Collections.singleton(getSchemasNodePath()));
+        Collection<String> result = new ArrayList<>(Collections.singleton(getMetadataNodePath()));
         for (String schemaName : schemaNames) {
             result.add(getRulePath(schemaName));
             result.add(getDataSourcePath(schemaName));
@@ -178,11 +176,44 @@ public final class ConfigCenterNode {
     }
     
     /**
-     * Get schemas node path.
-     * 
-     * @return schemas node path.
+     * Get all schema paths.
+     *
+     * @param schemaNames schema names.
+     * @return list of schema path.
+     */
+    public Collection<String> getAllSchemaPaths(final Collection<String> schemaNames) {
+        Collection<String> result = Collections.emptyList();
+        for (String schemaName : schemaNames) {
+            result.add(getSchemaPath(schemaName));
+        }
+        return result;
+    }
+    
+    /**
+     * Get all rule paths.
+     *
+     * @param schemaNames schema names.
+     * @return list of rule path.
      */
-    public String getSchemasNodePath() {
-        return Joiner.on(PATH_SEPARATOR).join("", SCHEMAS_NODE);
+    public Collection<String> getAllRulePaths(final Collection<String> schemaNames) {
+        Collection<String> result = Collections.emptyList();
+        for (String schemaName : schemaNames) {
+            result.add(getRulePath(schemaName));
+        }
+        return result;
+    }
+    
+    /**
+     * Get all data source paths.
+     *
+     * @param schemaNames schema names.
+     * @return list of data source path.
+     */
+    public Collection<String> getAllDataSourcePaths(final Collection<String> schemaNames) {
+        Collection<String> result = Collections.emptyList();
+        for (String schemaName : schemaNames) {
+            result.add(getDataSourcePath(schemaName));
+        }
+        return result;
     }
 }
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/config/listener/SchemaChangedListener.java b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/config/listener/SchemaChangedListener.java
index f24045f..c180d3e 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/config/listener/SchemaChangedListener.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/config/listener/SchemaChangedListener.java
@@ -69,7 +69,7 @@ public final class SchemaChangedListener extends PostGovernanceRepositoryEventLi
     @Override
     protected Optional<GovernanceEvent> createEvent(final DataChangedEvent event) {
         // TODO Consider removing the following one.
-        if (configurationNode.getSchemasNodePath().equals(event.getKey())) {
+        if (configurationNode.getMetadataNodePath().equals(event.getKey())) {
             return createSchemaNamesUpdatedEvent(event.getValue());
         }
         String schemaName = configurationNode.getSchemaName(event.getKey());
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/config/ConfigCenterNodeTest.java b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/config/ConfigCenterNodeTest.java
index 39e42c3..5fdb11a 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/config/ConfigCenterNodeTest.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/config/ConfigCenterNodeTest.java
@@ -60,7 +60,7 @@ public final class ConfigCenterNodeTest {
     public void assertGetAllSchemaConfigPaths() {
         Collection<String> actual = configurationNode.getAllSchemaConfigPaths(Collections.singletonList(DefaultSchema.LOGIC_NAME));
         assertThat(actual.size(), is(4));
-        assertThat(actual, hasItems("/schemas"));
+        assertThat(actual, hasItems("/metadata"));
         assertThat(actual, hasItems("/metadata/logic_db/rule"));
         assertThat(actual, hasItems("/metadata/logic_db/datasource"));
         assertThat(actual, hasItems("/metadata/logic_db/schema"));
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/config/ConfigCenterTest.java b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/config/ConfigCenterTest.java
index f93b736..229ab18 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/config/ConfigCenterTest.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/config/ConfigCenterTest.java
@@ -438,7 +438,7 @@ public final class ConfigCenterTest {
     
     @Test
     public void assertGetAllSchemaNames() {
-        when(configurationRepository.get("/schemas")).thenReturn("sharding_db,replica_query_db");
+        when(configurationRepository.get("/metadata")).thenReturn("sharding_db,replica_query_db");
         ConfigCenter configCenter = new ConfigCenter(configurationRepository);
         Collection<String> actual = configCenter.getAllSchemaNames();
         assertThat(actual.size(), is(2));
@@ -483,17 +483,17 @@ public final class ConfigCenterTest {
     @Test
     public void assertPersistSchemaNameWithExistSchema() {
         ConfigCenter configCenter = new ConfigCenter(configurationRepository);
-        when(configurationRepository.get("/schemas")).thenReturn("sharding_db");
+        when(configurationRepository.get("/metadata")).thenReturn("sharding_db");
         configCenter.persistConfigurations("sharding_db", createDataSourceConfigurations(), createRuleConfigurations(), true);
-        verify(configurationRepository, times(0)).persist(eq("/schemas"), eq("sharding_db"));
+        verify(configurationRepository, times(0)).persist(eq("/metadata"), eq("sharding_db"));
     }
     
     @Test
     public void assertPersistSchemaNameWithExistAndNewSchema() {
         ConfigCenter configCenter = new ConfigCenter(configurationRepository);
-        when(configurationRepository.get("/schemas")).thenReturn("replica_query_db");
+        when(configurationRepository.get("/metadata")).thenReturn("replica_query_db");
         configCenter.persistConfigurations("sharding_db", createDataSourceConfigurations(), createRuleConfigurations(), true);
-        verify(configurationRepository).persist(eq("/schemas"), eq("replica_query_db,sharding_db"));
+        verify(configurationRepository).persist(eq("/metadata"), eq("replica_query_db,sharding_db"));
     }
     
     @Test
@@ -515,28 +515,28 @@ public final class ConfigCenterTest {
     @Test
     public void assertRenewSchemaNameEventWithDrop() {
         SchemaNamePersistEvent event = new SchemaNamePersistEvent("sharding_db", true);
-        when(configurationRepository.get("/schemas")).thenReturn("sharding_db,replica_query_db");
+        when(configurationRepository.get("/metadata")).thenReturn("sharding_db,replica_query_db");
         ConfigCenter configCenter = new ConfigCenter(configurationRepository);
         configCenter.renew(event);
-        verify(configurationRepository).persist(eq("/schemas"), eq("replica_query_db"));
+        verify(configurationRepository).persist(eq("/metadata"), eq("replica_query_db"));
     }
     
     @Test
     public void assertRenewSchemaNameEventWithAdd() {
         SchemaNamePersistEvent event = new SchemaNamePersistEvent("sharding_db", false);
-        when(configurationRepository.get("/schemas")).thenReturn("replica_query_db");
+        when(configurationRepository.get("/metadata")).thenReturn("replica_query_db");
         ConfigCenter configCenter = new ConfigCenter(configurationRepository);
         configCenter.renew(event);
-        verify(configurationRepository).persist(eq("/schemas"), eq("replica_query_db,sharding_db"));
+        verify(configurationRepository).persist(eq("/metadata"), eq("replica_query_db,sharding_db"));
     }
     
     @Test
     public void assertRenewSchemaNameEventWithAddAndExist() {
         SchemaNamePersistEvent event = new SchemaNamePersistEvent("sharding_db", false);
-        when(configurationRepository.get("/schemas")).thenReturn("sharding_db,replica_query_db");
+        when(configurationRepository.get("/metadata")).thenReturn("sharding_db,replica_query_db");
         ConfigCenter configCenter = new ConfigCenter(configurationRepository);
         configCenter.renew(event);
-        verify(configurationRepository).persist(eq("/schemas"), eq("sharding_db,replica_query_db"));
+        verify(configurationRepository).persist(eq("/metadata"), eq("sharding_db,replica_query_db"));
     }
     
     @Test
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/config/listener/SchemaChangedListenerTest.java b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/config/listener/SchemaChangedListenerTest.java
index 0acf8d9..b023e9a 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/config/listener/SchemaChangedListenerTest.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/config/listener/SchemaChangedListenerTest.java
@@ -224,7 +224,7 @@ public final class SchemaChangedListenerTest {
     
     @Test
     public void assertCreateSchemaNamesUpdatedEventForAdd() {
-        DataChangedEvent dataChangedEvent = new DataChangedEvent("/schemas", "sharding_db,replica_query_db,encrypt_db,shadow_db", Type.UPDATED);
+        DataChangedEvent dataChangedEvent = new DataChangedEvent("/metadata", "sharding_db,replica_query_db,encrypt_db,shadow_db", Type.UPDATED);
         Optional<GovernanceEvent> actual = schemaChangedListener.createEvent(dataChangedEvent);
         assertTrue(actual.isPresent());
         assertThat(((MetaDataAddedEvent) actual.get()).getSchemaName(), is("shadow_db"));
@@ -232,7 +232,7 @@ public final class SchemaChangedListenerTest {
     
     @Test
     public void assertCreateSchemaNamesUpdatedEventForDelete() {
-        DataChangedEvent dataChangedEvent = new DataChangedEvent("/schemas", "sharding_db,replica_query_db", Type.UPDATED);
+        DataChangedEvent dataChangedEvent = new DataChangedEvent("/metadata", "sharding_db,replica_query_db", Type.UPDATED);
         Optional<GovernanceEvent> actual = schemaChangedListener.createEvent(dataChangedEvent);
         assertTrue(actual.isPresent());
         assertThat(((MetaDataDeletedEvent) actual.get()).getSchemaName(), is("encrypt_db"));
@@ -240,7 +240,7 @@ public final class SchemaChangedListenerTest {
     
     @Test
     public void assertCreateSchemaNamesUpdatedEventForIgnore() {
-        DataChangedEvent dataChangedEvent = new DataChangedEvent("/schemas", "sharding_db,replica_query_db,encrypt_db", Type.UPDATED);
+        DataChangedEvent dataChangedEvent = new DataChangedEvent("/metadata", "sharding_db,replica_query_db,encrypt_db", Type.UPDATED);
         assertFalse(schemaChangedListener.createEvent(dataChangedEvent).isPresent());
     }
     
diff --git a/shardingsphere-governance/shardingsphere-governance-repository/shardingsphere-governance-repository-provider/shardingsphere-governance-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/governance/repository/zookeeper/CuratorZookeeperRepository.java b/shardingsphere-governance/shardingsphere-governance-repository/shardingsphere-governance-repository-provider/shardingsphere-governance-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/govern [...]
index 8dd6414..deb93ff 100644
--- a/shardingsphere-governance/shardingsphere-governance-repository/shardingsphere-governance-repository-provider/shardingsphere-governance-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/governance/repository/zookeeper/CuratorZookeeperRepository.java
+++ b/shardingsphere-governance/shardingsphere-governance-repository/shardingsphere-governance-repository-provider/shardingsphere-governance-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/governance/repository/zookeeper/CuratorZookeeperRepository.java
@@ -47,11 +47,13 @@ import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -61,6 +63,8 @@ public final class CuratorZookeeperRepository implements ConfigurationRepository
     
     private final Map<String, CuratorCache> caches = new HashMap<>();
     
+    private final Set<String> watchedKeys = new HashSet<>();
+    
     private CuratorFramework client;
     
     private InterProcessLock lock;
@@ -233,6 +237,9 @@ public final class CuratorZookeeperRepository implements ConfigurationRepository
     @Override
     public void watch(final String key, final DataChangedEventListener listener) {
         String path = key + PATH_SEPARATOR;
+        if (isDuplicate(path)) {
+            return;
+        }
         if (!caches.containsKey(path)) {
             addCacheData(key);
         }
@@ -247,6 +254,14 @@ public final class CuratorZookeeperRepository implements ConfigurationRepository
         });
     }
     
+    private boolean isDuplicate(final String key) {
+        if (!watchedKeys.isEmpty()) {
+            return watchedKeys.stream().filter(each -> key.startsWith(each)).findFirst().isPresent();
+        }
+        watchedKeys.add(key);
+        return false;
+    }
+    
     private void addCacheData(final String cachePath) {
         CuratorCache cache = CuratorCache.build(client, cachePath);
         try {
diff --git a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/test/java/org/apache/shardingsphere/proxy/initializer/impl/GovernanceBootstrapInitializerTest.java b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/test/java/org/apache/shardingsphere/proxy/initializer/impl/GovernanceBootstrapInitializerTest.java
index 634c36b..5f50481 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/test/java/org/apache/shardingsphere/proxy/initializer/impl/GovernanceBootstrapInitializerTest.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/test/java/org/apache/shardingsphere/proxy/initializer/impl/GovernanceBootstrapInitializerTest.java
@@ -87,7 +87,7 @@ public final class GovernanceBootstrapInitializerTest extends AbstractBootstrapI
         ConfigCenterNode node = new ConfigCenterNode();
         configurationRepository.persist(node.getAuthenticationPath(), readYAML(AUTHENTICATION_YAML));
         configurationRepository.persist(node.getPropsPath(), readYAML(PROPS_YAML));
-        configurationRepository.persist(node.getSchemasNodePath(), "db");
+        configurationRepository.persist(node.getMetadataNodePath(), "db");
         configurationRepository.persist(node.getDataSourcePath("db"), readYAML(DATA_SOURCE_YAML));
         configurationRepository.persist(node.getRulePath("db"), readYAML(SHARDING_RULE_YAML));
     }