You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2021/05/24 14:07:20 UTC

[shardingsphere] branch master updated: Add ScalingRegistryService (#10447)

This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 70a1145  Add ScalingRegistryService (#10447)
70a1145 is described below

commit 70a114548331ed2c257260b91323584f4b92d490
Author: Liang Zhang <te...@163.com>
AuthorDate: Mon May 24 22:06:38 2021 +0800

    Add ScalingRegistryService (#10447)
    
    * Inline RuleChangedListener
    
    * Add ScalingRegistryService
    
    * Add todo
---
 .../governance/core/registry/RegistryCenter.java   | 38 ----------
 .../listener/impl/RuleChangedListener.java         |  6 +-
 .../service/scaling/ScalingRegistryService.java    | 87 ++++++++++++++++++++++
 .../core/registry/RegistryCenterTest.java          | 18 -----
 .../scaling/ScalingRegistryServiceTest.java        | 81 ++++++++++++++++++++
 5 files changed, 169 insertions(+), 61 deletions(-)

diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
index 079f3aa..cad9f5c 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
@@ -29,9 +29,6 @@ import org.apache.shardingsphere.governance.core.registry.listener.event.invocat
 import org.apache.shardingsphere.governance.core.registry.listener.event.invocation.ExecuteProcessUnitReportEvent;
 import org.apache.shardingsphere.governance.core.registry.listener.event.invocation.ShowProcessListRequestEvent;
 import org.apache.shardingsphere.governance.core.registry.listener.event.invocation.ShowProcessListResponseEvent;
-import org.apache.shardingsphere.governance.core.registry.listener.event.rule.RuleConfigurationCachedEvent;
-import org.apache.shardingsphere.governance.core.registry.listener.event.rule.SwitchRuleConfigurationEvent;
-import org.apache.shardingsphere.governance.core.registry.listener.event.scaling.StartScalingEvent;
 import org.apache.shardingsphere.governance.core.registry.service.config.impl.DataSourceRegistryService;
 import org.apache.shardingsphere.governance.core.registry.service.config.impl.GlobalRuleRegistryService;
 import org.apache.shardingsphere.governance.core.registry.service.config.impl.PropertiesRegistryService;
@@ -54,7 +51,6 @@ import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
 import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUsers;
 import org.apache.shardingsphere.infra.metadata.user.yaml.config.YamlUsersConfigurationConverter;
 import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
-import org.apache.shardingsphere.infra.yaml.swapper.YamlRuleConfigurationSwapperEngine;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -77,8 +73,6 @@ public final class RegistryCenter {
     
     private final RegistryCenterNode node;
     
-    private final RegistryCacheManager registryCacheManager;
-    
     @Getter
     private final DataSourceRegistryService dataSourceService;
     
@@ -104,7 +98,6 @@ public final class RegistryCenter {
         instanceId = GovernanceInstance.getInstance().getId();
         this.repository = repository;
         node = new RegistryCenterNode();
-        registryCacheManager = new RegistryCacheManager(repository, node);
         dataSourceService = new DataSourceRegistryService(repository);
         schemaRuleService = new SchemaRuleRegistryService(repository);
         globalRuleService = new GlobalRuleRegistryService(repository);
@@ -152,37 +145,6 @@ public final class RegistryCenter {
         repository.persist(node.getMetadataNodePath(), String.join(",", newSchemaNames));
     }
     
-    @SuppressWarnings("unchecked")
-    private Collection<RuleConfiguration> loadCachedRuleConfigurations(final String schemaName, final String ruleConfigCacheId) {
-        return new YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(
-                YamlEngine.unmarshal(registryCacheManager.loadCache(node.getRulePath(schemaName), ruleConfigCacheId), Collection.class));
-    }
-    
-    /**
-     * Switch rule configuration.
-     *
-     * @param event switch rule configuration event
-     */
-    @Subscribe
-    public void renew(final SwitchRuleConfigurationEvent event) {
-        schemaRuleService.persist(event.getSchemaName(), loadCachedRuleConfigurations(event.getSchemaName(), event.getRuleConfigurationCacheId()));
-        registryCacheManager.deleteCache(node.getRulePath(event.getSchemaName()), event.getRuleConfigurationCacheId());
-    }
-    
-    /**
-     * Rule configuration cached.
-     *
-     * @param event rule configuration cached event
-     */
-    @Subscribe
-    public void renew(final RuleConfigurationCachedEvent event) {
-        StartScalingEvent startScalingEvent = new StartScalingEvent(event.getSchemaName(),
-                repository.get(node.getMetadataDataSourcePath(event.getSchemaName())),
-                repository.get(node.getRulePath(event.getSchemaName())),
-                registryCacheManager.loadCache(node.getRulePath(event.getSchemaName()), event.getCacheId()), event.getCacheId());
-        ShardingSphereEventBus.getInstance().post(startScalingEvent);
-    }
-    
     /**
      * Renew create user statement.
      *
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/listener/impl/RuleChangedListener.java b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/listener/impl/RuleChangedListener.java
index 6f2e093..6652804 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/listener/impl/RuleChangedListener.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/listener/impl/RuleChangedListener.java
@@ -55,7 +55,7 @@ public final class RuleChangedListener extends PostGovernanceRepositoryEventList
         if (isRuleChangedEvent(schemaName, event.getKey())) {
             return Optional.of(createRuleChangedEvent(schemaName, event));
         } else if (isRuleCachedEvent(schemaName, event.getKey())) {
-            return Optional.of(createRuleConfigurationCachedEvent(schemaName, event));
+            return Optional.of(new RuleConfigurationCachedEvent(event.getValue(), schemaName));
         }
         return Optional.empty();
     }
@@ -80,8 +80,4 @@ public final class RuleChangedListener extends PostGovernanceRepositoryEventList
         Preconditions.checkState(!rules.isEmpty(), "No available rule to load for governance.");
         return new YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(rules);
     }
-    
-    private GovernanceEvent createRuleConfigurationCachedEvent(final String schemaName, final DataChangedEvent event) {
-        return new RuleConfigurationCachedEvent(event.getValue(), schemaName);
-    }
 }
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/service/scaling/ScalingRegistryService.java b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/service/scaling/ScalingRegistryService.java
new file mode 100644
index 0000000..e2f31bb
--- /dev/null
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/service/scaling/ScalingRegistryService.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.governance.core.registry.service.scaling;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.shardingsphere.governance.core.registry.RegistryCacheManager;
+import org.apache.shardingsphere.governance.core.registry.RegistryCenterNode;
+import org.apache.shardingsphere.governance.core.registry.listener.event.rule.RuleConfigurationCachedEvent;
+import org.apache.shardingsphere.governance.core.registry.listener.event.rule.SwitchRuleConfigurationEvent;
+import org.apache.shardingsphere.governance.core.registry.listener.event.scaling.StartScalingEvent;
+import org.apache.shardingsphere.governance.core.registry.service.config.impl.SchemaRuleRegistryService;
+import org.apache.shardingsphere.governance.repository.spi.RegistryCenterRepository;
+import org.apache.shardingsphere.infra.config.RuleConfiguration;
+import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
+import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
+import org.apache.shardingsphere.infra.yaml.swapper.YamlRuleConfigurationSwapperEngine;
+
+import java.util.Collection;
+
+/**
+ * Scaling registry service.
+ */
+// TODO move to scaling module
+public final class ScalingRegistryService {
+    
+    private final RegistryCenterRepository repository;
+    
+    private final RegistryCenterNode node;
+    
+    private final SchemaRuleRegistryService schemaRuleService;
+    
+    private final RegistryCacheManager registryCacheManager;
+    
+    public ScalingRegistryService(final RegistryCenterRepository repository, final SchemaRuleRegistryService schemaRuleService) {
+        this.repository = repository;
+        node = new RegistryCenterNode();
+        this.schemaRuleService = schemaRuleService;
+        registryCacheManager = new RegistryCacheManager(repository, node);
+        ShardingSphereEventBus.getInstance().register(this);
+    }
+    
+    /**
+     * Switch rule configuration.
+     *
+     * @param event switch rule configuration event
+     */
+    @Subscribe
+    public void switchRuleConfiguration(final SwitchRuleConfigurationEvent event) {
+        schemaRuleService.persist(event.getSchemaName(), loadCachedRuleConfigurations(event.getSchemaName(), event.getRuleConfigurationCacheId()));
+        registryCacheManager.deleteCache(node.getRulePath(event.getSchemaName()), event.getRuleConfigurationCacheId());
+    }
+    
+    @SuppressWarnings("unchecked")
+    private Collection<RuleConfiguration> loadCachedRuleConfigurations(final String schemaName, final String ruleConfigCacheId) {
+        return new YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(
+                YamlEngine.unmarshal(registryCacheManager.loadCache(node.getRulePath(schemaName), ruleConfigCacheId), Collection.class));
+    }
+    
+    /**
+     * Cache rule configuration.
+     *
+     * @param event rule configuration cached event
+     */
+    @Subscribe
+    public void cacheRuleConfiguration(final RuleConfigurationCachedEvent event) {
+        StartScalingEvent startScalingEvent = new StartScalingEvent(event.getSchemaName(),
+                repository.get(node.getMetadataDataSourcePath(event.getSchemaName())),
+                repository.get(node.getRulePath(event.getSchemaName())),
+                registryCacheManager.loadCache(node.getRulePath(event.getSchemaName()), event.getCacheId()), event.getCacheId());
+        ShardingSphereEventBus.getInstance().post(startScalingEvent);
+    }
+}
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterTest.java b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterTest.java
index 9e52138..5cfc2d8 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterTest.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterTest.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.governance.core.registry;
 
 import lombok.SneakyThrows;
-import org.apache.shardingsphere.governance.core.registry.listener.event.rule.SwitchRuleConfigurationEvent;
 import org.apache.shardingsphere.governance.core.registry.service.config.impl.DataSourceRegistryService;
 import org.apache.shardingsphere.governance.core.registry.service.config.impl.GlobalRuleRegistryService;
 import org.apache.shardingsphere.governance.core.registry.service.config.impl.PropertiesRegistryService;
@@ -49,9 +48,7 @@ import java.util.Properties;
 import java.util.stream.Collectors;
 
 import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
 public final class RegistryCenterTest {
@@ -75,9 +72,6 @@ public final class RegistryCenterTest {
     @Mock
     private PropertiesRegistryService propsService;
     
-    @Mock
-    private RegistryCacheManager registryCacheManager;
-    
     private RegistryCenter registryCenter;
     
     @Before
@@ -165,16 +159,4 @@ public final class RegistryCenterTest {
         verify(registryCenterRepository).persist("/states/datanodes", "");
         verify(registryCenterRepository).persist("/states/primarynodes", "");
     }
-    
-    @Test
-    @SneakyThrows
-    public void assertRenewSwitchRuleConfigurationEvent() {
-        Field field = RegistryCenter.class.getDeclaredField("registryCacheManager");
-        field.setAccessible(true);
-        field.set(registryCenter, registryCacheManager);
-        when(registryCacheManager.loadCache(anyString(), eq("testCacheId"))).thenReturn(readYAML(SHARDING_RULE_YAML));
-        SwitchRuleConfigurationEvent event = new SwitchRuleConfigurationEvent("sharding_db", "testCacheId");
-        registryCenter.renew(event);
-        // TODO finish verify
-    }
 }
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/registry/service/scaling/ScalingRegistryServiceTest.java b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/registry/service/scaling/ScalingRegistryServiceTest.java
new file mode 100644
index 0000000..3444365
--- /dev/null
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/registry/service/scaling/ScalingRegistryServiceTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.governance.core.registry.service.scaling;
+
+import lombok.SneakyThrows;
+import org.apache.shardingsphere.governance.core.registry.RegistryCacheManager;
+import org.apache.shardingsphere.governance.core.registry.listener.event.rule.SwitchRuleConfigurationEvent;
+import org.apache.shardingsphere.governance.core.registry.service.config.impl.SchemaRuleRegistryService;
+import org.apache.shardingsphere.governance.repository.spi.RegistryCenterRepository;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.stream.Collectors;
+
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public final class ScalingRegistryServiceTest {
+    
+    @Mock
+    private RegistryCenterRepository registryCenterRepository;
+    
+    @Mock
+    private SchemaRuleRegistryService schemaRuleService;
+    
+    @Mock
+    private RegistryCacheManager registryCacheManager;
+    
+    private ScalingRegistryService scalingRegistryService;
+    
+    @Before
+    public void setUp() throws ReflectiveOperationException {
+        scalingRegistryService = new ScalingRegistryService(registryCenterRepository, schemaRuleService);
+    }
+    
+    @Test
+    public void assertSwitchRuleConfiguration() throws ReflectiveOperationException {
+        Field field = ScalingRegistryService.class.getDeclaredField("registryCacheManager");
+        field.setAccessible(true);
+        field.set(scalingRegistryService, registryCacheManager);
+        when(registryCacheManager.loadCache(anyString(), eq("testCacheId"))).thenReturn(readYAML());
+        SwitchRuleConfigurationEvent event = new SwitchRuleConfigurationEvent("sharding_db", "testCacheId");
+        scalingRegistryService.switchRuleConfiguration(event);
+        // TODO finish verify
+    }
+    
+    public void  assertCacheRuleConfiguration() {
+        // TODO finish test case
+    }
+    
+    @SneakyThrows({IOException.class, URISyntaxException.class})
+    private String readYAML() {
+        return Files.readAllLines(Paths.get(ClassLoader.getSystemResource("yaml/regcenter/data-schema-rule.yaml").toURI()))
+                .stream().filter(each -> !each.startsWith("#")).map(each -> each + System.lineSeparator()).collect(Collectors.joining());
+    }
+}