You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to notifications@shardingsphere.apache.org by me...@apache.org on 2022/01/06 11:57:52 UTC

[shardingsphere] branch master updated: Get table metadata from context on scaling data consistency check (#14574)

This is an automated email from the ASF dual-hosted git repository.

menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 0872546  Get table metadata from context on scaling data consistency check (#14574)
0872546 is described below

commit 0872546f7c49c47ee3933191417143179c0f074e
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Thu Jan 6 19:57:05 2022 +0800

    Get table metadata from context on scaling data consistency check (#14574)
    
    * Get table metadata from context
    
    * Unit test
---
 .../consistency/DataConsistencyCheckerImpl.java    | 41 ++++++++--------------
 .../scenario/rulealtered/RuleAlteredContext.java   | 21 +++++++++++
 .../api/datasource/PipelineDataSourceWrapper.java  |  1 +
 .../proxy/initializer/BootstrapInitializer.java    | 15 +++++---
 .../pipeline/api/impl/PipelineJobAPIImplTest.java  |  2 ++
 .../DataConsistencyCheckerImplTest.java            |  9 ++++-
 .../data/pipeline/core/util/ResourceUtil.java      | 11 +++++-
 .../pipeline/core/util/RuleAlteredContextUtil.java | 24 +++++++++++++
 8 files changed, 91 insertions(+), 33 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
index e73ad3b..7f0d8cc 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.core.check.consistency;
 
+import com.google.common.base.Preconditions;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
@@ -27,14 +28,16 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDat
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineDataConsistencyCheckFailedException;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
+import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCheckAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator;
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import org.apache.shardingsphere.infra.metadata.schema.model.TableMetaData;
+import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.scaling.core.job.sqlbuilder.ScalingSQLBuilderFactory;
 
 import javax.sql.DataSource;
@@ -49,7 +52,6 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
@@ -66,8 +68,6 @@ import java.util.stream.Collectors;
 @Slf4j
 public final class DataConsistencyCheckerImpl implements DataConsistencyChecker {
     
-    private static final Map<PipelineDataSourceConfiguration, PipelineTableMetaDataLoader> TABLE_META_DATA_LOADER_MAP = new ConcurrentHashMap<>();
-    
     private final PipelineDataSourceFactory dataSourceFactory = new PipelineDataSourceFactory();
     
     // TODO replace to JobConfiguration
@@ -146,17 +146,17 @@ public final class DataConsistencyCheckerImpl implements DataConsistencyChecker
         JobRateLimitAlgorithm rateLimitAlgorithm = jobContext.getRuleAlteredContext().getRateLimitAlgorithm();
         try (PipelineDataSourceWrapper sourceDataSource = dataSourceFactory.newInstance(sourceDataSourceConfig);
              PipelineDataSourceWrapper targetDataSource = dataSourceFactory.newInstance(targetDataSourceConfig)) {
-            PipelineTableMetaDataLoader tableMetaDataLoader = getTableMetaDataLoader(sourceDataSourceConfig, sourceDataSource);
+            Map<String, TableMetaData> tableMetaDataMap = getTableMetaDataMap(jobContext.getJobConfig().getWorkflowConfig().getSchemaName());
             logicTableNames.forEach(each -> {
                 //TODO put to preparer
-                if (null == tableMetaDataLoader.getTableMetaData(each)) {
+                if (!tableMetaDataMap.containsKey(each)) {
                     throw new PipelineDataConsistencyCheckFailedException(String.format("could not get metadata for table '%s'", each));
                 }
             });
             for (String each : logicTableNames) {
-                PipelineTableMetaData tableMetaData = tableMetaDataLoader.getTableMetaData(each);
-                Collection<String> columnNames = tableMetaData.getColumnNames();
-                String uniqueKey = tableMetaData.getPrimaryKeys().get(0);
+                TableMetaData tableMetaData = tableMetaDataMap.get(each);
+                Collection<String> columnNames = tableMetaData.getColumns().keySet();
+                String uniqueKey = tableMetaData.getPrimaryKeyColumns().get(0);
                 DataCalculateParameter sourceCalculateParameter = DataCalculateParameter.builder().dataSource(sourceDataSource).databaseType(sourceDatabaseType).peerDatabaseType(targetDatabaseType)
                     .logicTableName(each).columnNames(columnNames).uniqueKey(uniqueKey).build();
                 DataCalculateParameter targetCalculateParameter = DataCalculateParameter.builder().dataSource(targetDataSource).databaseType(targetDatabaseType).peerDatabaseType(sourceDatabaseType)
@@ -194,21 +194,10 @@ public final class DataConsistencyCheckerImpl implements DataConsistencyChecker
         }
     }
     
-    private PipelineTableMetaDataLoader getTableMetaDataLoader(final PipelineDataSourceConfiguration sourceDataSourceConfig, final PipelineDataSourceWrapper sourceDataSource) throws SQLException {
-        PipelineTableMetaDataLoader result = TABLE_META_DATA_LOADER_MAP.get(sourceDataSourceConfig);
-        if (null != result) {
-            return result;
-        }
-        synchronized (TABLE_META_DATA_LOADER_MAP) {
-            result = TABLE_META_DATA_LOADER_MAP.get(sourceDataSourceConfig);
-            if (null != result) {
-                return result;
-            }
-            try (Connection connection = sourceDataSource.getConnection()) {
-                result = new PipelineTableMetaDataLoader(connection, "%");
-                TABLE_META_DATA_LOADER_MAP.put(sourceDataSourceConfig, result);
-            }
-            return result;
-        }
+    private Map<String, TableMetaData> getTableMetaDataMap(final String schemaName) {
+        ContextManager contextManager = RuleAlteredContext.getContextManager();
+        Preconditions.checkNotNull(contextManager, "contextManager null");
+        ShardingSphereMetaData metaData = contextManager.getMetaDataContexts().getMetaData(schemaName);
+        return metaData.getSchema().getTables();
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
index 4e4bee3..c939b2b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
@@ -30,6 +30,7 @@ import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmC
 import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmFactory;
 import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
 import org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration;
+import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
 
 /**
@@ -50,6 +51,8 @@ public final class RuleAlteredContext {
     
     private static volatile ModeConfiguration modeConfig;
     
+    private static volatile ContextManager contextManager;
+    
     private final OnRuleAlteredActionConfiguration onRuleAlteredActionConfig;
     
     private final JobRateLimitAlgorithm rateLimitAlgorithm;
@@ -112,4 +115,22 @@ public final class RuleAlteredContext {
     public static void initModeConfig(final ModeConfiguration modeConfig) {
         RuleAlteredContext.modeConfig = modeConfig;
     }
+    
+    /**
+     * Get context manager.
+     *
+     * @return context manager
+     */
+    public static ContextManager getContextManager() {
+        return contextManager;
+    }
+    
+    /**
+     * Initialize context manager.
+     *
+     * @param contextManager context manager
+     */
+    public static void initContextManager(final ContextManager contextManager) {
+        RuleAlteredContext.contextManager = contextManager;
+    }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/PipelineDataSourceWrapper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/PipelineDataSourceWrapper.java
index 205c023..b21eb45 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/PipelineDataSourceWrapper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/PipelineDataSourceWrapper.java
@@ -36,6 +36,7 @@ import java.util.logging.Logger;
 @Slf4j
 public final class PipelineDataSourceWrapper implements DataSource, AutoCloseable {
     
+    @Getter
     private final DataSource dataSource;
     
     @Getter
diff --git a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/BootstrapInitializer.java b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/BootstrapInitializer.java
index 7eb6bc9..8a009c2 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/BootstrapInitializer.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/BootstrapInitializer.java
@@ -68,19 +68,23 @@ public final class BootstrapInitializer {
     public void init(final YamlProxyConfiguration yamlConfig, final int port) throws SQLException {
         ModeConfiguration modeConfig = null == yamlConfig.getServerConfiguration().getMode()
                 ? null : new ModeConfigurationYamlSwapper().swapToObject(yamlConfig.getServerConfiguration().getMode());
-        initContext(yamlConfig, modeConfig, port);
-        initRuleAlteredJobWorker(modeConfig);
+        ContextManager contextManager = createContextManager(yamlConfig, modeConfig, port);
+        initContext(contextManager);
+        initRuleAlteredJobWorker(modeConfig, contextManager);
         setDatabaseServerInfo();
     }
     
-    private void initContext(final YamlProxyConfiguration yamlConfig, final ModeConfiguration modeConfig, final int port) throws SQLException {
+    private ContextManager createContextManager(final YamlProxyConfiguration yamlConfig, final ModeConfiguration modeConfig, final int port) throws SQLException {
         ProxyConfiguration proxyConfig = new YamlProxyConfigurationSwapper().swap(yamlConfig);
         boolean isOverwrite = null == modeConfig || modeConfig.isOverwrite();
         Map<String, Map<String, DataSource>> dataSourcesMap = getDataSourcesMap(proxyConfig.getSchemaDataSources());
         ContextManagerBuilderParameter parameter = ContextManagerBuilderParameter.builder().modeConfig(modeConfig).dataSourcesMap(dataSourcesMap).schemaRuleConfigs(proxyConfig.getSchemaRules())
                 .globalRuleConfigs(proxyConfig.getGlobalRules()).props(proxyConfig.getProps()).isOverwrite(isOverwrite).labels(proxyConfig.getLabels())
                 .instanceDefinition(new InstanceDefinition(InstanceType.PROXY, port)).build();
-        ContextManager contextManager = ContextManagerBuilderFactory.newInstance(modeConfig).build(parameter);
+        return ContextManagerBuilderFactory.newInstance(modeConfig).build(parameter);
+    }
+    
+    private void initContext(final ContextManager contextManager) {
         ProxyContext.getInstance().init(contextManager);
     }
     
@@ -93,7 +97,7 @@ public final class BootstrapInitializer {
         return result;
     }
     
-    private void initRuleAlteredJobWorker(final ModeConfiguration modeConfig) {
+    private void initRuleAlteredJobWorker(final ModeConfiguration modeConfig, final ContextManager contextManager) {
         if (null == modeConfig) {
             return;
         }
@@ -103,6 +107,7 @@ public final class BootstrapInitializer {
             return;
         }
         RuleAlteredContext.initModeConfig(modeConfig);
+        RuleAlteredContext.initContextManager(contextManager);
         // TODO init worker only if necessary, e.g. 1) rule altered action configured, 2) enabled job exists, 3) stopped job restarted
         RuleAlteredJobWorker.initWorkerIfNecessary();
     }
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/PipelineJobAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/PipelineJobAPIImplTest.java
index 66c661c..fd74b07 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/PipelineJobAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/PipelineJobAPIImplTest.java
@@ -141,6 +141,7 @@ public final class PipelineJobAPIImplTest {
         assertTrue(jobId.isPresent());
         JobConfiguration jobConfig = pipelineJobAPI.getJobConfig(jobId.get());
         initTableData(jobConfig.getPipelineConfig());
+        RuleAlteredContextUtil.mockContextManager();
         Map<String, DataConsistencyCheckResult> checkResultMap = pipelineJobAPI.dataConsistencyCheck(jobId.get());
         assertThat(checkResultMap.size(), is(1));
     }
@@ -151,6 +152,7 @@ public final class PipelineJobAPIImplTest {
         assertTrue(jobId.isPresent());
         JobConfiguration jobConfig = pipelineJobAPI.getJobConfig(jobId.get());
         initTableData(jobConfig.getPipelineConfig());
+        RuleAlteredContextUtil.mockContextManager();
         Map<String, DataConsistencyCheckResult> checkResultMap = pipelineJobAPI.dataConsistencyCheck(jobId.get(), FixtureDataConsistencyCheckAlgorithm.TYPE);
         assertThat(checkResultMap.size(), is(1));
         assertTrue(checkResultMap.get("t_order").isRecordsCountMatched());
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImplTest.java
index 6b284f9..badd4ee 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImplTest.java
@@ -19,12 +19,14 @@ package org.apache.shardingsphere.data.pipeline.core.check.consistency;
 
 import lombok.SneakyThrows;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.fixture.FixtureDataConsistencyCheckAlgorithm;
 import org.apache.shardingsphere.data.pipeline.core.util.ResourceUtil;
+import org.apache.shardingsphere.data.pipeline.core.util.RuleAlteredContextUtil;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.scaling.core.job.check.EnvironmentCheckerFactory;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import javax.sql.DataSource;
@@ -43,6 +45,11 @@ import static org.junit.Assert.assertTrue;
 
 public final class DataConsistencyCheckerImplTest {
     
+    @BeforeClass
+    public static void beforeClass() {
+        RuleAlteredContextUtil.mockContextManager();
+    }
+    
     @Test
     public void assertCountAndDataCheck() {
         RuleAlteredJobContext jobContext = new RuleAlteredJobContext(ResourceUtil.mockJobConfig());
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/ResourceUtil.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/ResourceUtil.java
index d4379c6..078feea 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/ResourceUtil.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/ResourceUtil.java
@@ -76,9 +76,18 @@ public final class ResourceUtil {
         return result;
     }
     
+    /**
+     * Read file to string.
+     *
+     * @param fileName file name
+     * @return file content
+     */
     @SneakyThrows(IOException.class)
-    private static String readFileToString(final String fileName) {
+    public static String readFileToString(final String fileName) {
         try (InputStream in = ResourceUtil.class.getResourceAsStream(fileName)) {
+            if (null == in) {
+                throw new NullPointerException("get " + fileName + " as stream return null");
+            }
             return IOUtils.toString(in, StandardCharsets.UTF_8);
         }
     }
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/RuleAlteredContextUtil.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/RuleAlteredContextUtil.java
index ad6d656..0fc684c 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/RuleAlteredContextUtil.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/RuleAlteredContextUtil.java
@@ -18,14 +18,20 @@
 package org.apache.shardingsphere.data.pipeline.core.util;
 
 import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import org.apache.shardingsphere.data.pipeline.core.fixture.EmbedTestingServer;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
+import org.apache.shardingsphere.driver.jdbc.core.datasource.ShardingSphereDataSource;
 import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
 
 import java.util.Properties;
 
+@Slf4j
 public final class RuleAlteredContextUtil {
     
     private static final ExecuteEngine EXECUTE_ENGINE = ExecuteEngine.newCachedThreadInstance();
@@ -43,6 +49,24 @@ public final class RuleAlteredContextUtil {
     }
     
     /**
+     * Mock context manager.
+     */
+    public static void mockContextManager() {
+        ShardingSpherePipelineDataSourceConfiguration pipelineDataSourceConfig = new ShardingSpherePipelineDataSourceConfiguration(
+                ResourceUtil.readFileToString("/config_sharding_sphere_jdbc_source.yaml"));
+        ShardingSphereDataSource shardingSphereDataSource = (ShardingSphereDataSource) new PipelineDataSourceFactory().newInstance(pipelineDataSourceConfig).getDataSource();
+        ContextManager contextManager = shardingSphereDataSource.getContextManager();
+        RuleAlteredContext.initContextManager(contextManager);
+        try {
+            shardingSphereDataSource.close();
+            // CHECKSTYLE:OFF
+        } catch (final Exception ex) {
+            // CHECKSTYLE:ON
+            log.error("close data source failed", ex);
+        }
+    }
+    
+    /**
      * Get execute engine.
      *
      * @return execute engine