You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by ki...@apache.org on 2020/06/12 10:49:58 UTC

[shardingsphere] branch master updated: Support heart beat for sharding jdbc with yaml (#6015)

This is an automated email from the ASF dual-hosted git repository.

kimmking pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new b6a858c  Support heart beat for sharding jdbc with yaml (#6015)
b6a858c is described below

commit b6a858ce5dfc1b41757f6ee3f4a4b7995bc0f689
Author: Haoran Meng <lo...@163.com>
AuthorDate: Fri Jun 12 18:49:50 2020 +0800

    Support heart beat for sharding jdbc with yaml (#6015)
    
    * Support heart beat for sharding jdbc with yaml
    
    * Remove unnecessary static variables
---
 .../shardingsphere-jdbc-orchestration/pom.xml      |  5 ++++
 ...chestrationShardingSphereDataSourceFactory.java | 32 ++++++++++++--------
 .../AbstractOrchestrationDataSource.java           |  2 ++
 .../OrchestrationShardingSphereDataSource.java     | 35 ++++++++++++++++++++++
 .../YamlOrchestrationRootRuleConfigurations.java   |  3 ++
 5 files changed, 65 insertions(+), 12 deletions(-)

diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-orchestration/pom.xml b/shardingsphere-jdbc/shardingsphere-jdbc-orchestration/pom.xml
index 5f459bc..701f839 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-orchestration/pom.xml
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-orchestration/pom.xml
@@ -53,6 +53,11 @@
             <version>${project.version}</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.shardingsphere</groupId>
+            <artifactId>shardingsphere-cluster-facade</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.curator</groupId>
             <artifactId>curator-test</artifactId>
         </dependency>
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-orchestration/src/main/java/org/apache/shardingsphere/driver/orchestration/api/yaml/YamlOrchestrationShardingSphereDataSourceFactory.java b/shardingsphere-jdbc/shardingsphere-jdbc-orchestration/src/main/java/org/apache/shardingsphere/driver/orchestration/api/yaml/YamlOrchestrationShardingSphereDataSourceFactory.java
index 32bcea7..8edd1d7 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-orchestration/src/main/java/org/apache/shardingsphere/driver/orchestration/api/yaml/YamlOrchestrationShardingSphereDataSourceFactory.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-orchestration/src/main/java/org/apache/shardingsphere/driver/orchestration/api/yaml/YamlOrchestrationShardingSphereDataSourceFactory.java
@@ -19,6 +19,8 @@ package org.apache.shardingsphere.driver.orchestration.api.yaml;
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.cluster.configuration.swapper.ClusterConfigurationYamlSwapper;
+import org.apache.shardingsphere.cluster.configuration.yaml.YamlClusterConfiguration;
 import org.apache.shardingsphere.driver.jdbc.core.datasource.ShardingSphereDataSource;
 import org.apache.shardingsphere.driver.orchestration.internal.datasource.OrchestrationShardingSphereDataSource;
 import org.apache.shardingsphere.driver.orchestration.internal.util.YamlCenterRepositoryConfigurationSwapperUtil;
@@ -41,8 +43,6 @@ import java.util.Properties;
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
 public final class YamlOrchestrationShardingSphereDataSourceFactory {
     
-    private static final YamlRuleConfigurationSwapperEngine SWAPPER_ENGINE = new YamlRuleConfigurationSwapperEngine();
-    
     /**
      * Create ShardingSphere data source.
      *
@@ -53,7 +53,7 @@ public final class YamlOrchestrationShardingSphereDataSourceFactory {
      */
     public static DataSource createDataSource(final File yamlFile) throws SQLException, IOException {
         YamlOrchestrationRootRuleConfigurations configurations = unmarshal(yamlFile);
-        return createDataSource(configurations.getDataSources(), configurations, configurations.getProps(), configurations.getOrchestration());
+        return createDataSource(configurations.getDataSources(), configurations, configurations.getProps(), configurations.getOrchestration(), configurations.getCluster());
     }
     
     /**
@@ -67,7 +67,7 @@ public final class YamlOrchestrationShardingSphereDataSourceFactory {
      */
     public static DataSource createDataSource(final Map<String, DataSource> dataSourceMap, final File yamlFile) throws SQLException, IOException {
         YamlOrchestrationRootRuleConfigurations configurations = unmarshal(yamlFile);
-        return createDataSource(dataSourceMap, configurations, configurations.getProps(), configurations.getOrchestration());
+        return createDataSource(dataSourceMap, configurations, configurations.getProps(), configurations.getOrchestration(), configurations.getCluster());
     }
     
     /**
@@ -80,7 +80,7 @@ public final class YamlOrchestrationShardingSphereDataSourceFactory {
      */
     public static DataSource createDataSource(final byte[] yamlBytes) throws SQLException, IOException {
         YamlOrchestrationRootRuleConfigurations configurations = unmarshal(yamlBytes);
-        return createDataSource(configurations.getDataSources(), configurations, configurations.getProps(), configurations.getOrchestration());
+        return createDataSource(configurations.getDataSources(), configurations, configurations.getProps(), configurations.getOrchestration(), configurations.getCluster());
     }
     
     /**
@@ -94,17 +94,25 @@ public final class YamlOrchestrationShardingSphereDataSourceFactory {
      */
     public static DataSource createDataSource(final Map<String, DataSource> dataSourceMap, final byte[] yamlBytes) throws SQLException, IOException {
         YamlOrchestrationRootRuleConfigurations configurations = unmarshal(yamlBytes);
-        return createDataSource(dataSourceMap, configurations, configurations.getProps(), configurations.getOrchestration());
+        return createDataSource(dataSourceMap, configurations, configurations.getProps(), configurations.getOrchestration(), configurations.getCluster());
     }
     
-    private static DataSource createDataSource(final Map<String, DataSource> dataSourceMap, final YamlOrchestrationRootRuleConfigurations configurations, 
-                                               final Properties props, final Map<String, YamlCenterRepositoryConfiguration> yamlInstanceConfigurationMap) throws SQLException {
+    private static DataSource createDataSource(final Map<String, DataSource> dataSourceMap, final YamlOrchestrationRootRuleConfigurations configurations,
+                                               final Properties props, final Map<String, YamlCenterRepositoryConfiguration> yamlInstanceConfigurationMap,
+                                               final YamlClusterConfiguration yamlClusterConfiguration) throws SQLException {
         if (configurations.getRules().isEmpty() || dataSourceMap.isEmpty()) {
-            return new OrchestrationShardingSphereDataSource(new OrchestrationConfiguration(YamlCenterRepositoryConfigurationSwapperUtil.marshal(yamlInstanceConfigurationMap)));
+            return null == yamlClusterConfiguration
+                    ? new OrchestrationShardingSphereDataSource(new OrchestrationConfiguration(YamlCenterRepositoryConfigurationSwapperUtil.marshal(yamlInstanceConfigurationMap)))
+                    : new OrchestrationShardingSphereDataSource(new OrchestrationConfiguration(YamlCenterRepositoryConfigurationSwapperUtil.marshal(yamlInstanceConfigurationMap)),
+                    new ClusterConfigurationYamlSwapper().swap(yamlClusterConfiguration));
         } else {
-            ShardingSphereDataSource shardingSphereDataSource = new ShardingSphereDataSource(dataSourceMap, SWAPPER_ENGINE.swapToRuleConfigurations(configurations.getRules()), props);
-            return new OrchestrationShardingSphereDataSource(
-                    shardingSphereDataSource, new OrchestrationConfiguration(YamlCenterRepositoryConfigurationSwapperUtil.marshal(yamlInstanceConfigurationMap)));
+            ShardingSphereDataSource shardingSphereDataSource = new ShardingSphereDataSource(dataSourceMap,
+                    new YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(configurations.getRules()), props);
+            return null == yamlClusterConfiguration ? new OrchestrationShardingSphereDataSource(shardingSphereDataSource,
+                    new OrchestrationConfiguration(YamlCenterRepositoryConfigurationSwapperUtil.marshal(yamlInstanceConfigurationMap)))
+                    : new OrchestrationShardingSphereDataSource(shardingSphereDataSource,
+                    new OrchestrationConfiguration(YamlCenterRepositoryConfigurationSwapperUtil.marshal(yamlInstanceConfigurationMap)),
+                    new ClusterConfigurationYamlSwapper().swap(yamlClusterConfiguration));
         }
     }
     
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-orchestration/src/main/java/org/apache/shardingsphere/driver/orchestration/internal/datasource/AbstractOrchestrationDataSource.java b/shardingsphere-jdbc/shardingsphere-jdbc-orchestration/src/main/java/org/apache/shardingsphere/driver/orchestration/internal/datasource/AbstractOrchestrationDataSource.java
index 8f1fe4a..3aef6d0 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-orchestration/src/main/java/org/apache/shardingsphere/driver/orchestration/internal/datasource/AbstractOrchestrationDataSource.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-orchestration/src/main/java/org/apache/shardingsphere/driver/orchestration/internal/datasource/AbstractOrchestrationDataSource.java
@@ -21,6 +21,7 @@ import com.google.common.collect.Maps;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.Setter;
+import org.apache.shardingsphere.cluster.heartbeat.eventbus.HeartbeatEventBus;
 import org.apache.shardingsphere.driver.jdbc.core.datasource.ShardingSphereDataSource;
 import org.apache.shardingsphere.driver.jdbc.unsupported.AbstractUnsupportedOperationDataSource;
 import org.apache.shardingsphere.driver.orchestration.internal.util.DataSourceConverter;
@@ -63,6 +64,7 @@ public abstract class AbstractOrchestrationDataSource extends AbstractUnsupporte
     public AbstractOrchestrationDataSource(final ShardingOrchestrationFacade shardingOrchestrationFacade) {
         this.shardingOrchestrationFacade = shardingOrchestrationFacade;
         ShardingOrchestrationEventBus.getInstance().register(this);
+        HeartbeatEventBus.getInstance().register(this);
     }
     
     protected abstract DataSource getDataSource();
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-orchestration/src/main/java/org/apache/shardingsphere/driver/orchestration/internal/datasource/OrchestrationShardingSphereDataSource.java b/shardingsphere-jdbc/shardingsphere-jdbc-orchestration/src/main/java/org/apache/shardingsphere/driver/orchestration/internal/datasource/OrchestrationShardingSphereDataSource.java
index 9617fdd..a920d3b 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-orchestration/src/main/java/org/apache/shardingsphere/driver/orchestration/internal/datasource/OrchestrationShardingSphereDataSource.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-orchestration/src/main/java/org/apache/shardingsphere/driver/orchestration/internal/datasource/OrchestrationShardingSphereDataSource.java
@@ -22,6 +22,9 @@ import com.google.common.eventbus.Subscribe;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.SneakyThrows;
+import org.apache.shardingsphere.cluster.configuration.config.ClusterConfiguration;
+import org.apache.shardingsphere.cluster.facade.ClusterFacade;
+import org.apache.shardingsphere.cluster.heartbeat.event.HeartbeatDetectNoticeEvent;
 import org.apache.shardingsphere.driver.jdbc.core.datasource.ShardingSphereDataSource;
 import org.apache.shardingsphere.driver.orchestration.internal.circuit.datasource.CircuitBreakerDataSource;
 import org.apache.shardingsphere.driver.orchestration.internal.util.DataSourceConverter;
@@ -82,6 +85,28 @@ public class OrchestrationShardingSphereDataSource extends AbstractOrchestration
         persistMetaData(dataSource.getSchemaContexts().getDefaultSchemaContext().getSchema().getMetaData().getSchema());
     }
     
+    public OrchestrationShardingSphereDataSource(final OrchestrationConfiguration orchestrationConfig, final ClusterConfiguration clusterConfiguration) throws SQLException {
+        super(new ShardingOrchestrationFacade(orchestrationConfig, Collections.singletonList(DefaultSchema.LOGIC_NAME)));
+        ConfigCenter configService = getShardingOrchestrationFacade().getConfigCenter();
+        Collection<RuleConfiguration> configurations = configService.loadRuleConfigurations(DefaultSchema.LOGIC_NAME);
+        Preconditions.checkState(!configurations.isEmpty(), "Missing the sharding rule configuration on registry center");
+        Map<String, DataSourceConfiguration> dataSourceConfigurations = configService.loadDataSourceConfigurations(DefaultSchema.LOGIC_NAME);
+        dataSource = new ShardingSphereDataSource(DataSourceConverter.getDataSourceMap(dataSourceConfigurations), configurations, configService.loadProperties());
+        initShardingOrchestrationFacade();
+        ClusterFacade.getInstance().init(clusterConfiguration);
+        persistMetaData(dataSource.getSchemaContexts().getDefaultSchemaContext().getSchema().getMetaData().getSchema());
+    }
+    
+    public OrchestrationShardingSphereDataSource(final ShardingSphereDataSource shardingSphereDataSource,
+                                                 final OrchestrationConfiguration orchestrationConfig, final ClusterConfiguration clusterConfiguration) {
+        super(new ShardingOrchestrationFacade(orchestrationConfig, Collections.singletonList(DefaultSchema.LOGIC_NAME)));
+        dataSource = shardingSphereDataSource;
+        initShardingOrchestrationFacade(Collections.singletonMap(DefaultSchema.LOGIC_NAME, DataSourceConverter.getDataSourceConfigurationMap(dataSource.getDataSourceMap())),
+                getRuleConfigurationMap(), dataSource.getSchemaContexts().getProperties().getProps());
+        ClusterFacade.getInstance().init(clusterConfiguration);
+        persistMetaData(dataSource.getSchemaContexts().getDefaultSchemaContext().getSchema().getMetaData().getSchema());
+    }
+    
     private Map<String, Collection<RuleConfiguration>> getRuleConfigurationMap() {
         Map<String, Collection<RuleConfiguration>> result = new LinkedHashMap<>(1, 1);
         result.put(DefaultSchema.LOGIC_NAME, dataSource.getSchemaContexts().getDefaultSchemaContext().getSchema().getConfigurations());
@@ -186,6 +211,16 @@ public class OrchestrationShardingSphereDataSource extends AbstractOrchestration
         }
     }
     
+    /**
+     * Heart beat detect.
+     *
+     * @param event heart beat detect notice event
+     */
+    @Subscribe
+    public synchronized void heartbeat(final HeartbeatDetectNoticeEvent event) {
+        ClusterFacade.getInstance().detectHeartbeat(dataSource.getSchemaContexts().getSchemaContexts());
+    }
+    
     private ShardingSphereSchema getChangedShardingSphereSchema(final ShardingSphereSchema oldShardingSphereSchema, final RuleSchemaMetaData newRuleSchemaMetaData) {
         ShardingSphereMetaData metaData = new ShardingSphereMetaData(oldShardingSphereSchema.getMetaData().getDataSources(), newRuleSchemaMetaData);
         return new ShardingSphereSchema(oldShardingSphereSchema.getDatabaseType(), oldShardingSphereSchema.getConfigurations(),
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-orchestration/src/main/java/org/apache/shardingsphere/driver/orchestration/internal/yaml/YamlOrchestrationRootRuleConfigurations.java b/shardingsphere-jdbc/shardingsphere-jdbc-orchestration/src/main/java/org/apache/shardingsphere/driver/orchestration/internal/yaml/YamlOrchestrationRootRuleConfigurations.java
index 4d53142..4fd66aa 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-orchestration/src/main/java/org/apache/shardingsphere/driver/orchestration/internal/yaml/YamlOrchestrationRootRuleConfigurations.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-orchestration/src/main/java/org/apache/shardingsphere/driver/orchestration/internal/yaml/YamlOrchestrationRootRuleConfigurations.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.driver.orchestration.internal.yaml;
 
 import lombok.Getter;
 import lombok.Setter;
+import org.apache.shardingsphere.cluster.configuration.yaml.YamlClusterConfiguration;
 import org.apache.shardingsphere.orchestration.center.yaml.config.YamlCenterRepositoryConfiguration;
 import org.apache.shardingsphere.infra.yaml.config.YamlRootRuleConfigurations;
 
@@ -32,4 +33,6 @@ import java.util.Map;
 public final class YamlOrchestrationRootRuleConfigurations extends YamlRootRuleConfigurations {
     
     private Map<String, YamlCenterRepositoryConfiguration> orchestration;
+    
+    private YamlClusterConfiguration cluster;
 }