You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2021/01/12 14:36:42 UTC

[shardingsphere] branch master updated: #7318, refactor elasticjob usage (#9002)

This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new c9539f0  #7318, refactor elasticjob usage (#9002)
c9539f0 is described below

commit c9539f0a965507efacbbe37c352656bb07c550e3
Author: Zhang Yonglun <zh...@apache.org>
AuthorDate: Tue Jan 12 22:36:13 2021 +0800

    #7318, refactor elasticjob usage (#9002)
---
 .../org/apache/shardingsphere/ha/spi/HAType.java   |  7 +--
 .../org/apache/shardingsphere/ha/rule/HARule.java  | 16 ++++--
 .../ha/fixture/TestHATypeFixture.java              |  4 --
 .../apache/shardingsphere/ha/mgr/MGRHAType.java    | 16 +++---
 .../ha/route/engine/HASQLRouterTest.java           | 67 ++++++++++++++++++++--
 .../ha/route/fixture/TestRouteHATypeFixture.java   |  4 --
 6 files changed, 79 insertions(+), 35 deletions(-)

diff --git a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-api/src/main/java/org/apache/shardingsphere/ha/spi/HAType.java b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-api/src/main/java/org/apache/shardingsphere/ha/spi/HAType.java
index 792c027..c3c536d 100644
--- a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-api/src/main/java/org/apache/shardingsphere/ha/spi/HAType.java
+++ b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-api/src/main/java/org/apache/shardingsphere/ha/spi/HAType.java
@@ -68,12 +68,7 @@ public interface HAType extends ShardingSphereAlgorithm {
      * @param groupName group name
      */
     void startPeriodicalUpdate(Map<String, DataSource> dataSourceMap, String schemaName, Collection<String> disabledDataSourceNames, String groupName, String primaryDataSourceName);
-    
-    /**
-     * Stop periodical update.
-     */
-    void stopPeriodicalUpdate();
-    
+
     /**
      * Get primary data source.
      *
diff --git a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HARule.java b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HARule.java
index 72df105..83c0604 100644
--- a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HARule.java
+++ b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HARule.java
@@ -76,11 +76,13 @@ public final class HARule implements DataSourceContainedRule, StatusContainedRul
         }
         for (Entry<String, HADataSourceRule> entry : dataSourceRules.entrySet()) {
             String groupName = entry.getKey();
-            HAType haType = entry.getValue().getHaType();
+            HADataSourceRule haDataSourceRule = entry.getValue();
+            HAType haType = haDataSourceRule.getHaType();
             Map<String, DataSource> originalDataSourceMap = new HashMap<>(dataSourceMap);
-            Collection<String> disabledDataSourceNames = entry.getValue().getDisabledDataSourceNames();
-            String primaryDataSourceName = entry.getValue().getPrimaryDataSourceName();
+            Collection<String> disabledDataSourceNames = haDataSourceRule.getDisabledDataSourceNames();
+            String primaryDataSourceName = haDataSourceRule.getPrimaryDataSourceName();
             haType.updatePrimaryDataSource(originalDataSourceMap, schemaName, disabledDataSourceNames, groupName, primaryDataSourceName);
+            haDataSourceRule.updatePrimaryDataSourceName(haType.getPrimaryDataSource());
             haType.updateMemberState(originalDataSourceMap, schemaName, disabledDataSourceNames);
             try {
                 haType.checkHAConfig(dataSourceMap, schemaName);
@@ -106,11 +108,13 @@ public final class HARule implements DataSourceContainedRule, StatusContainedRul
         }
         for (Entry<String, HADataSourceRule> entry : dataSourceRules.entrySet()) {
             String groupName = entry.getKey();
-            HAType haType = entry.getValue().getHaType();
+            HADataSourceRule haDataSourceRule = entry.getValue();
+            HAType haType = haDataSourceRule.getHaType();
             Map<String, DataSource> originalDataSourceMap = new HashMap<>(dataSourceMap);
-            Collection<String> disabledDataSourceNames = entry.getValue().getDisabledDataSourceNames();
-            String primaryDataSourceName = entry.getValue().getPrimaryDataSourceName();
+            Collection<String> disabledDataSourceNames = haDataSourceRule.getDisabledDataSourceNames();
+            String primaryDataSourceName = haDataSourceRule.getPrimaryDataSourceName();
             haType.updatePrimaryDataSource(originalDataSourceMap, schemaName, disabledDataSourceNames, groupName, primaryDataSourceName);
+            haDataSourceRule.updatePrimaryDataSourceName(haType.getPrimaryDataSource());
             haType.updateMemberState(originalDataSourceMap, schemaName, disabledDataSourceNames);
             try {
                 haType.checkHAConfig(dataSourceMap, schemaName);
diff --git a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/test/java/org/apache/shardingsphere/ha/fixture/TestHATypeFixture.java b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/test/java/org/apache/shardingsphere/ha/fixture/TestHATypeFixture.java
index 7eb0565..91d419c 100644
--- a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/test/java/org/apache/shardingsphere/ha/fixture/TestHATypeFixture.java
+++ b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/test/java/org/apache/shardingsphere/ha/fixture/TestHATypeFixture.java
@@ -47,10 +47,6 @@ public final class TestHATypeFixture implements HAType {
     }
     
     @Override
-    public void stopPeriodicalUpdate() {
-    }
-    
-    @Override
     public String getPrimaryDataSource() {
         return null;
     }
diff --git a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRHAType.java b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRHAType.java
index 9d64169..f1297c4 100644
--- a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRHAType.java
+++ b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRHAType.java
@@ -62,7 +62,7 @@ public final class MGRHAType implements HAType {
     
     private static CoordinatorRegistryCenter coordinatorRegistryCenter;
     
-    private ScheduleJobBootstrap scheduleJobBootstrap;
+    private static final Map<String, ScheduleJobBootstrap> SCHEDULE_JOB_BOOTSTRAP_MAP = new HashMap<>(16, 1);
     
     private String oldPrimaryDataSource;
     
@@ -275,14 +275,12 @@ public final class MGRHAType implements HAType {
             coordinatorRegistryCenter = new ZookeeperRegistryCenter(zkConfig);
             coordinatorRegistryCenter.init();
         }
-        scheduleJobBootstrap = new ScheduleJobBootstrap(coordinatorRegistryCenter, new MGRPeriodicalJob(this, dataSourceMap, schemaName, disabledDataSourceNames, groupName, primaryDataSourceName),
-                JobConfiguration.newBuilder("MGRPeriodicalJob", 1).cron(props.getProperty("keepAliveCron")).build());
-        scheduleJobBootstrap.schedule();
-    }
-    
-    @Override
-    public void stopPeriodicalUpdate() {
-        scheduleJobBootstrap.shutdown();
+        if (null != SCHEDULE_JOB_BOOTSTRAP_MAP.get(groupName)) {
+            SCHEDULE_JOB_BOOTSTRAP_MAP.get(groupName).shutdown();
+        }
+        SCHEDULE_JOB_BOOTSTRAP_MAP.put(groupName, new ScheduleJobBootstrap(coordinatorRegistryCenter, new MGRPeriodicalJob(this, dataSourceMap, schemaName, disabledDataSourceNames,
+                groupName, primaryDataSourceName), JobConfiguration.newBuilder("MGR-" + groupName, 1).cron(props.getProperty("keepAliveCron")).build()));
+        SCHEDULE_JOB_BOOTSTRAP_MAP.get(groupName).schedule();
     }
     
     @Override
diff --git a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-route/src/test/java/org/apache/shardingsphere/ha/route/engine/HASQLRouterTest.java b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-route/src/test/java/org/apache/shardingsphere/ha/route/engine/HASQLRouterTest.java
index 66c4a7f..ffe5552 100644
--- a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-route/src/test/java/org/apache/shardingsphere/ha/route/engine/HASQLRouterTest.java
+++ b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-route/src/test/java/org/apache/shardingsphere/ha/route/engine/HASQLRouterTest.java
@@ -31,6 +31,8 @@ import org.apache.shardingsphere.infra.metadata.rule.ShardingSphereRuleMetaData;
 import org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
 import org.apache.shardingsphere.infra.route.SQLRouter;
 import org.apache.shardingsphere.infra.route.context.RouteContext;
+import org.apache.shardingsphere.infra.route.context.RouteMapper;
+import org.apache.shardingsphere.infra.route.context.RouteUnit;
 import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
 import org.apache.shardingsphere.infra.spi.ordered.OrderedSPIRegistry;
 import org.apache.shardingsphere.sql.parser.sql.common.segment.dml.predicate.LockSegment;
@@ -50,7 +52,8 @@ import java.util.Iterator;
 import java.util.Optional;
 import java.util.Properties;
 
-import static org.junit.Assert.assertNull;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -62,6 +65,8 @@ public final class HASQLRouterTest {
     
     private static final String NONE_HA_DATASOURCE_NAME = "noneHADatasource";
     
+    private static final String PRIMARY_DATASOURCE = "primary";
+    
     private static final String REPLICA_DATASOURCE = "query";
     
     private HARule rule;
@@ -78,7 +83,7 @@ public final class HASQLRouterTest {
     @Before
     public void setUp() {
         rule = new HARule(new HARuleConfiguration(Collections.singleton(
-                new HADataSourceRuleConfiguration(DATASOURCE_NAME, Collections.singletonList(REPLICA_DATASOURCE), null, false, "haTypeName")),
+                new HADataSourceRuleConfiguration(DATASOURCE_NAME, Collections.singletonList(REPLICA_DATASOURCE), null, true, "haTypeName")),
                 Collections.emptyMap(), Collections.emptyMap()), mock(DatabaseType.class),
                 Collections.singletonMap("ds", mock(DataSource.class)), "ha_db");
         sqlRouter = (HASQLRouter) OrderedSPIRegistry.getRegisteredServices(Collections.singleton(rule), SQLRouter.class).get(rule);
@@ -96,7 +101,19 @@ public final class HASQLRouterTest {
                 mock(ShardingSphereResource.class, RETURNS_DEEP_STUBS), new ShardingSphereRuleMetaData(Collections.emptyList(), Collections.singleton(rule)), mock(ShardingSphereSchema.class));
         RouteContext actual = sqlRouter.createRouteContext(logicSQL, metaData, rule, new ConfigurationProperties(new Properties()));
         Iterator<String> routedDataSourceNames = actual.getActualDataSourceNames().iterator();
-        assertNull(routedDataSourceNames.next());
+        assertThat(routedDataSourceNames.next(), is(PRIMARY_DATASOURCE));
+    }
+    
+    @Test
+    public void assertDecorateRouteContextToPrimaryDataSource() {
+        RouteContext actual = mockRouteContext();
+        LogicSQL logicSQL = new LogicSQL(mock(SQLStatementContext.class), "", Collections.emptyList());
+        ShardingSphereMetaData metaData = new ShardingSphereMetaData("logic_schema",
+                mock(ShardingSphereResource.class, RETURNS_DEEP_STUBS), new ShardingSphereRuleMetaData(Collections.emptyList(), Collections.singleton(rule)), mock(ShardingSphereSchema.class));
+        sqlRouter.decorateRouteContext(actual, logicSQL, metaData, rule, new ConfigurationProperties(new Properties()));
+        Iterator<String> routedDataSourceNames = actual.getActualDataSourceNames().iterator();
+        assertThat(routedDataSourceNames.next(), is(NONE_HA_DATASOURCE_NAME));
+        assertThat(routedDataSourceNames.next(), is(PRIMARY_DATASOURCE));
     }
     
     @Test
@@ -109,7 +126,22 @@ public final class HASQLRouterTest {
                 mock(ShardingSphereResource.class, RETURNS_DEEP_STUBS), new ShardingSphereRuleMetaData(Collections.emptyList(), Collections.singleton(rule)), mock(ShardingSphereSchema.class));
         RouteContext actual = sqlRouter.createRouteContext(logicSQL, metaData, rule, new ConfigurationProperties(new Properties()));
         Iterator<String> routedDataSourceNames = actual.getActualDataSourceNames().iterator();
-        assertNull(routedDataSourceNames.next());
+        assertThat(routedDataSourceNames.next(), is(REPLICA_DATASOURCE));
+    }
+    
+    @Test
+    public void assertDecorateRouteContextToReplicaDataSource() {
+        RouteContext actual = mockRouteContext();
+        MySQLSelectStatement selectStatement = mock(MySQLSelectStatement.class);
+        when(sqlStatementContext.getSqlStatement()).thenReturn(selectStatement);
+        when(selectStatement.getLock()).thenReturn(Optional.empty());
+        LogicSQL logicSQL = new LogicSQL(sqlStatementContext, "", Collections.emptyList());
+        ShardingSphereMetaData metaData = new ShardingSphereMetaData("logic_schema",
+                mock(ShardingSphereResource.class, RETURNS_DEEP_STUBS), new ShardingSphereRuleMetaData(Collections.emptyList(), Collections.singleton(rule)), mock(ShardingSphereSchema.class));
+        sqlRouter.decorateRouteContext(actual, logicSQL, metaData, rule, new ConfigurationProperties(new Properties()));
+        Iterator<String> routedDataSourceNames = actual.getActualDataSourceNames().iterator();
+        assertThat(routedDataSourceNames.next(), is(NONE_HA_DATASOURCE_NAME));
+        assertThat(routedDataSourceNames.next(), is(REPLICA_DATASOURCE));
     }
     
     @Test
@@ -122,7 +154,22 @@ public final class HASQLRouterTest {
                 mock(ShardingSphereResource.class, RETURNS_DEEP_STUBS), new ShardingSphereRuleMetaData(Collections.emptyList(), Collections.singleton(rule)), mock(ShardingSphereSchema.class));
         RouteContext actual = sqlRouter.createRouteContext(logicSQL, metaData, rule, new ConfigurationProperties(new Properties()));
         Iterator<String> routedDataSourceNames = actual.getActualDataSourceNames().iterator();
-        assertNull(routedDataSourceNames.next());
+        assertThat(routedDataSourceNames.next(), is(PRIMARY_DATASOURCE));
+    }
+    
+    @Test
+    public void assertDecorateRouteContextToPrimaryDataSourceWithLock() {
+        RouteContext actual = mockRouteContext();
+        MySQLSelectStatement selectStatement = mock(MySQLSelectStatement.class);
+        when(sqlStatementContext.getSqlStatement()).thenReturn(selectStatement);
+        when(selectStatement.getLock()).thenReturn(Optional.of(mock(LockSegment.class)));
+        LogicSQL logicSQL = new LogicSQL(sqlStatementContext, "", Collections.emptyList());
+        ShardingSphereMetaData metaData = new ShardingSphereMetaData("logic_schema",
+                mock(ShardingSphereResource.class, RETURNS_DEEP_STUBS), new ShardingSphereRuleMetaData(Collections.emptyList(), Collections.singleton(rule)), mock(ShardingSphereSchema.class));
+        sqlRouter.decorateRouteContext(actual, logicSQL, metaData, rule, new ConfigurationProperties(new Properties()));
+        Iterator<String> routedDataSourceNames = actual.getActualDataSourceNames().iterator();
+        assertThat(routedDataSourceNames.next(), is(NONE_HA_DATASOURCE_NAME));
+        assertThat(routedDataSourceNames.next(), is(PRIMARY_DATASOURCE));
     }
     
     @Test
@@ -133,6 +180,14 @@ public final class HASQLRouterTest {
                 mock(ShardingSphereResource.class, RETURNS_DEEP_STUBS), new ShardingSphereRuleMetaData(Collections.emptyList(), Collections.singleton(rule)), mock(ShardingSphereSchema.class));
         RouteContext actual = sqlRouter.createRouteContext(logicSQL, metaData, rule, new ConfigurationProperties(new Properties()));
         Iterator<String> routedDataSourceNames = actual.getActualDataSourceNames().iterator();
-        assertNull(routedDataSourceNames.next());
+        assertThat(routedDataSourceNames.next(), is(PRIMARY_DATASOURCE));
+    }
+    
+    private RouteContext mockRouteContext() {
+        RouteContext result = new RouteContext();
+        RouteUnit routeUnit = new RouteUnit(new RouteMapper(DATASOURCE_NAME, DATASOURCE_NAME), Collections.singletonList(new RouteMapper("table", "table_0")));
+        result.getRouteUnits().add(routeUnit);
+        result.getRouteUnits().add(new RouteUnit(new RouteMapper(NONE_HA_DATASOURCE_NAME, NONE_HA_DATASOURCE_NAME), Collections.emptyList()));
+        return result;
     }
 }
diff --git a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-route/src/test/java/org/apache/shardingsphere/ha/route/fixture/TestRouteHATypeFixture.java b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-route/src/test/java/org/apache/shardingsphere/ha/route/fixture/TestRouteHATypeFixture.java
index 685cbdd..b418ac2 100644
--- a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-route/src/test/java/org/apache/shardingsphere/ha/route/fixture/TestRouteHATypeFixture.java
+++ b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-route/src/test/java/org/apache/shardingsphere/ha/route/fixture/TestRouteHATypeFixture.java
@@ -47,10 +47,6 @@ public final class TestRouteHATypeFixture implements HAType {
     }
     
     @Override
-    public void stopPeriodicalUpdate() {
-    }
-    
-    @Override
     public String getPrimaryDataSource() {
         return "primary";
     }