You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by tu...@apache.org on 2022/09/21 09:14:12 UTC
[shardingsphere] branch master updated: Extract InventoryIncrementalJobPublicAPI for common usage (#21111)
This is an automated email from the ASF dual-hosted git repository.
tuichenchuxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new fa653d5da23 Extract InventoryIncrementalJobPublicAPI for common usage (#21111)
fa653d5da23 is described below
commit fa653d5da237768c2972da0c4078c2524bc0f27e
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Wed Sep 21 17:13:53 2022 +0800
Extract InventoryIncrementalJobPublicAPI for common usage (#21111)
* Move commit and rollback from PipelineJobPublicAPI to MigrationJobPublicAPI
* Add InventoryIncrementalJobPublicAPI, move PipelineJobPublicAPI processConfig apis
* Move commit and rollback to InventoryIncrementalJobPublicAPI
* Throw SQLException for commit
* Fix compile error
---
.../handler/update/CommitMigrationUpdater.java | 8 ++-
....java => InventoryIncrementalJobPublicAPI.java} | 30 ++-------
.../data/pipeline/api/MigrationJobPublicAPI.java | 2 +-
.../data/pipeline/api/PipelineJobPublicAPI.java | 48 ++------------
.../pipeline/api/PipelineJobPublicAPIFactory.java | 8 +--
.../core/api/impl/AbstractPipelineJobAPIImpl.java | 71 +--------------------
.../impl/InventoryIncrementalJobPublicAPIImpl.java | 74 ++++++++++++++++++++++
.../scenario/migration/MigrationJobAPIImpl.java | 23 +++++--
....pipeline.api.InventoryIncrementalJobPublicAPI} | 0
...toryIncrementalProcessConfigurationUpdater.java | 4 +-
...toryIncrementalProcessConfigurationUpdater.java | 4 +-
.../DropPipelineProcessConfigurationUpdater.java | 4 +-
.../api/PipelineJobPublicAPIFactoryTest.java | 8 +--
.../core/api/impl/MigrationJobAPIImplTest.java | 2 +-
14 files changed, 126 insertions(+), 160 deletions(-)
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CommitMigrationUpdater.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CommitMigrationUpdater.java
index 7f1544e10fd..5d1370a629c 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CommitMigrationUpdater.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CommitMigrationUpdater.java
@@ -22,6 +22,8 @@ import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
import org.apache.shardingsphere.infra.distsql.update.RALUpdater;
import org.apache.shardingsphere.migration.distsql.statement.CommitMigrationStatement;
+import java.sql.SQLException;
+
/**
* Commit migration updater.
*/
@@ -31,7 +33,11 @@ public final class CommitMigrationUpdater implements RALUpdater<CommitMigrationS
@Override
public void executeUpdate(final String databaseName, final CommitMigrationStatement sqlStatement) {
- JOB_API.commit(sqlStatement.getJobId());
+ try {
+ JOB_API.commit(sqlStatement.getJobId());
+ } catch (final SQLException ex) {
+ throw new RuntimeException(ex);
+ }
}
@Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/InventoryIncrementalJobPublicAPI.java
similarity index 78%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/InventoryIncrementalJobPublicAPI.java
index 2898ade8cc1..53b89e72f35 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/InventoryIncrementalJobPublicAPI.java
@@ -18,16 +18,14 @@
package org.apache.shardingsphere.data.pipeline.api;
import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
import java.sql.SQLException;
-import java.util.List;
/**
- * Pipeline job public API.
+ * Inventory incremental job API.
*/
-public interface PipelineJobPublicAPI extends TypedSPI {
+public interface InventoryIncrementalJobPublicAPI extends PipelineJobPublicAPI, TypedSPI {
/**
* Create process configuration.
@@ -57,20 +55,6 @@ public interface PipelineJobPublicAPI extends TypedSPI {
*/
PipelineProcessConfiguration showProcessConfiguration();
- /**
- * Start disabled job.
- *
- * @param jobId job id
- */
- void startDisabledJob(String jobId);
-
- /**
- * Stop pipeline job.
- *
- * @param jobId job id
- */
- void stop(String jobId);
-
/**
* Rollback pipeline job.
*
@@ -83,13 +67,7 @@ public interface PipelineJobPublicAPI extends TypedSPI {
* Commit pipeline job.
*
* @param jobId job id
+ * @throws SQLException when commit underlying database data
*/
- void commit(String jobId);
-
- /**
- * Get pipeline job info.
- *
- * @return jobInfos
- */
- List<? extends PipelineJobInfo> list();
+ void commit(String jobId) throws SQLException;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
index 2ebc30b0921..e60b48bfe92 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
@@ -35,7 +35,7 @@ import java.util.Properties;
* Migration job public API.
*/
@SingletonSPI
-public interface MigrationJobPublicAPI extends PipelineJobPublicAPI, RequiredSPI {
+public interface MigrationJobPublicAPI extends InventoryIncrementalJobPublicAPI, RequiredSPI {
/**
* List all jobs.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java
index 2898ade8cc1..1b635825c7e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java
@@ -17,45 +17,22 @@
package org.apache.shardingsphere.data.pipeline.api;
-import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
-import java.sql.SQLException;
import java.util.List;
/**
* Pipeline job public API.
*/
-public interface PipelineJobPublicAPI extends TypedSPI {
+public interface PipelineJobPublicAPI {
/**
- * Create process configuration.
+ * Get job type.
*
- * @param processConfig process configuration
+ * @return job type
*/
- void createProcessConfiguration(PipelineProcessConfiguration processConfig);
-
- /**
- * Alter process configuration.
- *
- * @param processConfig process configuration
- */
- void alterProcessConfiguration(PipelineProcessConfiguration processConfig);
-
- /**
- * Drop process configuration.
- *
- * @param confPath configuration path. e.g. <code>/</code>, <code>/READ</code>, <code>/READ/RATE_LIMITER</code>
- */
- void dropProcessConfiguration(String confPath);
-
- /**
- * Show process configuration.
- *
- * @return process configuration, non-null
- */
- PipelineProcessConfiguration showProcessConfiguration();
+ JobType getJobType();
/**
* Start disabled job.
@@ -71,21 +48,6 @@ public interface PipelineJobPublicAPI extends TypedSPI {
*/
void stop(String jobId);
- /**
- * Rollback pipeline job.
- *
- * @param jobId job id
- * @throws SQLException when rollback underlying database data
- */
- void rollback(String jobId) throws SQLException;
-
- /**
- * Commit pipeline job.
- *
- * @param jobId job id
- */
- void commit(String jobId);
-
/**
* Get pipeline job info.
*
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactory.java
index 4443bbc6db1..a6ab527ac32 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactory.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactory.java
@@ -28,18 +28,18 @@ import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry;
public final class PipelineJobPublicAPIFactory {
static {
- ShardingSphereServiceLoader.register(PipelineJobPublicAPI.class);
+ ShardingSphereServiceLoader.register(InventoryIncrementalJobPublicAPI.class);
ShardingSphereServiceLoader.register(MigrationJobPublicAPI.class);
}
/**
- * Get instance of pipeline job public API.
+ * Get instance of inventory incremental job public API.
*
* @param jobTypeName job type name
* @return got instance
*/
- public static PipelineJobPublicAPI getPipelineJobPublicAPI(@NonNull final String jobTypeName) {
- return TypedSPIRegistry.getRegisteredService(PipelineJobPublicAPI.class, jobTypeName);
+ public static InventoryIncrementalJobPublicAPI getInventoryIncrementalJobPublicAPI(@NonNull final String jobTypeName) {
+ return TypedSPIRegistry.getRegisteredService(InventoryIncrementalJobPublicAPI.class, jobTypeName);
}
/**
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index 3d5ce269185..235ff348eac 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -23,22 +23,15 @@ import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.process.yaml.YamlPipelineProcessConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.process.yaml.YamlPipelineProcessConfigurationSwapper;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
-import org.apache.shardingsphere.data.pipeline.core.config.process.PipelineProcessConfigurationUtil;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyStartedException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
-import org.apache.shardingsphere.data.pipeline.core.exception.metadata.AlterNotExistProcessConfigurationException;
-import org.apache.shardingsphere.data.pipeline.core.exception.metadata.CreateExistsProcessConfigurationException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
@@ -49,7 +42,6 @@ import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
-import java.sql.SQLException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
@@ -66,51 +58,8 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
protected static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
- private static final YamlPipelineProcessConfigurationSwapper PROCESS_CONFIG_SWAPPER = new YamlPipelineProcessConfigurationSwapper();
-
- private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService();
-
private final PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrier.getInstance();
- protected abstract JobType getJobType();
-
- @Override
- public void createProcessConfiguration(final PipelineProcessConfiguration processConfig) {
- PipelineProcessConfiguration existingProcessConfig = processConfigPersistService.load(getJobType());
- ShardingSpherePreconditions.checkState(null == existingProcessConfig, CreateExistsProcessConfigurationException::new);
- processConfigPersistService.persist(getJobType(), processConfig);
- }
-
- @Override
- public void alterProcessConfiguration(final PipelineProcessConfiguration processConfig) {
- // TODO check rateLimiter type match or not
- YamlPipelineProcessConfiguration targetYamlProcessConfig = getTargetYamlProcessConfiguration();
- targetYamlProcessConfig.copyNonNullFields(PROCESS_CONFIG_SWAPPER.swapToYamlConfiguration(processConfig));
- processConfigPersistService.persist(getJobType(), PROCESS_CONFIG_SWAPPER.swapToObject(targetYamlProcessConfig));
- }
-
- private YamlPipelineProcessConfiguration getTargetYamlProcessConfiguration() {
- PipelineProcessConfiguration existingProcessConfig = processConfigPersistService.load(getJobType());
- ShardingSpherePreconditions.checkNotNull(existingProcessConfig, AlterNotExistProcessConfigurationException::new);
- return PROCESS_CONFIG_SWAPPER.swapToYamlConfiguration(existingProcessConfig);
- }
-
- @Override
- public void dropProcessConfiguration(final String confPath) {
- String finalConfPath = confPath.trim();
- PipelineProcessConfigurationUtil.verifyConfPath(confPath);
- YamlPipelineProcessConfiguration targetYamlProcessConfig = getTargetYamlProcessConfiguration();
- PipelineProcessConfigurationUtil.setFieldsNullByConfPath(targetYamlProcessConfig, finalConfPath);
- processConfigPersistService.persist(getJobType(), PROCESS_CONFIG_SWAPPER.swapToObject(targetYamlProcessConfig));
- }
-
- @Override
- public PipelineProcessConfiguration showProcessConfiguration() {
- PipelineProcessConfiguration result = processConfigPersistService.load(getJobType());
- result = PipelineProcessConfigurationUtil.convertWithDefaultValue(result);
- return result;
- }
-
@Override
public final String marshalJobId(final PipelineJobId pipelineJobId) {
return PipelineJobIdUtils.marshalJobIdCommonPrefix(pipelineJobId) + marshalJobIdLeftPart(pipelineJobId);
@@ -200,29 +149,11 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
pipelineDistributedBarrier.await(barrierPath, 5, TimeUnit.SECONDS);
}
- @Override
- public void rollback(final String jobId) throws SQLException {
- log.info("Rollback job {}", jobId);
- stop(jobId);
- cleanTempTableOnRollback(jobId);
- dropJob(jobId);
- }
-
- protected abstract void cleanTempTableOnRollback(String jobId) throws SQLException;
-
- private void dropJob(final String jobId) {
+ protected void dropJob(final String jobId) {
PipelineAPIFactory.getJobOperateAPI().remove(String.valueOf(jobId), null);
PipelineAPIFactory.getGovernanceRepositoryAPI().deleteJob(jobId);
}
- @Override
- public void commit(final String jobId) {
- checkModeConfig();
- log.info("Commit job {}", jobId);
- stop(jobId);
- dropJob(jobId);
- }
-
protected final JobConfigurationPOJO getElasticJobConfigPOJO(final String jobId) {
JobConfigurationPOJO result = PipelineAPIFactory.getJobConfigurationAPI().getJobConfiguration(jobId);
Preconditions.checkNotNull(result, new PipelineJobNotFoundException(jobId));
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobPublicAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobPublicAPIImpl.java
new file mode 100644
index 00000000000..44f48883545
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobPublicAPIImpl.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.api.impl;
+
+import org.apache.shardingsphere.data.pipeline.api.InventoryIncrementalJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.process.yaml.YamlPipelineProcessConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.process.yaml.YamlPipelineProcessConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.core.config.process.PipelineProcessConfigurationUtil;
+import org.apache.shardingsphere.data.pipeline.core.exception.metadata.AlterNotExistProcessConfigurationException;
+import org.apache.shardingsphere.data.pipeline.core.exception.metadata.CreateExistsProcessConfigurationException;
+import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
+
+/**
+ * Inventory incremental job API implementation.
+ */
+public abstract class InventoryIncrementalJobPublicAPIImpl extends AbstractPipelineJobAPIImpl implements InventoryIncrementalJobPublicAPI {
+
+ private static final YamlPipelineProcessConfigurationSwapper PROCESS_CONFIG_SWAPPER = new YamlPipelineProcessConfigurationSwapper();
+
+ private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService();
+
+ @Override
+ public void createProcessConfiguration(final PipelineProcessConfiguration processConfig) {
+ PipelineProcessConfiguration existingProcessConfig = processConfigPersistService.load(getJobType());
+ ShardingSpherePreconditions.checkState(null == existingProcessConfig, CreateExistsProcessConfigurationException::new);
+ processConfigPersistService.persist(getJobType(), processConfig);
+ }
+
+ @Override
+ public void alterProcessConfiguration(final PipelineProcessConfiguration processConfig) {
+ // TODO check rateLimiter type match or not
+ YamlPipelineProcessConfiguration targetYamlProcessConfig = getTargetYamlProcessConfiguration();
+ targetYamlProcessConfig.copyNonNullFields(PROCESS_CONFIG_SWAPPER.swapToYamlConfiguration(processConfig));
+ processConfigPersistService.persist(getJobType(), PROCESS_CONFIG_SWAPPER.swapToObject(targetYamlProcessConfig));
+ }
+
+ private YamlPipelineProcessConfiguration getTargetYamlProcessConfiguration() {
+ PipelineProcessConfiguration existingProcessConfig = processConfigPersistService.load(getJobType());
+ ShardingSpherePreconditions.checkNotNull(existingProcessConfig, AlterNotExistProcessConfigurationException::new);
+ return PROCESS_CONFIG_SWAPPER.swapToYamlConfiguration(existingProcessConfig);
+ }
+
+ @Override
+ public void dropProcessConfiguration(final String confPath) {
+ String finalConfPath = confPath.trim();
+ PipelineProcessConfigurationUtil.verifyConfPath(confPath);
+ YamlPipelineProcessConfiguration targetYamlProcessConfig = getTargetYamlProcessConfiguration();
+ PipelineProcessConfigurationUtil.setFieldsNullByConfPath(targetYamlProcessConfig, finalConfPath);
+ processConfigPersistService.persist(getJobType(), PROCESS_CONFIG_SWAPPER.swapToObject(targetYamlProcessConfig));
+ }
+
+ @Override
+ public PipelineProcessConfiguration showProcessConfiguration() {
+ PipelineProcessConfiguration result = processConfigPersistService.load(getJobType());
+ result = PipelineProcessConfigurationUtil.convertWithDefaultValue(result);
+ return result;
+ }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index 560379526fa..27ef7c8f539 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -59,8 +59,8 @@ import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgo
import org.apache.shardingsphere.data.pipeline.api.pojo.MigrationJobInfo;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobItemAPI;
-import org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractPipelineJobAPIImpl;
import org.apache.shardingsphere.data.pipeline.core.api.impl.InventoryIncrementalJobItemAPIImpl;
+import org.apache.shardingsphere.data.pipeline.core.api.impl.InventoryIncrementalJobPublicAPIImpl;
import org.apache.shardingsphere.data.pipeline.core.api.impl.PipelineDataSourcePersistService;
import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmChooser;
import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmFactory;
@@ -116,7 +116,7 @@ import java.util.stream.IntStream;
* Migration job API impl.
*/
@Slf4j
-public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implements MigrationJobAPI {
+public final class MigrationJobAPIImpl extends InventoryIncrementalJobPublicAPIImpl implements MigrationJobAPI {
private static final YamlRuleConfigurationSwapperEngine RULE_CONFIG_SWAPPER_ENGINE = new YamlRuleConfigurationSwapperEngine();
@@ -127,7 +127,7 @@ public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implem
private final PipelineDataSourcePersistService dataSourcePersistService = new PipelineDataSourcePersistService();
@Override
- protected JobType getJobType() {
+ public JobType getJobType() {
return JobType.MIGRATION;
}
@@ -356,7 +356,14 @@ public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implem
}
@Override
- protected void cleanTempTableOnRollback(final String jobId) throws SQLException {
+ public void rollback(final String jobId) throws SQLException {
+ log.info("Rollback job {}", jobId);
+ stop(jobId);
+ cleanTempTableOnRollback(jobId);
+ dropJob(jobId);
+ }
+
+ private void cleanTempTableOnRollback(final String jobId) throws SQLException {
MigrationJobConfiguration jobConfig = getJobConfiguration(jobId);
String targetTableName = jobConfig.getTargetTableName();
// TODO use jobConfig.targetSchemaName
@@ -373,6 +380,14 @@ public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implem
}
}
+ @Override
+ public void commit(final String jobId) throws SQLException {
+ checkModeConfig();
+ log.info("Commit job {}", jobId);
+ stop(jobId);
+ dropJob(jobId);
+ }
+
@Override
public void addMigrationSourceResources(final Map<String, DataSourceProperties> dataSourcePropsMap) {
log.info("Add migration source resources {}", dataSourcePropsMap.keySet());
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPI b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.InventoryIncrementalJobPublicAPI
similarity index 100%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPI
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.InventoryIncrementalJobPublicAPI
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalProcessConfigurationUpdater.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalProcessConfigurationUpdater.java
index 8c8e405a871..cdbe647657c 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalProcessConfigurationUpdater.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalProcessConfigurationUpdater.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.proxy.backend.handler.distsql.ral.updatable;
-import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.InventoryIncrementalJobPublicAPI;
import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.distsql.parser.statement.ral.updatable.AlterInventoryIncrementalProcessConfigurationStatement;
@@ -31,7 +31,7 @@ public final class AlterInventoryIncrementalProcessConfigurationUpdater implemen
@Override
public void executeUpdate(final String databaseName, final AlterInventoryIncrementalProcessConfigurationStatement sqlStatement) {
- PipelineJobPublicAPI jobAPI = PipelineJobPublicAPIFactory.getPipelineJobPublicAPI(sqlStatement.getJobTypeName());
+ InventoryIncrementalJobPublicAPI jobAPI = PipelineJobPublicAPIFactory.getInventoryIncrementalJobPublicAPI(sqlStatement.getJobTypeName());
PipelineProcessConfiguration processConfig = InventoryIncrementalProcessConfigurationSegmentConverter.convert(sqlStatement.getProcessConfigSegment());
jobAPI.alterProcessConfiguration(processConfig);
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/CreateInventoryIncrementalProcessConfigurationUpdater.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/CreateInventoryIncrementalProcessConfigurationUpdater.java
index 97454efb0ba..2fab9c8c6c3 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/CreateInventoryIncrementalProcessConfigurationUpdater.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/CreateInventoryIncrementalProcessConfigurationUpdater.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.proxy.backend.handler.distsql.ral.updatable;
-import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.InventoryIncrementalJobPublicAPI;
import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.distsql.parser.statement.ral.updatable.CreateInventoryIncrementalProcessConfigurationStatement;
@@ -31,7 +31,7 @@ public final class CreateInventoryIncrementalProcessConfigurationUpdater impleme
@Override
public void executeUpdate(final String databaseName, final CreateInventoryIncrementalProcessConfigurationStatement sqlStatement) {
- PipelineJobPublicAPI jobAPI = PipelineJobPublicAPIFactory.getPipelineJobPublicAPI(sqlStatement.getJobTypeName());
+ InventoryIncrementalJobPublicAPI jobAPI = PipelineJobPublicAPIFactory.getInventoryIncrementalJobPublicAPI(sqlStatement.getJobTypeName());
PipelineProcessConfiguration processConfig = InventoryIncrementalProcessConfigurationSegmentConverter.convert(sqlStatement.getProcessConfigSegment());
jobAPI.createProcessConfiguration(processConfig);
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/DropPipelineProcessConfigurationUpdater.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/DropPipelineProcessConfigurationUpdater.java
index 6d67482be2a..d550b456268 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/DropPipelineProcessConfigurationUpdater.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/DropPipelineProcessConfigurationUpdater.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.proxy.backend.handler.distsql.ral.updatable;
-import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.InventoryIncrementalJobPublicAPI;
import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
import org.apache.shardingsphere.distsql.parser.statement.ral.updatable.DropPipelineProcessConfigurationStatement;
import org.apache.shardingsphere.infra.distsql.update.RALUpdater;
@@ -29,7 +29,7 @@ public final class DropPipelineProcessConfigurationUpdater implements RALUpdater
@Override
public void executeUpdate(final String databaseName, final DropPipelineProcessConfigurationStatement sqlStatement) {
- PipelineJobPublicAPI jobAPI = PipelineJobPublicAPIFactory.getPipelineJobPublicAPI(sqlStatement.getJobTypeName());
+ InventoryIncrementalJobPublicAPI jobAPI = PipelineJobPublicAPIFactory.getInventoryIncrementalJobPublicAPI(sqlStatement.getJobTypeName());
jobAPI.dropProcessConfiguration(sqlStatement.getConfPath());
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactoryTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactoryTest.java
index fb2eca869b8..912a9ebf2c2 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactoryTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactoryTest.java
@@ -31,11 +31,11 @@ import static org.hamcrest.MatcherAssert.assertThat;
public final class PipelineJobPublicAPIFactoryTest {
@Test
- public void assertGetPipelineJobPublicAPI() {
- Collection<Pair<JobType, Class<? extends PipelineJobPublicAPI>>> paramResult = new LinkedList<>();
+ public void assertGetInventoryIncrementalJobPublicAPI() {
+ Collection<Pair<JobType, Class<? extends InventoryIncrementalJobPublicAPI>>> paramResult = new LinkedList<>();
paramResult.add(Pair.of(JobType.MIGRATION, MigrationJobAPIImpl.class));
- for (Pair<JobType, Class<? extends PipelineJobPublicAPI>> each : paramResult) {
- assertThat(PipelineJobPublicAPIFactory.getPipelineJobPublicAPI(each.getKey().getTypeName()), instanceOf(each.getValue()));
+ for (Pair<JobType, Class<? extends InventoryIncrementalJobPublicAPI>> each : paramResult) {
+ assertThat(PipelineJobPublicAPIFactory.getInventoryIncrementalJobPublicAPI(each.getKey().getTypeName()), instanceOf(each.getValue()));
}
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
index 41d85c69570..9770d830787 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
@@ -135,7 +135,7 @@ public final class MigrationJobAPIImplTest {
}
@Test
- public void assertCommit() {
+ public void assertCommit() throws SQLException {
Optional<String> jobId = jobAPI.start(JobConfigurationBuilder.createJobConfiguration());
assertTrue(jobId.isPresent());
MigrationJobConfiguration jobConfig = jobAPI.getJobConfiguration(jobId.get());