You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/09/14 04:27:48 UTC

[shardingsphere] branch master updated: Improve drop pipeline process configuration and IT (#20962)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 14741189b15 Improve drop pipeline process configuration and IT (#20962)
14741189b15 is described below

commit 14741189b15a6ea6cf522eb10aeb0f5a53e6f277
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Wed Sep 14 12:27:36 2022 +0800

    Improve drop pipeline process configuration and IT (#20962)
    
    * Improve drop pipeline process and IT
    
    * Fix codestyle
    
    * Fix codestyle
---
 .../data/pipeline/YamlPipelineProcessConfiguration.java     |  9 +++++++++
 .../data/pipeline/api/PipelineJobPublicAPIFactory.java      |  1 +
 .../impl/PipelineProcessConfigurationPersistService.java    |  6 +++++-
 .../integration/data/pipeline/cases/base/BaseITCase.java    |  2 +-
 .../pipeline/cases/migration/AbstractMigrationITCase.java   | 13 ++++++-------
 .../cases/migration/general/MySQLMigrationGeneralIT.java    |  3 +++
 6 files changed, 25 insertions(+), 9 deletions(-)

diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineProcessConfiguration.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineProcessConfiguration.java
index e2bbc8d2778..1cb6cfe7680 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineProcessConfiguration.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineProcessConfiguration.java
@@ -91,4 +91,13 @@ public final class YamlPipelineProcessConfiguration implements YamlConfiguration
                 break;
         }
     }
+    
+    /**
+     * Check all fields is null.
+     *
+     * @return true if all fields is null, otherwise is false.
+     */
+    public boolean isAllFieldsNull() {
+        return null == read && null == write && null == streamChannel;
+    }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactory.java
index 97e6d18727d..958d6943872 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactory.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactory.java
@@ -29,6 +29,7 @@ public final class PipelineJobPublicAPIFactory {
     
     static {
         ShardingSphereServiceLoader.register(MigrationJobPublicAPI.class);
+        ShardingSphereServiceLoader.register(PipelineJobPublicAPI.class);
     }
     
     /**
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistService.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistService.java
index e57e08c89dd..c6c5079b5c1 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistService.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistService.java
@@ -39,7 +39,11 @@ public final class PipelineProcessConfigurationPersistService implements Pipelin
         if (StringUtils.isBlank(yamlText)) {
             return null;
         }
-        return PROCESS_CONFIG_SWAPPER.swapToObject(YamlEngine.unmarshal(yamlText, YamlPipelineProcessConfiguration.class, true));
+        YamlPipelineProcessConfiguration yamlConfig = YamlEngine.unmarshal(yamlText, YamlPipelineProcessConfiguration.class, true);
+        if (null == yamlConfig || yamlConfig.isAllFieldsNull()) {
+            return null;
+        }
+        return PROCESS_CONFIG_SWAPPER.swapToObject(yamlConfig);
     }
     
     @Override
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
index eee2f9b9698..4deb6fdffcf 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
@@ -159,7 +159,7 @@ public abstract class BaseITCase {
         } catch (final SQLException ex) {
             throw new IllegalStateException(ex);
         }
-        sourceDataSource = StorageContainerUtil.generateDataSource(getActualJdbcUrlTemplate(DS_0, false), username, password);
+        sourceDataSource = StorageContainerUtil.generateDataSource(appendBatchInsertParam(getActualJdbcUrlTemplate(DS_0, false)), username, password);
         proxyDataSource = StorageContainerUtil.generateDataSource(containerComposer.getProxyJdbcUrl(PROXY_DATABASE), ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD);
     }
     
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
index fcdd0f9fd72..761ebb83f29 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
@@ -126,15 +126,14 @@ public abstract class AbstractMigrationITCase extends BaseITCase {
     }
     
     protected void addMigrationProcessConfig() throws SQLException {
-        try {
-            proxyExecuteWithLog(migrationDistSQLCommand.getAddMigrationProcessConfig(), 0);
-        } catch (final SQLException ex) {
-            if ("58000".equals(ex.getSQLState()) || "HY000".equals(ex.getSQLState())) {
-                log.warn(ex.getMessage());
-                return;
+        if (ENV.getItEnvType() == ITEnvTypeEnum.NATIVE) {
+            try {
+                proxyExecuteWithLog("DROP MIGRATION PROCESS CONFIGURATION '/'", 0);
+            } catch (final SQLException ex) {
+                log.warn("Drop migration process configuration failed, maybe it's not exist. error msg={}", ex.getMessage());
             }
-            throw ex;
         }
+        proxyExecuteWithLog(migrationDistSQLCommand.getAddMigrationProcessConfig(), 0);
     }
     
     protected void stopMigrationByJobId(final String jobId) throws SQLException {
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
index 7c3889a7827..92f5aa3a319 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
@@ -34,6 +34,7 @@ import org.junit.runners.Parameterized.Parameters;
 import org.springframework.jdbc.core.JdbcTemplate;
 
 import java.sql.SQLException;
+import java.time.LocalDateTime;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
@@ -82,8 +83,10 @@ public final class MySQLMigrationGeneralIT extends AbstractMigrationITCase {
         KeyGenerateAlgorithm keyGenerateAlgorithm = new AutoIncrementKeyGenerateAlgorithm();
         JdbcTemplate jdbcTemplate = new JdbcTemplate(getSourceDataSource());
         Pair<List<Object[]>, List<Object[]>> dataPair = ScalingCaseHelper.generateFullInsertData(keyGenerateAlgorithm, parameterized.getDatabaseType(), 3000);
+        log.info("init data begin: {}", LocalDateTime.now());
         jdbcTemplate.batchUpdate(getExtraSQLCommand().getFullInsertOrder(), dataPair.getLeft());
         jdbcTemplate.batchUpdate(getExtraSQLCommand().getFullInsertOrderItem(), dataPair.getRight());
+        log.info("init data end: {}", LocalDateTime.now());
         startMigrationOrderCopy(false);
         startMigrationOrderItem(false);
         startIncrementTask(new MySQLIncrementTask(jdbcTemplate, keyGenerateAlgorithm, 20));