You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by qi...@apache.org on 2021/01/14 06:03:37 UTC
[shardingsphere] branch master updated: Refactor SchemaChangedListener (#9022)
This is an automated email from the ASF dual-hosted git repository.
qiulu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 83cebbf Refactor SchemaChangedListener (#9022)
83cebbf is described below
commit 83cebbf04b4ff225ce88f6d205994d5e59660a63
Author: Haoran Meng <me...@gmail.com>
AuthorDate: Thu Jan 14 14:03:15 2021 +0800
Refactor SchemaChangedListener (#9022)
* Refactor SchemaChangedListener
* Refactor SchemaChangedListener
---
.../governance/core/config/ConfigCenter.java | 12 +++---
.../governance/core/config/ConfigCenterNode.java | 47 ++++++++++++++++++----
.../config/listener/SchemaChangedListener.java | 2 +-
.../core/config/ConfigCenterNodeTest.java | 2 +-
.../governance/core/config/ConfigCenterTest.java | 22 +++++-----
.../config/listener/SchemaChangedListenerTest.java | 6 +--
.../zookeeper/CuratorZookeeperRepository.java | 15 +++++++
.../impl/GovernanceBootstrapInitializerTest.java | 2 +-
8 files changed, 77 insertions(+), 31 deletions(-)
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/config/ConfigCenter.java b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/config/ConfigCenter.java
index 6ace568..fa37bc9 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/config/ConfigCenter.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/config/ConfigCenter.java
@@ -145,14 +145,14 @@ public final class ConfigCenter {
*/
@Subscribe
public synchronized void renew(final SchemaNamePersistEvent event) {
- String schemaNames = repository.get(node.getSchemasNodePath());
+ String schemaNames = repository.get(node.getMetadataNodePath());
Collection<String> schemas = Strings.isNullOrEmpty(schemaNames) ? new LinkedHashSet<>() : new LinkedHashSet<>(Splitter.on(",").splitToList(schemaNames));
if (event.isDrop()) {
schemas.remove(event.getSchemaName());
} else if (!schemas.contains(event.getSchemaName())) {
schemas.add(event.getSchemaName());
}
- repository.persist(node.getSchemasNodePath(), Joiner.on(",").join(schemas));
+ repository.persist(node.getMetadataNodePath(), Joiner.on(",").join(schemas));
}
/**
@@ -269,9 +269,9 @@ public final class ConfigCenter {
}
private void persistSchemaName(final String schemaName) {
- String schemaNames = repository.get(node.getSchemasNodePath());
+ String schemaNames = repository.get(node.getMetadataNodePath());
if (Strings.isNullOrEmpty(schemaNames)) {
- repository.persist(node.getSchemasNodePath(), schemaName);
+ repository.persist(node.getMetadataNodePath(), schemaName);
return;
}
List<String> schemaNameList = Splitter.on(",").splitToList(schemaNames);
@@ -280,7 +280,7 @@ public final class ConfigCenter {
}
List<String> newArrayList = new ArrayList<>(schemaNameList);
newArrayList.add(schemaName);
- repository.persist(node.getSchemasNodePath(), Joiner.on(",").join(newArrayList));
+ repository.persist(node.getMetadataNodePath(), Joiner.on(",").join(newArrayList));
}
/**
@@ -335,7 +335,7 @@ public final class ConfigCenter {
* @return all schema names
*/
public Collection<String> getAllSchemaNames() {
- String schemaNames = repository.get(node.getSchemasNodePath());
+ String schemaNames = repository.get(node.getMetadataNodePath());
return Strings.isNullOrEmpty(schemaNames) ? new LinkedList<>() : node.splitSchemaName(schemaNames);
}
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/config/ConfigCenterNode.java b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/config/ConfigCenterNode.java
index dc94fda..3426274 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/config/ConfigCenterNode.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/config/ConfigCenterNode.java
@@ -36,8 +36,6 @@ public final class ConfigCenterNode {
private static final String METADATA_NODE = "metadata";
- private static final String SCHEMAS_NODE = "schemas";
-
private static final String DATA_SOURCE_NODE = "datasource";
private static final String RULE_NODE = "rule";
@@ -158,7 +156,7 @@ public final class ConfigCenterNode {
* @return config paths list.
*/
public Collection<String> getAllSchemaConfigPaths(final Collection<String> schemaNames) {
- Collection<String> result = new ArrayList<>(Collections.singleton(getSchemasNodePath()));
+ Collection<String> result = new ArrayList<>(Collections.singleton(getMetadataNodePath()));
for (String schemaName : schemaNames) {
result.add(getRulePath(schemaName));
result.add(getDataSourcePath(schemaName));
@@ -178,11 +176,44 @@ public final class ConfigCenterNode {
}
/**
- * Get schemas node path.
- *
- * @return schemas node path.
+ * Get all schema paths.
+ *
+ * @param schemaNames schema names.
+ * @return list of schema path.
+ */
+    public Collection<String> getAllSchemaPaths(final Collection<String> schemaNames) {
+        Collection<String> result = new ArrayList<>(schemaNames.size()); // mutable on purpose: Collections.emptyList() rejects add()
+        for (String schemaName : schemaNames) {
+            result.add(getSchemaPath(schemaName));
+        }
+        return result;
+    }
+
+ /**
+ * Get all rule paths.
+ *
+ * @param schemaNames schema names.
+ * @return list of rule path.
*/
- public String getSchemasNodePath() {
- return Joiner.on(PATH_SEPARATOR).join("", SCHEMAS_NODE);
+    public Collection<String> getAllRulePaths(final Collection<String> schemaNames) {
+        Collection<String> result = new ArrayList<>(schemaNames.size()); // mutable on purpose: Collections.emptyList() rejects add()
+        for (String schemaName : schemaNames) {
+            result.add(getRulePath(schemaName));
+        }
+        return result;
+    }
+
+ /**
+ * Get all data source paths.
+ *
+ * @param schemaNames schema names.
+ * @return list of data source path.
+ */
+    public Collection<String> getAllDataSourcePaths(final Collection<String> schemaNames) {
+        Collection<String> result = new ArrayList<>(schemaNames.size()); // mutable on purpose: Collections.emptyList() rejects add()
+        for (String schemaName : schemaNames) {
+            result.add(getDataSourcePath(schemaName));
+        }
+        return result;
     }
}
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/config/listener/SchemaChangedListener.java b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/config/listener/SchemaChangedListener.java
index f24045f..c180d3e 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/config/listener/SchemaChangedListener.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/config/listener/SchemaChangedListener.java
@@ -69,7 +69,7 @@ public final class SchemaChangedListener extends PostGovernanceRepositoryEventLi
@Override
protected Optional<GovernanceEvent> createEvent(final DataChangedEvent event) {
// TODO Consider removing the following one.
- if (configurationNode.getSchemasNodePath().equals(event.getKey())) {
+ if (configurationNode.getMetadataNodePath().equals(event.getKey())) {
return createSchemaNamesUpdatedEvent(event.getValue());
}
String schemaName = configurationNode.getSchemaName(event.getKey());
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/config/ConfigCenterNodeTest.java b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/config/ConfigCenterNodeTest.java
index 39e42c3..5fdb11a 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/config/ConfigCenterNodeTest.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/config/ConfigCenterNodeTest.java
@@ -60,7 +60,7 @@ public final class ConfigCenterNodeTest {
public void assertGetAllSchemaConfigPaths() {
Collection<String> actual = configurationNode.getAllSchemaConfigPaths(Collections.singletonList(DefaultSchema.LOGIC_NAME));
assertThat(actual.size(), is(4));
- assertThat(actual, hasItems("/schemas"));
+ assertThat(actual, hasItems("/metadata"));
assertThat(actual, hasItems("/metadata/logic_db/rule"));
assertThat(actual, hasItems("/metadata/logic_db/datasource"));
assertThat(actual, hasItems("/metadata/logic_db/schema"));
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/config/ConfigCenterTest.java b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/config/ConfigCenterTest.java
index f93b736..229ab18 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/config/ConfigCenterTest.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/config/ConfigCenterTest.java
@@ -438,7 +438,7 @@ public final class ConfigCenterTest {
@Test
public void assertGetAllSchemaNames() {
- when(configurationRepository.get("/schemas")).thenReturn("sharding_db,replica_query_db");
+ when(configurationRepository.get("/metadata")).thenReturn("sharding_db,replica_query_db");
ConfigCenter configCenter = new ConfigCenter(configurationRepository);
Collection<String> actual = configCenter.getAllSchemaNames();
assertThat(actual.size(), is(2));
@@ -483,17 +483,17 @@ public final class ConfigCenterTest {
@Test
public void assertPersistSchemaNameWithExistSchema() {
ConfigCenter configCenter = new ConfigCenter(configurationRepository);
- when(configurationRepository.get("/schemas")).thenReturn("sharding_db");
+ when(configurationRepository.get("/metadata")).thenReturn("sharding_db");
configCenter.persistConfigurations("sharding_db", createDataSourceConfigurations(), createRuleConfigurations(), true);
- verify(configurationRepository, times(0)).persist(eq("/schemas"), eq("sharding_db"));
+ verify(configurationRepository, times(0)).persist(eq("/metadata"), eq("sharding_db"));
}
@Test
public void assertPersistSchemaNameWithExistAndNewSchema() {
ConfigCenter configCenter = new ConfigCenter(configurationRepository);
- when(configurationRepository.get("/schemas")).thenReturn("replica_query_db");
+ when(configurationRepository.get("/metadata")).thenReturn("replica_query_db");
configCenter.persistConfigurations("sharding_db", createDataSourceConfigurations(), createRuleConfigurations(), true);
- verify(configurationRepository).persist(eq("/schemas"), eq("replica_query_db,sharding_db"));
+ verify(configurationRepository).persist(eq("/metadata"), eq("replica_query_db,sharding_db"));
}
@Test
@@ -515,28 +515,28 @@ public final class ConfigCenterTest {
@Test
public void assertRenewSchemaNameEventWithDrop() {
SchemaNamePersistEvent event = new SchemaNamePersistEvent("sharding_db", true);
- when(configurationRepository.get("/schemas")).thenReturn("sharding_db,replica_query_db");
+ when(configurationRepository.get("/metadata")).thenReturn("sharding_db,replica_query_db");
ConfigCenter configCenter = new ConfigCenter(configurationRepository);
configCenter.renew(event);
- verify(configurationRepository).persist(eq("/schemas"), eq("replica_query_db"));
+ verify(configurationRepository).persist(eq("/metadata"), eq("replica_query_db"));
}
@Test
public void assertRenewSchemaNameEventWithAdd() {
SchemaNamePersistEvent event = new SchemaNamePersistEvent("sharding_db", false);
- when(configurationRepository.get("/schemas")).thenReturn("replica_query_db");
+ when(configurationRepository.get("/metadata")).thenReturn("replica_query_db");
ConfigCenter configCenter = new ConfigCenter(configurationRepository);
configCenter.renew(event);
- verify(configurationRepository).persist(eq("/schemas"), eq("replica_query_db,sharding_db"));
+ verify(configurationRepository).persist(eq("/metadata"), eq("replica_query_db,sharding_db"));
}
@Test
public void assertRenewSchemaNameEventWithAddAndExist() {
SchemaNamePersistEvent event = new SchemaNamePersistEvent("sharding_db", false);
- when(configurationRepository.get("/schemas")).thenReturn("sharding_db,replica_query_db");
+ when(configurationRepository.get("/metadata")).thenReturn("sharding_db,replica_query_db");
ConfigCenter configCenter = new ConfigCenter(configurationRepository);
configCenter.renew(event);
- verify(configurationRepository).persist(eq("/schemas"), eq("sharding_db,replica_query_db"));
+ verify(configurationRepository).persist(eq("/metadata"), eq("sharding_db,replica_query_db"));
}
@Test
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/config/listener/SchemaChangedListenerTest.java b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/config/listener/SchemaChangedListenerTest.java
index 0acf8d9..b023e9a 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/config/listener/SchemaChangedListenerTest.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/config/listener/SchemaChangedListenerTest.java
@@ -224,7 +224,7 @@ public final class SchemaChangedListenerTest {
@Test
public void assertCreateSchemaNamesUpdatedEventForAdd() {
- DataChangedEvent dataChangedEvent = new DataChangedEvent("/schemas", "sharding_db,replica_query_db,encrypt_db,shadow_db", Type.UPDATED);
+ DataChangedEvent dataChangedEvent = new DataChangedEvent("/metadata", "sharding_db,replica_query_db,encrypt_db,shadow_db", Type.UPDATED);
Optional<GovernanceEvent> actual = schemaChangedListener.createEvent(dataChangedEvent);
assertTrue(actual.isPresent());
assertThat(((MetaDataAddedEvent) actual.get()).getSchemaName(), is("shadow_db"));
@@ -232,7 +232,7 @@ public final class SchemaChangedListenerTest {
@Test
public void assertCreateSchemaNamesUpdatedEventForDelete() {
- DataChangedEvent dataChangedEvent = new DataChangedEvent("/schemas", "sharding_db,replica_query_db", Type.UPDATED);
+ DataChangedEvent dataChangedEvent = new DataChangedEvent("/metadata", "sharding_db,replica_query_db", Type.UPDATED);
Optional<GovernanceEvent> actual = schemaChangedListener.createEvent(dataChangedEvent);
assertTrue(actual.isPresent());
assertThat(((MetaDataDeletedEvent) actual.get()).getSchemaName(), is("encrypt_db"));
@@ -240,7 +240,7 @@ public final class SchemaChangedListenerTest {
@Test
public void assertCreateSchemaNamesUpdatedEventForIgnore() {
- DataChangedEvent dataChangedEvent = new DataChangedEvent("/schemas", "sharding_db,replica_query_db,encrypt_db", Type.UPDATED);
+ DataChangedEvent dataChangedEvent = new DataChangedEvent("/metadata", "sharding_db,replica_query_db,encrypt_db", Type.UPDATED);
assertFalse(schemaChangedListener.createEvent(dataChangedEvent).isPresent());
}
diff --git a/shardingsphere-governance/shardingsphere-governance-repository/shardingsphere-governance-repository-provider/shardingsphere-governance-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/governance/repository/zookeeper/CuratorZookeeperRepository.java b/shardingsphere-governance/shardingsphere-governance-repository/shardingsphere-governance-repository-provider/shardingsphere-governance-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/governance/repository/zookeeper/CuratorZookeeperRepository.java
index 8dd6414..deb93ff 100644
--- a/shardingsphere-governance/shardingsphere-governance-repository/shardingsphere-governance-repository-provider/shardingsphere-governance-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/governance/repository/zookeeper/CuratorZookeeperRepository.java
+++ b/shardingsphere-governance/shardingsphere-governance-repository/shardingsphere-governance-repository-provider/shardingsphere-governance-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/governance/repository/zookeeper/CuratorZookeeperRepository.java
@@ -47,11 +47,13 @@ import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
@@ -61,6 +63,8 @@ public final class CuratorZookeeperRepository implements ConfigurationRepository
private final Map<String, CuratorCache> caches = new HashMap<>();
+ private final Set<String> watchedKeys = new HashSet<>();
+
private CuratorFramework client;
private InterProcessLock lock;
@@ -233,6 +237,9 @@ public final class CuratorZookeeperRepository implements ConfigurationRepository
@Override
public void watch(final String key, final DataChangedEventListener listener) {
String path = key + PATH_SEPARATOR;
+ if (isDuplicate(path)) {
+ return;
+ }
if (!caches.containsKey(path)) {
addCacheData(key);
}
@@ -247,6 +254,14 @@ public final class CuratorZookeeperRepository implements ConfigurationRepository
});
}
+    private boolean isDuplicate(final String key) {
+        if (watchedKeys.stream().anyMatch(key::startsWith)) { // already covered by a watched prefix path
+            return true;
+        }
+        watchedKeys.add(key); // record every newly watched path, not only the very first one
+        return false;
+    }
+
private void addCacheData(final String cachePath) {
CuratorCache cache = CuratorCache.build(client, cachePath);
try {
diff --git a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/test/java/org/apache/shardingsphere/proxy/initializer/impl/GovernanceBootstrapInitializerTest.java b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/test/java/org/apache/shardingsphere/proxy/initializer/impl/GovernanceBootstrapInitializerTest.java
index 634c36b..5f50481 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/test/java/org/apache/shardingsphere/proxy/initializer/impl/GovernanceBootstrapInitializerTest.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/test/java/org/apache/shardingsphere/proxy/initializer/impl/GovernanceBootstrapInitializerTest.java
@@ -87,7 +87,7 @@ public final class GovernanceBootstrapInitializerTest extends AbstractBootstrapI
ConfigCenterNode node = new ConfigCenterNode();
configurationRepository.persist(node.getAuthenticationPath(), readYAML(AUTHENTICATION_YAML));
configurationRepository.persist(node.getPropsPath(), readYAML(PROPS_YAML));
- configurationRepository.persist(node.getSchemasNodePath(), "db");
+ configurationRepository.persist(node.getMetadataNodePath(), "db");
configurationRepository.persist(node.getDataSourcePath("db"), readYAML(DATA_SOURCE_YAML));
configurationRepository.persist(node.getRulePath("db"), readYAML(SHARDING_RULE_YAML));
}