You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by me...@apache.org on 2022/01/06 11:57:52 UTC
[shardingsphere] branch master updated: Get table metadata from context on scaling data consistency check (#14574)
This is an automated email from the ASF dual-hosted git repository.
menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 0872546 Get table metadata from context on scaling data consistency check (#14574)
0872546 is described below
commit 0872546f7c49c47ee3933191417143179c0f074e
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Thu Jan 6 19:57:05 2022 +0800
Get table metadata from context on scaling data consistency check (#14574)
* Get table metadata from context
* Unit test
---
.../consistency/DataConsistencyCheckerImpl.java | 41 ++++++++--------------
.../scenario/rulealtered/RuleAlteredContext.java | 21 +++++++++++
.../api/datasource/PipelineDataSourceWrapper.java | 1 +
.../proxy/initializer/BootstrapInitializer.java | 15 +++++---
.../pipeline/api/impl/PipelineJobAPIImplTest.java | 2 ++
.../DataConsistencyCheckerImplTest.java | 9 ++++-
.../data/pipeline/core/util/ResourceUtil.java | 11 +++++-
.../pipeline/core/util/RuleAlteredContextUtil.java | 24 +++++++++++++
8 files changed, 91 insertions(+), 33 deletions(-)
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
index e73ad3b..7f0d8cc 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.data.pipeline.core.check.consistency;
+import com.google.common.base.Preconditions;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -27,14 +28,16 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDat
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineDataConsistencyCheckFailedException;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
+import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCheckAlgorithm;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator;
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import org.apache.shardingsphere.infra.metadata.schema.model.TableMetaData;
+import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.scaling.core.job.sqlbuilder.ScalingSQLBuilderFactory;
import javax.sql.DataSource;
@@ -49,7 +52,6 @@ import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
@@ -66,8 +68,6 @@ import java.util.stream.Collectors;
@Slf4j
public final class DataConsistencyCheckerImpl implements DataConsistencyChecker {
- private static final Map<PipelineDataSourceConfiguration, PipelineTableMetaDataLoader> TABLE_META_DATA_LOADER_MAP = new ConcurrentHashMap<>();
-
private final PipelineDataSourceFactory dataSourceFactory = new PipelineDataSourceFactory();
// TODO replace to JobConfiguration
@@ -146,17 +146,17 @@ public final class DataConsistencyCheckerImpl implements DataConsistencyChecker
JobRateLimitAlgorithm rateLimitAlgorithm = jobContext.getRuleAlteredContext().getRateLimitAlgorithm();
try (PipelineDataSourceWrapper sourceDataSource = dataSourceFactory.newInstance(sourceDataSourceConfig);
PipelineDataSourceWrapper targetDataSource = dataSourceFactory.newInstance(targetDataSourceConfig)) {
- PipelineTableMetaDataLoader tableMetaDataLoader = getTableMetaDataLoader(sourceDataSourceConfig, sourceDataSource);
+ Map<String, TableMetaData> tableMetaDataMap = getTableMetaDataMap(jobContext.getJobConfig().getWorkflowConfig().getSchemaName());
logicTableNames.forEach(each -> {
//TODO put to preparer
- if (null == tableMetaDataLoader.getTableMetaData(each)) {
+ if (!tableMetaDataMap.containsKey(each)) {
throw new PipelineDataConsistencyCheckFailedException(String.format("could not get metadata for table '%s'", each));
}
});
for (String each : logicTableNames) {
- PipelineTableMetaData tableMetaData = tableMetaDataLoader.getTableMetaData(each);
- Collection<String> columnNames = tableMetaData.getColumnNames();
- String uniqueKey = tableMetaData.getPrimaryKeys().get(0);
+ TableMetaData tableMetaData = tableMetaDataMap.get(each);
+ Collection<String> columnNames = tableMetaData.getColumns().keySet();
+ String uniqueKey = tableMetaData.getPrimaryKeyColumns().get(0);
DataCalculateParameter sourceCalculateParameter = DataCalculateParameter.builder().dataSource(sourceDataSource).databaseType(sourceDatabaseType).peerDatabaseType(targetDatabaseType)
.logicTableName(each).columnNames(columnNames).uniqueKey(uniqueKey).build();
DataCalculateParameter targetCalculateParameter = DataCalculateParameter.builder().dataSource(targetDataSource).databaseType(targetDatabaseType).peerDatabaseType(sourceDatabaseType)
@@ -194,21 +194,10 @@ public final class DataConsistencyCheckerImpl implements DataConsistencyChecker
}
}
- private PipelineTableMetaDataLoader getTableMetaDataLoader(final PipelineDataSourceConfiguration sourceDataSourceConfig, final PipelineDataSourceWrapper sourceDataSource) throws SQLException {
- PipelineTableMetaDataLoader result = TABLE_META_DATA_LOADER_MAP.get(sourceDataSourceConfig);
- if (null != result) {
- return result;
- }
- synchronized (TABLE_META_DATA_LOADER_MAP) {
- result = TABLE_META_DATA_LOADER_MAP.get(sourceDataSourceConfig);
- if (null != result) {
- return result;
- }
- try (Connection connection = sourceDataSource.getConnection()) {
- result = new PipelineTableMetaDataLoader(connection, "%");
- TABLE_META_DATA_LOADER_MAP.put(sourceDataSourceConfig, result);
- }
- return result;
- }
+ private Map<String, TableMetaData> getTableMetaDataMap(final String schemaName) {
+ ContextManager contextManager = RuleAlteredContext.getContextManager();
+ Preconditions.checkNotNull(contextManager, "contextManager null");
+ ShardingSphereMetaData metaData = contextManager.getMetaDataContexts().getMetaData(schemaName);
+ return metaData.getSchema().getTables();
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
index 4e4bee3..c939b2b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
@@ -30,6 +30,7 @@ import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmC
import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmFactory;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration;
+import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
/**
@@ -50,6 +51,8 @@ public final class RuleAlteredContext {
private static volatile ModeConfiguration modeConfig;
+ private static volatile ContextManager contextManager;
+
private final OnRuleAlteredActionConfiguration onRuleAlteredActionConfig;
private final JobRateLimitAlgorithm rateLimitAlgorithm;
@@ -112,4 +115,22 @@ public final class RuleAlteredContext {
public static void initModeConfig(final ModeConfiguration modeConfig) {
RuleAlteredContext.modeConfig = modeConfig;
}
+
+ /**
+ * Get context manager.
+ *
+ * @return context manager
+ */
+ public static ContextManager getContextManager() {
+ return contextManager;
+ }
+
+ /**
+ * Initialize context manager.
+ *
+ * @param contextManager context manager
+ */
+ public static void initContextManager(final ContextManager contextManager) {
+ RuleAlteredContext.contextManager = contextManager;
+ }
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/PipelineDataSourceWrapper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/PipelineDataSourceWrapper.java
index 205c023..b21eb45 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/PipelineDataSourceWrapper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/PipelineDataSourceWrapper.java
@@ -36,6 +36,7 @@ import java.util.logging.Logger;
@Slf4j
public final class PipelineDataSourceWrapper implements DataSource, AutoCloseable {
+ @Getter
private final DataSource dataSource;
@Getter
diff --git a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/BootstrapInitializer.java b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/BootstrapInitializer.java
index 7eb6bc9..8a009c2 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/BootstrapInitializer.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/BootstrapInitializer.java
@@ -68,19 +68,23 @@ public final class BootstrapInitializer {
public void init(final YamlProxyConfiguration yamlConfig, final int port) throws SQLException {
ModeConfiguration modeConfig = null == yamlConfig.getServerConfiguration().getMode()
? null : new ModeConfigurationYamlSwapper().swapToObject(yamlConfig.getServerConfiguration().getMode());
- initContext(yamlConfig, modeConfig, port);
- initRuleAlteredJobWorker(modeConfig);
+ ContextManager contextManager = createContextManager(yamlConfig, modeConfig, port);
+ initContext(contextManager);
+ initRuleAlteredJobWorker(modeConfig, contextManager);
setDatabaseServerInfo();
}
- private void initContext(final YamlProxyConfiguration yamlConfig, final ModeConfiguration modeConfig, final int port) throws SQLException {
+ private ContextManager createContextManager(final YamlProxyConfiguration yamlConfig, final ModeConfiguration modeConfig, final int port) throws SQLException {
ProxyConfiguration proxyConfig = new YamlProxyConfigurationSwapper().swap(yamlConfig);
boolean isOverwrite = null == modeConfig || modeConfig.isOverwrite();
Map<String, Map<String, DataSource>> dataSourcesMap = getDataSourcesMap(proxyConfig.getSchemaDataSources());
ContextManagerBuilderParameter parameter = ContextManagerBuilderParameter.builder().modeConfig(modeConfig).dataSourcesMap(dataSourcesMap).schemaRuleConfigs(proxyConfig.getSchemaRules())
.globalRuleConfigs(proxyConfig.getGlobalRules()).props(proxyConfig.getProps()).isOverwrite(isOverwrite).labels(proxyConfig.getLabels())
.instanceDefinition(new InstanceDefinition(InstanceType.PROXY, port)).build();
- ContextManager contextManager = ContextManagerBuilderFactory.newInstance(modeConfig).build(parameter);
+ return ContextManagerBuilderFactory.newInstance(modeConfig).build(parameter);
+ }
+
+ private void initContext(final ContextManager contextManager) {
ProxyContext.getInstance().init(contextManager);
}
@@ -93,7 +97,7 @@ public final class BootstrapInitializer {
return result;
}
- private void initRuleAlteredJobWorker(final ModeConfiguration modeConfig) {
+ private void initRuleAlteredJobWorker(final ModeConfiguration modeConfig, final ContextManager contextManager) {
if (null == modeConfig) {
return;
}
@@ -103,6 +107,7 @@ public final class BootstrapInitializer {
return;
}
RuleAlteredContext.initModeConfig(modeConfig);
+ RuleAlteredContext.initContextManager(contextManager);
// TODO init worker only if necessary, e.g. 1) rule altered action configured, 2) enabled job exists, 3) stopped job restarted
RuleAlteredJobWorker.initWorkerIfNecessary();
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/PipelineJobAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/PipelineJobAPIImplTest.java
index 66c661c..fd74b07 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/PipelineJobAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/PipelineJobAPIImplTest.java
@@ -141,6 +141,7 @@ public final class PipelineJobAPIImplTest {
assertTrue(jobId.isPresent());
JobConfiguration jobConfig = pipelineJobAPI.getJobConfig(jobId.get());
initTableData(jobConfig.getPipelineConfig());
+ RuleAlteredContextUtil.mockContextManager();
Map<String, DataConsistencyCheckResult> checkResultMap = pipelineJobAPI.dataConsistencyCheck(jobId.get());
assertThat(checkResultMap.size(), is(1));
}
@@ -151,6 +152,7 @@ public final class PipelineJobAPIImplTest {
assertTrue(jobId.isPresent());
JobConfiguration jobConfig = pipelineJobAPI.getJobConfig(jobId.get());
initTableData(jobConfig.getPipelineConfig());
+ RuleAlteredContextUtil.mockContextManager();
Map<String, DataConsistencyCheckResult> checkResultMap = pipelineJobAPI.dataConsistencyCheck(jobId.get(), FixtureDataConsistencyCheckAlgorithm.TYPE);
assertThat(checkResultMap.size(), is(1));
assertTrue(checkResultMap.get("t_order").isRecordsCountMatched());
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImplTest.java
index 6b284f9..badd4ee 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImplTest.java
@@ -19,12 +19,14 @@ package org.apache.shardingsphere.data.pipeline.core.check.consistency;
import lombok.SneakyThrows;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.fixture.FixtureDataConsistencyCheckAlgorithm;
import org.apache.shardingsphere.data.pipeline.core.util.ResourceUtil;
+import org.apache.shardingsphere.data.pipeline.core.util.RuleAlteredContextUtil;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.job.check.EnvironmentCheckerFactory;
+import org.junit.BeforeClass;
import org.junit.Test;
import javax.sql.DataSource;
@@ -43,6 +45,11 @@ import static org.junit.Assert.assertTrue;
public final class DataConsistencyCheckerImplTest {
+ @BeforeClass
+ public static void beforeClass() {
+ RuleAlteredContextUtil.mockContextManager();
+ }
+
@Test
public void assertCountAndDataCheck() {
RuleAlteredJobContext jobContext = new RuleAlteredJobContext(ResourceUtil.mockJobConfig());
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/ResourceUtil.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/ResourceUtil.java
index d4379c6..078feea 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/ResourceUtil.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/ResourceUtil.java
@@ -76,9 +76,18 @@ public final class ResourceUtil {
return result;
}
+ /**
+ * Read file to string.
+ *
+ * @param fileName file name
+ * @return file content
+ */
@SneakyThrows(IOException.class)
- private static String readFileToString(final String fileName) {
+ public static String readFileToString(final String fileName) {
try (InputStream in = ResourceUtil.class.getResourceAsStream(fileName)) {
+ if (null == in) {
+ throw new NullPointerException("get " + fileName + " as stream return null");
+ }
return IOUtils.toString(in, StandardCharsets.UTF_8);
}
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/RuleAlteredContextUtil.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/RuleAlteredContextUtil.java
index ad6d656..0fc684c 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/RuleAlteredContextUtil.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/RuleAlteredContextUtil.java
@@ -18,14 +18,20 @@
package org.apache.shardingsphere.data.pipeline.core.util;
import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import org.apache.shardingsphere.data.pipeline.core.fixture.EmbedTestingServer;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
+import org.apache.shardingsphere.driver.jdbc.core.datasource.ShardingSphereDataSource;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
import java.util.Properties;
+@Slf4j
public final class RuleAlteredContextUtil {
private static final ExecuteEngine EXECUTE_ENGINE = ExecuteEngine.newCachedThreadInstance();
@@ -43,6 +49,24 @@ public final class RuleAlteredContextUtil {
}
/**
+ * Mock context manager.
+ */
+ public static void mockContextManager() {
+ ShardingSpherePipelineDataSourceConfiguration pipelineDataSourceConfig = new ShardingSpherePipelineDataSourceConfiguration(
+ ResourceUtil.readFileToString("/config_sharding_sphere_jdbc_source.yaml"));
+ ShardingSphereDataSource shardingSphereDataSource = (ShardingSphereDataSource) new PipelineDataSourceFactory().newInstance(pipelineDataSourceConfig).getDataSource();
+ ContextManager contextManager = shardingSphereDataSource.getContextManager();
+ RuleAlteredContext.initContextManager(contextManager);
+ try {
+ shardingSphereDataSource.close();
+ // CHECKSTYLE:OFF
+ } catch (final Exception ex) {
+ // CHECKSTYLE:ON
+ log.error("close data source failed", ex);
+ }
+ }
+
+ /**
* Get execute engine.
*
* @return execute engine