You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/09/14 04:27:48 UTC
[shardingsphere] branch master updated: Improve drop pipeline process configuration and IT (#20962)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 14741189b15 Improve drop pipeline process configuration and IT (#20962)
14741189b15 is described below
commit 14741189b15a6ea6cf522eb10aeb0f5a53e6f277
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Wed Sep 14 12:27:36 2022 +0800
Improve drop pipeline process configuration and IT (#20962)
* Improve drop pipeline process and IT
* Fix codestyle
* Fix codestyle
---
.../data/pipeline/YamlPipelineProcessConfiguration.java | 9 +++++++++
.../data/pipeline/api/PipelineJobPublicAPIFactory.java | 1 +
.../impl/PipelineProcessConfigurationPersistService.java | 6 +++++-
.../integration/data/pipeline/cases/base/BaseITCase.java | 2 +-
.../pipeline/cases/migration/AbstractMigrationITCase.java | 13 ++++++-------
.../cases/migration/general/MySQLMigrationGeneralIT.java | 3 +++
6 files changed, 25 insertions(+), 9 deletions(-)
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineProcessConfiguration.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineProcessConfiguration.java
index e2bbc8d2778..1cb6cfe7680 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineProcessConfiguration.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineProcessConfiguration.java
@@ -91,4 +91,13 @@ public final class YamlPipelineProcessConfiguration implements YamlConfiguration
break;
}
}
+
+ /**
+ * Check whether all fields (read, write and stream channel) are null.
+ *
+ * @return true if all fields are null, otherwise false
+ */
+ public boolean isAllFieldsNull() {
+ return null == read && null == write && null == streamChannel;
+ }
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactory.java
index 97e6d18727d..958d6943872 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactory.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactory.java
@@ -29,6 +29,7 @@ public final class PipelineJobPublicAPIFactory {
static {
ShardingSphereServiceLoader.register(MigrationJobPublicAPI.class);
+ ShardingSphereServiceLoader.register(PipelineJobPublicAPI.class);
}
/**
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistService.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistService.java
index e57e08c89dd..c6c5079b5c1 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistService.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistService.java
@@ -39,7 +39,11 @@ public final class PipelineProcessConfigurationPersistService implements Pipelin
if (StringUtils.isBlank(yamlText)) {
return null;
}
- return PROCESS_CONFIG_SWAPPER.swapToObject(YamlEngine.unmarshal(yamlText, YamlPipelineProcessConfiguration.class, true));
+ YamlPipelineProcessConfiguration yamlConfig = YamlEngine.unmarshal(yamlText, YamlPipelineProcessConfiguration.class, true);
+ if (null == yamlConfig || yamlConfig.isAllFieldsNull()) {
+ return null;
+ }
+ return PROCESS_CONFIG_SWAPPER.swapToObject(yamlConfig);
}
@Override
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
index eee2f9b9698..4deb6fdffcf 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
@@ -159,7 +159,7 @@ public abstract class BaseITCase {
} catch (final SQLException ex) {
throw new IllegalStateException(ex);
}
- sourceDataSource = StorageContainerUtil.generateDataSource(getActualJdbcUrlTemplate(DS_0, false), username, password);
+ sourceDataSource = StorageContainerUtil.generateDataSource(appendBatchInsertParam(getActualJdbcUrlTemplate(DS_0, false)), username, password);
proxyDataSource = StorageContainerUtil.generateDataSource(containerComposer.getProxyJdbcUrl(PROXY_DATABASE), ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD);
}
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
index fcdd0f9fd72..761ebb83f29 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
@@ -126,15 +126,14 @@ public abstract class AbstractMigrationITCase extends BaseITCase {
}
protected void addMigrationProcessConfig() throws SQLException {
- try {
- proxyExecuteWithLog(migrationDistSQLCommand.getAddMigrationProcessConfig(), 0);
- } catch (final SQLException ex) {
- if ("58000".equals(ex.getSQLState()) || "HY000".equals(ex.getSQLState())) {
- log.warn(ex.getMessage());
- return;
+ if (ENV.getItEnvType() == ITEnvTypeEnum.NATIVE) {
+ try {
+ proxyExecuteWithLog("DROP MIGRATION PROCESS CONFIGURATION '/'", 0);
+ } catch (final SQLException ex) {
+ log.warn("Drop migration process configuration failed, maybe it's not exist. error msg={}", ex.getMessage());
}
- throw ex;
}
+ proxyExecuteWithLog(migrationDistSQLCommand.getAddMigrationProcessConfig(), 0);
}
protected void stopMigrationByJobId(final String jobId) throws SQLException {
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
index 7c3889a7827..92f5aa3a319 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
@@ -34,6 +34,7 @@ import org.junit.runners.Parameterized.Parameters;
import org.springframework.jdbc.core.JdbcTemplate;
import java.sql.SQLException;
+import java.time.LocalDateTime;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
@@ -82,8 +83,10 @@ public final class MySQLMigrationGeneralIT extends AbstractMigrationITCase {
KeyGenerateAlgorithm keyGenerateAlgorithm = new AutoIncrementKeyGenerateAlgorithm();
JdbcTemplate jdbcTemplate = new JdbcTemplate(getSourceDataSource());
Pair<List<Object[]>, List<Object[]>> dataPair = ScalingCaseHelper.generateFullInsertData(keyGenerateAlgorithm, parameterized.getDatabaseType(), 3000);
+ log.info("init data begin: {}", LocalDateTime.now());
jdbcTemplate.batchUpdate(getExtraSQLCommand().getFullInsertOrder(), dataPair.getLeft());
jdbcTemplate.batchUpdate(getExtraSQLCommand().getFullInsertOrderItem(), dataPair.getRight());
+ log.info("init data end: {}", LocalDateTime.now());
startMigrationOrderCopy(false);
startMigrationOrderItem(false);
startIncrementTask(new MySQLIncrementTask(jdbcTemplate, keyGenerateAlgorithm, 20));