You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/09/27 11:38:21 UTC
[shardingsphere] branch master updated: Add stop/start migration check DistSQL and implement (#21222)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new e8e55f76470 Add stop/start migration check DistSQL and implement (#21222)
e8e55f76470 is described below
commit e8e55f76470cde4fa21c8c79e88f32a21dcded5b
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Tue Sep 27 19:38:08 2022 +0800
Add stop/start migration check DistSQL and implement (#21222)
---
.../handler/update/StartMigrationCheckUpdater.java | 41 ++++++++++++++++++
.../handler/update/StopMigrationCheckUpdater.java | 41 ++++++++++++++++++
....shardingsphere.infra.distsql.update.RALUpdater | 2 +
.../main/antlr4/imports/migration/RALStatement.g4 | 8 ++++
.../parser/autogen/MigrationDistSQLStatement.g4 | 2 +
.../core/MigrationDistSQLStatementVisitor.java | 16 ++++++-
.../statement/StartMigrationCheckStatement.java} | 31 ++++++--------
.../statement/StopMigrationCheckStatement.java} | 31 ++++++--------
.../pipeline/api/ConsistencyCheckJobPublicAPI.java | 14 ++++++
.../job/ConsistencyCheckJobConfiguration.java | 2 +-
...amlConsistencyCheckJobConfigurationSwapper.java | 2 +-
.../data/pipeline/api/job/JobStatus.java | 5 +++
.../pipeline/core/api/GovernanceRepositoryAPI.java | 3 +-
.../core/api/impl/GovernanceRepositoryAPIImpl.java | 5 ++-
.../PipelineJobHasAlreadyFinishedException.java | 32 ++++++--------
.../consistencycheck/ConsistencyCheckJob.java | 36 ++++++++++------
.../ConsistencyCheckJobAPIImpl.java | 50 ++++++++++++++++++----
.../scenario/migration/MigrationJobAPIImpl.java | 10 +++++
.../atomic/storage/impl/OpenGaussContainer.java | 2 +-
.../data/pipeline/cases/base/BaseITCase.java | 3 +-
.../api/impl/ConsistencyCheckJobAPIImplTest.java | 38 ++++++++++++++--
21 files changed, 287 insertions(+), 87 deletions(-)
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StartMigrationCheckUpdater.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StartMigrationCheckUpdater.java
new file mode 100644
index 00000000000..365d5256b37
--- /dev/null
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StartMigrationCheckUpdater.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.migration.distsql.handler.update;
+
+import org.apache.shardingsphere.data.pipeline.api.ConsistencyCheckJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
+import org.apache.shardingsphere.infra.distsql.update.RALUpdater;
+import org.apache.shardingsphere.migration.distsql.statement.StartMigrationCheckStatement;
+
+/**
+ * Start migration check updater.
+ */
+public final class StartMigrationCheckUpdater implements RALUpdater<StartMigrationCheckStatement> {
+
+ private static final ConsistencyCheckJobPublicAPI JOB_API = PipelineJobPublicAPIFactory.getConsistencyCheckJobPublicAPI();
+
+ @Override
+ public void executeUpdate(final String databaseName, final StartMigrationCheckStatement sqlStatement) {
+ JOB_API.startByParentJobId(sqlStatement.getJobId());
+ }
+
+ @Override
+ public String getType() {
+ return StartMigrationCheckStatement.class.getName();
+ }
+}
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StopMigrationCheckUpdater.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StopMigrationCheckUpdater.java
new file mode 100644
index 00000000000..0793ac26329
--- /dev/null
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StopMigrationCheckUpdater.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.migration.distsql.handler.update;
+
+import org.apache.shardingsphere.data.pipeline.api.ConsistencyCheckJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
+import org.apache.shardingsphere.infra.distsql.update.RALUpdater;
+import org.apache.shardingsphere.migration.distsql.statement.StopMigrationCheckStatement;
+
+/**
+ * Stop migration check updater.
+ */
+public final class StopMigrationCheckUpdater implements RALUpdater<StopMigrationCheckStatement> {
+
+ private static final ConsistencyCheckJobPublicAPI JOB_API = PipelineJobPublicAPIFactory.getConsistencyCheckJobPublicAPI();
+
+ @Override
+ public void executeUpdate(final String databaseName, final StopMigrationCheckStatement sqlStatement) {
+ JOB_API.stopByParentJobId(sqlStatement.getJobId());
+ }
+
+ @Override
+ public String getType() {
+ return StopMigrationCheckStatement.class.getName();
+ }
+}
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.distsql.update.RALUpdater b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.distsql.update.RALUpdater
index 0c026e4a415..d82c8b27b18 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.distsql.update.RALUpdater
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.distsql.update.RALUpdater
@@ -23,3 +23,5 @@ org.apache.shardingsphere.migration.distsql.handler.update.RollbackMigrationUpda
org.apache.shardingsphere.migration.distsql.handler.update.AddMigrationSourceResourceUpdater
org.apache.shardingsphere.migration.distsql.handler.update.DropMigrationSourceResourceUpdater
org.apache.shardingsphere.migration.distsql.handler.update.CheckMigrationJobUpdater
+org.apache.shardingsphere.migration.distsql.handler.update.StartMigrationCheckUpdater
+org.apache.shardingsphere.migration.distsql.handler.update.StopMigrationCheckUpdater
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/imports/migration/RALStatement.g4 b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/imports/migration/RALStatement.g4
index e595c00fcc4..bd5925cc889 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/imports/migration/RALStatement.g4
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/imports/migration/RALStatement.g4
@@ -55,6 +55,14 @@ showMigrationCheckStatus
: SHOW MIGRATION CHECK STATUS jobId
;
+stopMigrationCheck
+ : STOP MIGRATION CHECK jobId
+ ;
+
+startMigrationCheck
+ : START MIGRATION CHECK jobId
+ ;
+
showMigrationCheckAlgorithms
: SHOW MIGRATION CHECK ALGORITHMS
;
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/migration/org/apache/shardingsphere/distsql/parser/autogen/MigrationDistSQLStatement.g4 b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/migration/org/apache/shardingsphere/distsql/parser/autogen/MigrationDistSQLStatement.g4
index 40237cd1699..e203d02f9b7 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/migration/org/apache/shardingsphere/distsql/parser/autogen/MigrationDistSQLStatement.g4
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/migration/org/apache/shardingsphere/distsql/parser/autogen/MigrationDistSQLStatement.g4
@@ -33,5 +33,7 @@ execute
| dropMigrationSourceResource
| showMigrationSourceResources
| showMigrationCheckStatus
+ | startMigrationCheck
+ | stopMigrationCheck
) SEMI?
;
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/java/org/apache/shardingsphere/migration/distsql/parser/core/MigrationDistSQLStatementVisitor.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/java/org/apache/shardingsphere/migration/distsql/parser/core/MigrationDistSQLStatementVisitor.java
index b7c41e605e1..f76d5d83e58 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/java/org/apache/shardingsphere/migration/distsql/parser/core/MigrationDistSQLStatementVisitor.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/java/org/apache/shardingsphere/migration/distsql/parser/core/MigrationDistSQLStatementVisitor.java
@@ -37,7 +37,9 @@ import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatemen
import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShowMigrationListContext;
import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShowMigrationSourceResourcesContext;
import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShowMigrationStatusContext;
+import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.StartMigrationCheckContext;
import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.StartMigrationContext;
+import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.StopMigrationCheckContext;
import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.StopMigrationContext;
import org.apache.shardingsphere.distsql.parser.segment.AlgorithmSegment;
import org.apache.shardingsphere.distsql.parser.segment.DataSourceSegment;
@@ -49,12 +51,14 @@ import org.apache.shardingsphere.migration.distsql.statement.CommitMigrationStat
import org.apache.shardingsphere.migration.distsql.statement.DropMigrationSourceResourceStatement;
import org.apache.shardingsphere.migration.distsql.statement.MigrateTableStatement;
import org.apache.shardingsphere.migration.distsql.statement.RollbackMigrationStatement;
-import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationCheckStatusStatement;
import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationCheckAlgorithmsStatement;
+import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationCheckStatusStatement;
import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationListStatement;
import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationSourceResourcesStatement;
import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationStatusStatement;
+import org.apache.shardingsphere.migration.distsql.statement.StartMigrationCheckStatement;
import org.apache.shardingsphere.migration.distsql.statement.StartMigrationStatement;
+import org.apache.shardingsphere.migration.distsql.statement.StopMigrationCheckStatement;
import org.apache.shardingsphere.migration.distsql.statement.StopMigrationStatement;
import org.apache.shardingsphere.sql.parser.api.visitor.ASTNode;
import org.apache.shardingsphere.sql.parser.api.visitor.SQLVisitor;
@@ -197,4 +201,14 @@ public final class MigrationDistSQLStatementVisitor extends MigrationDistSQLStat
public ASTNode visitShowMigrationCheckStatus(final ShowMigrationCheckStatusContext ctx) {
return new ShowMigrationCheckStatusStatement(getIdentifierValue(ctx.jobId()));
}
+
+ @Override
+ public ASTNode visitStartMigrationCheck(final StartMigrationCheckContext ctx) {
+ return new StartMigrationCheckStatement(getIdentifierValue(ctx.jobId()));
+ }
+
+ @Override
+ public ASTNode visitStopMigrationCheck(final StopMigrationCheckContext ctx) {
+ return new StopMigrationCheckStatement(getIdentifierValue(ctx.jobId()));
+ }
}
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/migration/org/apache/shardingsphere/distsql/parser/autogen/MigrationDistSQLStatement.g4 b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/StartMigrationCheckStatement.java
similarity index 64%
copy from shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/migration/org/apache/shardingsphere/distsql/parser/autogen/MigrationDistSQLStatement.g4
copy to shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/StartMigrationCheckStatement.java
index 40237cd1699..527eacfc647 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/migration/org/apache/shardingsphere/distsql/parser/autogen/MigrationDistSQLStatement.g4
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/StartMigrationCheckStatement.java
@@ -15,23 +15,18 @@
* limitations under the License.
*/
-grammar MigrationDistSQLStatement;
+package org.apache.shardingsphere.migration.distsql.statement;
-import Symbol, RALStatement, RQLStatement;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.distsql.parser.statement.ral.scaling.UpdatableScalingRALStatement;
-execute
- : (showMigrationList
- | showMigrationStatus
- | migrateTable
- | startMigration
- | stopMigration
- | rollbackMigration
- | commitMigration
- | checkMigration
- | showMigrationCheckAlgorithms
- | addMigrationSourceResource
- | dropMigrationSourceResource
- | showMigrationSourceResources
- | showMigrationCheckStatus
- ) SEMI?
- ;
+/**
+ * Start migration check statement.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class StartMigrationCheckStatement extends UpdatableScalingRALStatement {
+
+ private final String jobId;
+}
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/migration/org/apache/shardingsphere/distsql/parser/autogen/MigrationDistSQLStatement.g4 b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/StopMigrationCheckStatement.java
similarity index 64%
copy from shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/migration/org/apache/shardingsphere/distsql/parser/autogen/MigrationDistSQLStatement.g4
copy to shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/StopMigrationCheckStatement.java
index 40237cd1699..3f8b5bf3332 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/migration/org/apache/shardingsphere/distsql/parser/autogen/MigrationDistSQLStatement.g4
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/StopMigrationCheckStatement.java
@@ -15,23 +15,18 @@
* limitations under the License.
*/
-grammar MigrationDistSQLStatement;
+package org.apache.shardingsphere.migration.distsql.statement;
-import Symbol, RALStatement, RQLStatement;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.distsql.parser.statement.ral.scaling.UpdatableScalingRALStatement;
-execute
- : (showMigrationList
- | showMigrationStatus
- | migrateTable
- | startMigration
- | stopMigration
- | rollbackMigration
- | commitMigration
- | checkMigration
- | showMigrationCheckAlgorithms
- | addMigrationSourceResource
- | dropMigrationSourceResource
- | showMigrationSourceResources
- | showMigrationCheckStatus
- ) SEMI?
- ;
+/**
+ * Stop migration check statement.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class StopMigrationCheckStatement extends UpdatableScalingRALStatement {
+
+ private final String jobId;
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java
index cb6ed9ac55d..5e226ce1124 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java
@@ -45,4 +45,18 @@ public interface ConsistencyCheckJobPublicAPI extends PipelineJobPublicAPI, Requ
* @return latest data consistency check result
*/
Map<String, DataConsistencyCheckResult> getLatestDataConsistencyCheckResult(String jobId);
+
+ /**
+ * Start by parent job id.
+ *
+ * @param parentJobId parent job id
+ */
+ void startByParentJobId(String parentJobId);
+
+ /**
+ * Start by parent job id.
+ *
+ * @param parentJobId parent job id
+ */
+ void stopByParentJobId(String parentJobId);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/ConsistencyCheckJobConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/ConsistencyCheckJobConfiguration.java
index 3102c2bbb9d..8576a374cae 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/ConsistencyCheckJobConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/ConsistencyCheckJobConfiguration.java
@@ -35,7 +35,7 @@ public final class ConsistencyCheckJobConfiguration implements PipelineJobConfig
private final String jobId;
- private final String referredJobId;
+ private final String parentJobId;
private final String algorithmTypeName;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobConfigurationSwapper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobConfigurationSwapper.java
index e7e07cdb678..64ce12e1029 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobConfigurationSwapper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobConfigurationSwapper.java
@@ -32,7 +32,7 @@ public final class YamlConsistencyCheckJobConfigurationSwapper implements YamlCo
public YamlConsistencyCheckJobConfiguration swapToYamlConfiguration(final ConsistencyCheckJobConfiguration data) {
YamlConsistencyCheckJobConfiguration result = new YamlConsistencyCheckJobConfiguration();
result.setJobId(data.getJobId());
- result.setParentJobId(data.getReferredJobId());
+ result.setParentJobId(data.getParentJobId());
result.setAlgorithmTypeName(data.getAlgorithmTypeName());
result.setAlgorithmProperties(data.getAlgorithmProperties());
return result;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
index 3dd111050cd..50f7272f2fc 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
@@ -58,6 +58,11 @@ public enum JobStatus {
*/
FINISHED(false),
+ /**
+ * Consistency check job execute failed.
+ */
+ CONSISTENCY_CHECK_FAILURE(false),
+
/**
* Task has stopped by failing to prepare work.
*/
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
index bf47fc38033..edbc645a141 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEve
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
/**
* Governance repository API.
@@ -62,7 +63,7 @@ public interface GovernanceRepositoryAPI {
* @param jobId job id
* @return check job id
*/
- String getCheckLatestJobId(String jobId);
+ Optional<String> getCheckLatestJobId(String jobId);
/**
* Persist check latest result.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
index 3af215a37b2..3baf3db9f5f 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
@@ -37,6 +37,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.stream.Collectors;
/**
@@ -64,8 +65,8 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP
}
@Override
- public String getCheckLatestJobId(final String jobId) {
- return repository.get(PipelineMetaDataNode.getCheckLatestJobIdPath(jobId));
+ public Optional<String> getCheckLatestJobId(final String jobId) {
+ return Optional.ofNullable(repository.get(PipelineMetaDataNode.getCheckLatestJobIdPath(jobId)));
}
@Override
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/migration/org/apache/shardingsphere/distsql/parser/autogen/MigrationDistSQLStatement.g4 b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyFinishedException.java
similarity index 56%
copy from shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/migration/org/apache/shardingsphere/distsql/parser/autogen/MigrationDistSQLStatement.g4
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyFinishedException.java
index 40237cd1699..c0835346d6d 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/migration/org/apache/shardingsphere/distsql/parser/autogen/MigrationDistSQLStatement.g4
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyFinishedException.java
@@ -15,23 +15,19 @@
* limitations under the License.
*/
-grammar MigrationDistSQLStatement;
+package org.apache.shardingsphere.data.pipeline.core.exception.job;
-import Symbol, RALStatement, RQLStatement;
+import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
+import org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;
-execute
- : (showMigrationList
- | showMigrationStatus
- | migrateTable
- | startMigration
- | stopMigration
- | rollbackMigration
- | commitMigration
- | checkMigration
- | showMigrationCheckAlgorithms
- | addMigrationSourceResource
- | dropMigrationSourceResource
- | showMigrationSourceResources
- | showMigrationCheckStatus
- ) SEMI?
- ;
+/**
+ * Pipeline job has already finished exception.
+ */
+public final class PipelineJobHasAlreadyFinishedException extends PipelineSQLException {
+
+ private static final long serialVersionUID = 2854259384634892428L;
+
+ public PipelineJobHasAlreadyFinishedException(final String message) {
+ super(XOpenSQLState.GENERAL_ERROR, 88, message);
+ }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index 14cb1c04245..030e2e4ba0c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -34,7 +34,9 @@ import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDa
import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
+import org.apache.shardingsphere.infra.util.exception.external.sql.type.wrapper.SQLWrapperException;
+import java.util.Collections;
import java.util.Map;
/**
@@ -52,24 +54,32 @@ public final class ConsistencyCheckJob extends AbstractPipelineJob implements Si
String checkJobId = shardingContext.getJobName();
setJobId(checkJobId);
ConsistencyCheckJobConfiguration consistencyCheckJobConfig = YamlConsistencyCheckJobConfigurationSwapper.swapToObject(shardingContext.getJobParameter());
- ConsistencyCheckJobItemContext jobItemContext = new ConsistencyCheckJobItemContext(consistencyCheckJobConfig, 0, JobStatus.FINISHED);
+ JobStatus status = JobStatus.RUNNING;
+ ConsistencyCheckJobItemContext jobItemContext = new ConsistencyCheckJobItemContext(consistencyCheckJobConfig, 0, status);
jobAPI.persistJobItemProgress(jobItemContext);
- String referredJobId = consistencyCheckJobConfig.getReferredJobId();
- log.info("execute consistency check, job id:{}, referred job id:{}", checkJobId, referredJobId);
- PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckLatestJobId(referredJobId, checkJobId);
- JobType jobType = PipelineJobIdUtils.parseJobType(referredJobId);
+ String parentJobId = consistencyCheckJobConfig.getParentJobId();
+ log.info("execute consistency check, job id:{}, referred job id:{}", checkJobId, parentJobId);
+ PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckLatestJobId(parentJobId, checkJobId);
+ JobType jobType = PipelineJobIdUtils.parseJobType(parentJobId);
InventoryIncrementalJobPublicAPI jobPublicAPI = PipelineJobPublicAPIFactory.getInventoryIncrementalJobPublicAPI(jobType.getTypeName());
- Map<String, DataConsistencyCheckResult> dataConsistencyCheckResult;
- if (StringUtils.isBlank(consistencyCheckJobConfig.getAlgorithmTypeName())) {
- dataConsistencyCheckResult = jobPublicAPI.dataConsistencyCheck(referredJobId);
- } else {
- dataConsistencyCheckResult = jobPublicAPI.dataConsistencyCheck(referredJobId, consistencyCheckJobConfig.getAlgorithmTypeName(), consistencyCheckJobConfig.getAlgorithmProperties());
+ Map<String, DataConsistencyCheckResult> dataConsistencyCheckResult = Collections.emptyMap();
+ try {
+ if (StringUtils.isBlank(consistencyCheckJobConfig.getAlgorithmTypeName())) {
+ dataConsistencyCheckResult = jobPublicAPI.dataConsistencyCheck(parentJobId);
+ } else {
+ dataConsistencyCheckResult = jobPublicAPI.dataConsistencyCheck(parentJobId, consistencyCheckJobConfig.getAlgorithmTypeName(), consistencyCheckJobConfig.getAlgorithmProperties());
+ }
+ status = JobStatus.FINISHED;
+ } catch (final SQLWrapperException ex) {
+ log.error("data consistency check failed", ex);
+ status = JobStatus.CONSISTENCY_CHECK_FAILURE;
+ jobAPI.persistJobItemErrorMessage(checkJobId, 0, ex);
}
- PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckJobResult(referredJobId, checkJobId, dataConsistencyCheckResult);
- jobItemContext.setStatus(JobStatus.FINISHED);
+ PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckJobResult(parentJobId, checkJobId, dataConsistencyCheckResult);
+ jobItemContext.setStatus(status);
jobAPI.persistJobItemProgress(jobItemContext);
jobAPI.stop(checkJobId);
- log.info("execute consistency check job finished, job id:{}, referred job id:{}", checkJobId, referredJobId);
+ log.info("execute consistency check job finished, job id:{}, parent job id:{}", checkJobId, parentJobId);
}
@Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
index 1512158497c..207b0954760 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
@@ -40,6 +40,8 @@ import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractPipelineJobAPIImpl;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyExistedException;
+import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyFinishedException;
+import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobProgress;
import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobProgressSwapper;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -47,6 +49,7 @@ import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import java.util.Collections;
import java.util.Map;
+import java.util.Optional;
/**
* Consistency check job API impl.
@@ -65,15 +68,15 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
@Override
public String createJobAndStart(final CreateConsistencyCheckJobParameter parameter) {
GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
- String checkLatestJobId = repositoryAPI.getCheckLatestJobId(parameter.getJobId());
- if (StringUtils.isNotBlank(checkLatestJobId)) {
- PipelineJobItemProgress progress = getJobItemProgress(checkLatestJobId, 0);
+ Optional<String> optional = repositoryAPI.getCheckLatestJobId(parameter.getJobId());
+ if (optional.isPresent()) {
+ PipelineJobItemProgress progress = getJobItemProgress(optional.get(), 0);
if (null != progress && JobStatus.FINISHED != progress.getStatus()) {
log.info("check job already existed and status isn't FINISHED, status {}", progress.getStatus());
- throw new PipelineJobHasAlreadyExistedException(checkLatestJobId);
+ throw new PipelineJobHasAlreadyExistedException(optional.get());
}
}
- int consistencyCheckVersionNew = null == checkLatestJobId ? 0 : ConsistencyCheckJobId.getSequence(checkLatestJobId) + 1;
+ int consistencyCheckVersionNew = optional.map(s -> ConsistencyCheckJobId.getSequence(s) + 1).orElse(0);
YamlConsistencyCheckJobConfiguration yamlConfig = new YamlConsistencyCheckJobConfiguration();
ConsistencyCheckJobId checkJobId = new ConsistencyCheckJobId(parameter.getJobId(), consistencyCheckVersionNew);
String result = marshalJobId(checkJobId);
@@ -88,11 +91,11 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
@Override
public Map<String, DataConsistencyCheckResult> getLatestDataConsistencyCheckResult(final String jobId) {
- String checkLatestJobId = PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckLatestJobId(jobId);
- if (StringUtils.isBlank(checkLatestJobId)) {
+ Optional<String> checkLatestJobId = PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckLatestJobId(jobId);
+ if (!checkLatestJobId.isPresent()) {
return Collections.emptyMap();
}
- return PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckJobResult(jobId, checkLatestJobId);
+ return PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckJobResult(jobId, checkLatestJobId.get());
}
@Override
@@ -126,6 +129,37 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId, shardingItem, YamlEngine.marshal(PROGRESS_SWAPPER.swapToYamlConfiguration(jobProgress)));
}
+ @Override
+ public void startDisabledJob(final String jobId) {
+ log.info("start disable check job {}", jobId);
+ PipelineJobItemProgress jobProgress = getJobItemProgress(jobId, 0);
+ if (null != jobProgress && JobStatus.FINISHED == jobProgress.getStatus()) {
+ String errorMessage = String.format("job already finished, can use `CHECK MIGRATION '%s'` to start a new data consistency check job", jobId);
+ throw new PipelineJobHasAlreadyFinishedException(errorMessage);
+ }
+ super.startDisabledJob(jobId);
+ }
+
+ @Override
+ public void startByParentJobId(final String parentJobId) {
+ log.info("start check job by parentJobId {}", parentJobId);
+ Optional<String> optional = PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckLatestJobId(parentJobId);
+ if (!optional.isPresent()) {
+ throw new PipelineJobNotFoundException(parentJobId + " check job");
+ }
+ startDisabledJob(optional.get());
+ }
+
+ @Override
+ public void stopByParentJobId(final String parentJobId) {
+ log.info("stop check job by parentJobId {}", parentJobId);
+ Optional<String> optional = PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckLatestJobId(parentJobId);
+ if (!optional.isPresent()) {
+ throw new PipelineJobNotFoundException(parentJobId + " check job");
+ }
+ stop(optional.get());
+ }
+
@Override
public ConsistencyCheckJobConfiguration getJobConfiguration(final String jobId) {
return getJobConfiguration(getElasticJobConfigPOJO(jobId));
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index 24aac28d387..9d3d1f91e2e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -53,6 +53,7 @@ import org.apache.shardingsphere.data.pipeline.api.metadata.yaml.YamlPipelineCol
import org.apache.shardingsphere.data.pipeline.api.metadata.yaml.YamlPipelineColumnMetaDataSwapper;
import org.apache.shardingsphere.data.pipeline.api.pojo.CreateMigrationJobParameter;
import org.apache.shardingsphere.data.pipeline.api.pojo.MigrationJobInfo;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractInventoryIncrementalJobAPIImpl;
import org.apache.shardingsphere.data.pipeline.core.api.impl.PipelineDataSourcePersistService;
import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
@@ -64,6 +65,7 @@ import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineSche
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtil;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPIFactory;
import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
@@ -97,6 +99,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -406,6 +409,13 @@ public final class MigrationJobAPIImpl extends AbstractInventoryIncrementalJobAP
return result;
}
+ @Override
+ public void startDisabledJob(final String jobId) {
+ super.startDisabledJob(jobId);
+ Optional<String> optional = PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckLatestJobId(jobId);
+ optional.ifPresent(s -> ConsistencyCheckJobAPIFactory.getInstance().startDisabledJob(s));
+ }
+
@Override
public JobType getJobType() {
return JobType.MIGRATION;
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-env/src/test/java/org/apache/shardingsphere/test/integration/env/container/atomic/storage/impl/OpenGaussContainer.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-env/src/test/java/org/apache/shardingsphere/test/integration/env/container/atomic/storage/impl/OpenGaussContainer.java
index 743ed014c9e..7d7b448921f 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-env/src/test/java/org/apache/shardingsphere/test/integration/env/container/atomic/storage/impl/OpenGaussContainer.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-env/src/test/java/org/apache/shardingsphere/test/integration/env/container/atomic/storage/impl/OpenGaussContainer.java
@@ -47,8 +47,8 @@ public final class OpenGaussContainer extends DockerStorageContainer {
addEnvs(storageContainerConfiguration.getContainerEnvironments());
mapResources(storageContainerConfiguration.getMountedResources());
withPrivilegedMode(true);
- withStartupTimeout(Duration.of(120, ChronoUnit.SECONDS));
super.configure();
+ withStartupTimeout(Duration.of(120, ChronoUnit.SECONDS));
}
@Override
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
index dae115c71c3..96c6ef808ea 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
@@ -282,7 +282,8 @@ public abstract class BaseITCase {
for (Map<String, Object> each : listJobStatus) {
assertTrue(StringUtils.isBlank(each.get("error_message").toString()));
actualStatus.add(each.get("status").toString());
- incrementalIdleSecondsList.add(Integer.parseInt(each.get("incremental_idle_seconds").toString()));
+ String incrementalIdleSeconds = each.get("incremental_idle_seconds").toString();
+ incrementalIdleSecondsList.add(StringUtils.isBlank(incrementalIdleSeconds) ? 0 : Integer.parseInt(incrementalIdleSeconds));
}
assertFalse(CollectionUtils.containsAny(actualStatus, Arrays.asList(JobStatus.PREPARING_FAILURE.name(), JobStatus.EXECUTE_INVENTORY_TASK_FAILURE.name(),
JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE.name())));
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/ConsistencyCheckJobAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/ConsistencyCheckJobAPIImplTest.java
index 65907b3c29a..5545cc94dfe 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/ConsistencyCheckJobAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/ConsistencyCheckJobAPIImplTest.java
@@ -17,39 +17,69 @@
package org.apache.shardingsphere.data.pipeline.core.api.impl;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
import org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.pojo.CreateConsistencyCheckJobParameter;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPI;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPIFactory;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
import org.junit.BeforeClass;
import org.junit.Test;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
public final class ConsistencyCheckJobAPIImplTest {
- private static ConsistencyCheckJobAPI jobAPI;
+ private static ConsistencyCheckJobAPI checkJobAPI;
+
+ private static MigrationJobAPI migrationJobAPI;
@BeforeClass
public static void beforeClass() {
PipelineContextUtil.mockModeConfigAndContextManager();
- jobAPI = ConsistencyCheckJobAPIFactory.getInstance();
+ checkJobAPI = ConsistencyCheckJobAPIFactory.getInstance();
+ migrationJobAPI = MigrationJobAPIFactory.getInstance();
}
@Test
public void assertCreateJobConfig() {
String migrationJobId = "j0101test";
CreateConsistencyCheckJobParameter parameter = new CreateConsistencyCheckJobParameter(migrationJobId, null, null);
- String checkJobId = jobAPI.createJobAndStart(parameter);
- ConsistencyCheckJobConfiguration jobConfig = (ConsistencyCheckJobConfiguration) jobAPI.getJobConfiguration(checkJobId);
+ String checkJobId = checkJobAPI.createJobAndStart(parameter);
+ ConsistencyCheckJobConfiguration jobConfig = (ConsistencyCheckJobConfiguration) checkJobAPI.getJobConfiguration(checkJobId);
String expectCheckJobId = "j0201j0101test0";
assertThat(jobConfig.getJobId(), is(expectCheckJobId));
assertNull(jobConfig.getAlgorithmTypeName());
int consistencyCheckVersion = ConsistencyCheckJobId.getSequence(expectCheckJobId);
assertThat(consistencyCheckVersion, is(0));
}
+
+ @Test
+ public void assertGetLatestDataConsistencyCheckResult() {
+ Optional<String> jobId = migrationJobAPI.start(JobConfigurationBuilder.createJobConfiguration());
+ assertTrue(jobId.isPresent());
+ CreateConsistencyCheckJobParameter parameter = new CreateConsistencyCheckJobParameter(jobId.get(), null, null);
+ String checkJobId = checkJobAPI.createJobAndStart(parameter);
+ PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckLatestJobId(jobId.get(), checkJobId);
+ Map<String, DataConsistencyCheckResult> expectResult = Collections.singletonMap("t_order", new DataConsistencyCheckResult(new DataConsistencyCountCheckResult(1, 1),
+ new DataConsistencyContentCheckResult(true)));
+ PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckJobResult(jobId.get(), checkJobId, expectResult);
+ Map<String, DataConsistencyCheckResult> actualCheckResult = checkJobAPI.getLatestDataConsistencyCheckResult(jobId.get());
+ assertThat(actualCheckResult.size(), is(expectResult.size()));
+ assertThat(actualCheckResult.get("t_order").getCountCheckResult().isMatched(), is(expectResult.get("t_order").getContentCheckResult().isMatched()));
+ }
}