You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2021/01/12 14:36:42 UTC
[shardingsphere] branch master updated: #7318,
refactor elasticjob usage (#9002)
This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new c9539f0 #7318, refactor elasticjob usage (#9002)
c9539f0 is described below
commit c9539f0a965507efacbbe37c352656bb07c550e3
Author: Zhang Yonglun <zh...@apache.org>
AuthorDate: Tue Jan 12 22:36:13 2021 +0800
#7318, refactor elasticjob usage (#9002)
---
.../org/apache/shardingsphere/ha/spi/HAType.java | 7 +--
.../org/apache/shardingsphere/ha/rule/HARule.java | 16 ++++--
.../ha/fixture/TestHATypeFixture.java | 4 --
.../apache/shardingsphere/ha/mgr/MGRHAType.java | 16 +++---
.../ha/route/engine/HASQLRouterTest.java | 67 ++++++++++++++++++++--
.../ha/route/fixture/TestRouteHATypeFixture.java | 4 --
6 files changed, 79 insertions(+), 35 deletions(-)
diff --git a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-api/src/main/java/org/apache/shardingsphere/ha/spi/HAType.java b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-api/src/main/java/org/apache/shardingsphere/ha/spi/HAType.java
index 792c027..c3c536d 100644
--- a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-api/src/main/java/org/apache/shardingsphere/ha/spi/HAType.java
+++ b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-api/src/main/java/org/apache/shardingsphere/ha/spi/HAType.java
@@ -68,12 +68,7 @@ public interface HAType extends ShardingSphereAlgorithm {
* @param groupName group name
*/
void startPeriodicalUpdate(Map<String, DataSource> dataSourceMap, String schemaName, Collection<String> disabledDataSourceNames, String groupName, String primaryDataSourceName);
-
- /**
- * Stop periodical update.
- */
- void stopPeriodicalUpdate();
-
+
/**
* Get primary data source.
*
diff --git a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HARule.java b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HARule.java
index 72df105..83c0604 100644
--- a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HARule.java
+++ b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HARule.java
@@ -76,11 +76,13 @@ public final class HARule implements DataSourceContainedRule, StatusContainedRul
}
for (Entry<String, HADataSourceRule> entry : dataSourceRules.entrySet()) {
String groupName = entry.getKey();
- HAType haType = entry.getValue().getHaType();
+ HADataSourceRule haDataSourceRule = entry.getValue();
+ HAType haType = haDataSourceRule.getHaType();
Map<String, DataSource> originalDataSourceMap = new HashMap<>(dataSourceMap);
- Collection<String> disabledDataSourceNames = entry.getValue().getDisabledDataSourceNames();
- String primaryDataSourceName = entry.getValue().getPrimaryDataSourceName();
+ Collection<String> disabledDataSourceNames = haDataSourceRule.getDisabledDataSourceNames();
+ String primaryDataSourceName = haDataSourceRule.getPrimaryDataSourceName();
haType.updatePrimaryDataSource(originalDataSourceMap, schemaName, disabledDataSourceNames, groupName, primaryDataSourceName);
+ haDataSourceRule.updatePrimaryDataSourceName(haType.getPrimaryDataSource());
haType.updateMemberState(originalDataSourceMap, schemaName, disabledDataSourceNames);
try {
haType.checkHAConfig(dataSourceMap, schemaName);
@@ -106,11 +108,13 @@ public final class HARule implements DataSourceContainedRule, StatusContainedRul
}
for (Entry<String, HADataSourceRule> entry : dataSourceRules.entrySet()) {
String groupName = entry.getKey();
- HAType haType = entry.getValue().getHaType();
+ HADataSourceRule haDataSourceRule = entry.getValue();
+ HAType haType = haDataSourceRule.getHaType();
Map<String, DataSource> originalDataSourceMap = new HashMap<>(dataSourceMap);
- Collection<String> disabledDataSourceNames = entry.getValue().getDisabledDataSourceNames();
- String primaryDataSourceName = entry.getValue().getPrimaryDataSourceName();
+ Collection<String> disabledDataSourceNames = haDataSourceRule.getDisabledDataSourceNames();
+ String primaryDataSourceName = haDataSourceRule.getPrimaryDataSourceName();
haType.updatePrimaryDataSource(originalDataSourceMap, schemaName, disabledDataSourceNames, groupName, primaryDataSourceName);
+ haDataSourceRule.updatePrimaryDataSourceName(haType.getPrimaryDataSource());
haType.updateMemberState(originalDataSourceMap, schemaName, disabledDataSourceNames);
try {
haType.checkHAConfig(dataSourceMap, schemaName);
diff --git a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/test/java/org/apache/shardingsphere/ha/fixture/TestHATypeFixture.java b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/test/java/org/apache/shardingsphere/ha/fixture/TestHATypeFixture.java
index 7eb0565..91d419c 100644
--- a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/test/java/org/apache/shardingsphere/ha/fixture/TestHATypeFixture.java
+++ b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/test/java/org/apache/shardingsphere/ha/fixture/TestHATypeFixture.java
@@ -47,10 +47,6 @@ public final class TestHATypeFixture implements HAType {
}
@Override
- public void stopPeriodicalUpdate() {
- }
-
- @Override
public String getPrimaryDataSource() {
return null;
}
diff --git a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRHAType.java b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRHAType.java
index 9d64169..f1297c4 100644
--- a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRHAType.java
+++ b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRHAType.java
@@ -62,7 +62,7 @@ public final class MGRHAType implements HAType {
private static CoordinatorRegistryCenter coordinatorRegistryCenter;
- private ScheduleJobBootstrap scheduleJobBootstrap;
+ private static final Map<String, ScheduleJobBootstrap> SCHEDULE_JOB_BOOTSTRAP_MAP = new HashMap<>(16, 1);
private String oldPrimaryDataSource;
@@ -275,14 +275,12 @@ public final class MGRHAType implements HAType {
coordinatorRegistryCenter = new ZookeeperRegistryCenter(zkConfig);
coordinatorRegistryCenter.init();
}
- scheduleJobBootstrap = new ScheduleJobBootstrap(coordinatorRegistryCenter, new MGRPeriodicalJob(this, dataSourceMap, schemaName, disabledDataSourceNames, groupName, primaryDataSourceName),
- JobConfiguration.newBuilder("MGRPeriodicalJob", 1).cron(props.getProperty("keepAliveCron")).build());
- scheduleJobBootstrap.schedule();
- }
-
- @Override
- public void stopPeriodicalUpdate() {
- scheduleJobBootstrap.shutdown();
+ if (null != SCHEDULE_JOB_BOOTSTRAP_MAP.get(groupName)) {
+ SCHEDULE_JOB_BOOTSTRAP_MAP.get(groupName).shutdown();
+ }
+ SCHEDULE_JOB_BOOTSTRAP_MAP.put(groupName, new ScheduleJobBootstrap(coordinatorRegistryCenter, new MGRPeriodicalJob(this, dataSourceMap, schemaName, disabledDataSourceNames,
+ groupName, primaryDataSourceName), JobConfiguration.newBuilder("MGR-" + groupName, 1).cron(props.getProperty("keepAliveCron")).build()));
+ SCHEDULE_JOB_BOOTSTRAP_MAP.get(groupName).schedule();
}
@Override
diff --git a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-route/src/test/java/org/apache/shardingsphere/ha/route/engine/HASQLRouterTest.java b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-route/src/test/java/org/apache/shardingsphere/ha/route/engine/HASQLRouterTest.java
index 66c4a7f..ffe5552 100644
--- a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-route/src/test/java/org/apache/shardingsphere/ha/route/engine/HASQLRouterTest.java
+++ b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-route/src/test/java/org/apache/shardingsphere/ha/route/engine/HASQLRouterTest.java
@@ -31,6 +31,8 @@ import org.apache.shardingsphere.infra.metadata.rule.ShardingSphereRuleMetaData;
import org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
import org.apache.shardingsphere.infra.route.SQLRouter;
import org.apache.shardingsphere.infra.route.context.RouteContext;
+import org.apache.shardingsphere.infra.route.context.RouteMapper;
+import org.apache.shardingsphere.infra.route.context.RouteUnit;
import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.infra.spi.ordered.OrderedSPIRegistry;
import org.apache.shardingsphere.sql.parser.sql.common.segment.dml.predicate.LockSegment;
@@ -50,7 +52,8 @@ import java.util.Iterator;
import java.util.Optional;
import java.util.Properties;
-import static org.junit.Assert.assertNull;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -62,6 +65,8 @@ public final class HASQLRouterTest {
private static final String NONE_HA_DATASOURCE_NAME = "noneHADatasource";
+ private static final String PRIMARY_DATASOURCE = "primary";
+
private static final String REPLICA_DATASOURCE = "query";
private HARule rule;
@@ -78,7 +83,7 @@ public final class HASQLRouterTest {
@Before
public void setUp() {
rule = new HARule(new HARuleConfiguration(Collections.singleton(
- new HADataSourceRuleConfiguration(DATASOURCE_NAME, Collections.singletonList(REPLICA_DATASOURCE), null, false, "haTypeName")),
+ new HADataSourceRuleConfiguration(DATASOURCE_NAME, Collections.singletonList(REPLICA_DATASOURCE), null, true, "haTypeName")),
Collections.emptyMap(), Collections.emptyMap()), mock(DatabaseType.class),
Collections.singletonMap("ds", mock(DataSource.class)), "ha_db");
sqlRouter = (HASQLRouter) OrderedSPIRegistry.getRegisteredServices(Collections.singleton(rule), SQLRouter.class).get(rule);
@@ -96,7 +101,19 @@ public final class HASQLRouterTest {
mock(ShardingSphereResource.class, RETURNS_DEEP_STUBS), new ShardingSphereRuleMetaData(Collections.emptyList(), Collections.singleton(rule)), mock(ShardingSphereSchema.class));
RouteContext actual = sqlRouter.createRouteContext(logicSQL, metaData, rule, new ConfigurationProperties(new Properties()));
Iterator<String> routedDataSourceNames = actual.getActualDataSourceNames().iterator();
- assertNull(routedDataSourceNames.next());
+ assertThat(routedDataSourceNames.next(), is(PRIMARY_DATASOURCE));
+ }
+
+ @Test
+ public void assertDecorateRouteContextToPrimaryDataSource() {
+ RouteContext actual = mockRouteContext();
+ LogicSQL logicSQL = new LogicSQL(mock(SQLStatementContext.class), "", Collections.emptyList());
+ ShardingSphereMetaData metaData = new ShardingSphereMetaData("logic_schema",
+ mock(ShardingSphereResource.class, RETURNS_DEEP_STUBS), new ShardingSphereRuleMetaData(Collections.emptyList(), Collections.singleton(rule)), mock(ShardingSphereSchema.class));
+ sqlRouter.decorateRouteContext(actual, logicSQL, metaData, rule, new ConfigurationProperties(new Properties()));
+ Iterator<String> routedDataSourceNames = actual.getActualDataSourceNames().iterator();
+ assertThat(routedDataSourceNames.next(), is(NONE_HA_DATASOURCE_NAME));
+ assertThat(routedDataSourceNames.next(), is(PRIMARY_DATASOURCE));
}
@Test
@@ -109,7 +126,22 @@ public final class HASQLRouterTest {
mock(ShardingSphereResource.class, RETURNS_DEEP_STUBS), new ShardingSphereRuleMetaData(Collections.emptyList(), Collections.singleton(rule)), mock(ShardingSphereSchema.class));
RouteContext actual = sqlRouter.createRouteContext(logicSQL, metaData, rule, new ConfigurationProperties(new Properties()));
Iterator<String> routedDataSourceNames = actual.getActualDataSourceNames().iterator();
- assertNull(routedDataSourceNames.next());
+ assertThat(routedDataSourceNames.next(), is(REPLICA_DATASOURCE));
+ }
+
+ @Test
+ public void assertDecorateRouteContextToReplicaDataSource() {
+ RouteContext actual = mockRouteContext();
+ MySQLSelectStatement selectStatement = mock(MySQLSelectStatement.class);
+ when(sqlStatementContext.getSqlStatement()).thenReturn(selectStatement);
+ when(selectStatement.getLock()).thenReturn(Optional.empty());
+ LogicSQL logicSQL = new LogicSQL(sqlStatementContext, "", Collections.emptyList());
+ ShardingSphereMetaData metaData = new ShardingSphereMetaData("logic_schema",
+ mock(ShardingSphereResource.class, RETURNS_DEEP_STUBS), new ShardingSphereRuleMetaData(Collections.emptyList(), Collections.singleton(rule)), mock(ShardingSphereSchema.class));
+ sqlRouter.decorateRouteContext(actual, logicSQL, metaData, rule, new ConfigurationProperties(new Properties()));
+ Iterator<String> routedDataSourceNames = actual.getActualDataSourceNames().iterator();
+ assertThat(routedDataSourceNames.next(), is(NONE_HA_DATASOURCE_NAME));
+ assertThat(routedDataSourceNames.next(), is(REPLICA_DATASOURCE));
}
@Test
@@ -122,7 +154,22 @@ public final class HASQLRouterTest {
mock(ShardingSphereResource.class, RETURNS_DEEP_STUBS), new ShardingSphereRuleMetaData(Collections.emptyList(), Collections.singleton(rule)), mock(ShardingSphereSchema.class));
RouteContext actual = sqlRouter.createRouteContext(logicSQL, metaData, rule, new ConfigurationProperties(new Properties()));
Iterator<String> routedDataSourceNames = actual.getActualDataSourceNames().iterator();
- assertNull(routedDataSourceNames.next());
+ assertThat(routedDataSourceNames.next(), is(PRIMARY_DATASOURCE));
+ }
+
+ @Test
+ public void assertDecorateRouteContextToPrimaryDataSourceWithLock() {
+ RouteContext actual = mockRouteContext();
+ MySQLSelectStatement selectStatement = mock(MySQLSelectStatement.class);
+ when(sqlStatementContext.getSqlStatement()).thenReturn(selectStatement);
+ when(selectStatement.getLock()).thenReturn(Optional.of(mock(LockSegment.class)));
+ LogicSQL logicSQL = new LogicSQL(sqlStatementContext, "", Collections.emptyList());
+ ShardingSphereMetaData metaData = new ShardingSphereMetaData("logic_schema",
+ mock(ShardingSphereResource.class, RETURNS_DEEP_STUBS), new ShardingSphereRuleMetaData(Collections.emptyList(), Collections.singleton(rule)), mock(ShardingSphereSchema.class));
+ sqlRouter.decorateRouteContext(actual, logicSQL, metaData, rule, new ConfigurationProperties(new Properties()));
+ Iterator<String> routedDataSourceNames = actual.getActualDataSourceNames().iterator();
+ assertThat(routedDataSourceNames.next(), is(NONE_HA_DATASOURCE_NAME));
+ assertThat(routedDataSourceNames.next(), is(PRIMARY_DATASOURCE));
}
@Test
@@ -133,6 +180,14 @@ public final class HASQLRouterTest {
mock(ShardingSphereResource.class, RETURNS_DEEP_STUBS), new ShardingSphereRuleMetaData(Collections.emptyList(), Collections.singleton(rule)), mock(ShardingSphereSchema.class));
RouteContext actual = sqlRouter.createRouteContext(logicSQL, metaData, rule, new ConfigurationProperties(new Properties()));
Iterator<String> routedDataSourceNames = actual.getActualDataSourceNames().iterator();
- assertNull(routedDataSourceNames.next());
+ assertThat(routedDataSourceNames.next(), is(PRIMARY_DATASOURCE));
+ }
+
+ private RouteContext mockRouteContext() {
+ RouteContext result = new RouteContext();
+ RouteUnit routeUnit = new RouteUnit(new RouteMapper(DATASOURCE_NAME, DATASOURCE_NAME), Collections.singletonList(new RouteMapper("table", "table_0")));
+ result.getRouteUnits().add(routeUnit);
+ result.getRouteUnits().add(new RouteUnit(new RouteMapper(NONE_HA_DATASOURCE_NAME, NONE_HA_DATASOURCE_NAME), Collections.emptyList()));
+ return result;
}
}
diff --git a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-route/src/test/java/org/apache/shardingsphere/ha/route/fixture/TestRouteHATypeFixture.java b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-route/src/test/java/org/apache/shardingsphere/ha/route/fixture/TestRouteHATypeFixture.java
index 685cbdd..b418ac2 100644
--- a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-route/src/test/java/org/apache/shardingsphere/ha/route/fixture/TestRouteHATypeFixture.java
+++ b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-route/src/test/java/org/apache/shardingsphere/ha/route/fixture/TestRouteHATypeFixture.java
@@ -47,10 +47,6 @@ public final class TestRouteHATypeFixture implements HAType {
}
@Override
- public void stopPeriodicalUpdate() {
- }
-
- @Override
public String getPrimaryDataSource() {
return "primary";
}