You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/09/15 02:28:28 UTC
[shardingsphere] branch master updated: Adjust pipeline SPI class name and package name; Revise javadoc (#20983)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 943f5ee5064 Adjust pipeline SPI class name and package name; Revise javadoc (#20983)
943f5ee5064 is described below
commit 943f5ee50642170f33aa718da7258462d170ff07
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Thu Sep 15 10:28:17 2022 +0800
Adjust pipeline SPI class name and package name; Revise javadoc (#20983)
* Rename and move metadata changed event handler
* Move rate limiter impl
* Clean unused PipelineSchemaTableUtil.getSchemaTablesMapFromActual method
* Improve PipelineJobPublicAPIFactoryTest
* Use drop table SQL for rollback
* Improve log and javadoc
---
.../pipeline/api/PipelineJobPublicAPIFactory.java | 2 +-
.../spi/sqlbuilder/PipelineSQLBuilder.java | 6 ++--
.../metadata/node/PipelineMetaDataNodeWatcher.java | 18 +++++-----
.../PipelineChangedJobConfigurationProcessor.java} | 14 ++++----
...neChangedJobConfigurationProcessorFactory.java} | 16 ++++-----
.../PipelineMetaDataChangedEventHandler.java} | 8 ++---
...ipelineMetaDataChangedEventHandlerFactory.java} | 16 ++++-----
.../impl/BarrierMetaDataChangedEventHandler.java} | 7 ++--
.../impl/ChangedJobConfigurationDispatcher.java} | 19 +++++-----
.../ratelimit/QPSJobRateLimitAlgorithm.java | 2 +-
.../ratelimit/TPSJobRateLimitAlgorithm.java | 2 +-
.../sqlbuilder/AbstractPipelineSQLBuilder.java | 4 +--
.../core/util/PipelineSchemaTableUtil.java | 42 ++--------------------
.../core/util/PipelineTableMetaDataUtil.java | 9 +++--
...MigrationChangedJobConfigurationProcessor.java} | 22 ++++++------
.../scenario/migration/MigrationJobAPIImpl.java | 2 +-
...ndler.PipelineChangedJobConfigurationProcessor} | 2 +-
...nt.handler.PipelineMetaDataChangedEventHandler} | 4 +--
...ta.pipeline.spi.ratelimit.JobRateLimitAlgorithm | 4 +--
.../fixture/FixturePipelineSQLBuilder.java | 2 +-
...ineMetaDataChangedEventHandlerFactoryTest.java} | 14 ++++----
.../src/main/resources/logback.xml | 2 ++
.../api/PipelineJobPublicAPIFactoryTest.java | 14 ++++++++
23 files changed, 109 insertions(+), 122 deletions(-)
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactory.java
index 958d6943872..4443bbc6db1 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactory.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactory.java
@@ -28,8 +28,8 @@ import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry;
public final class PipelineJobPublicAPIFactory {
static {
- ShardingSphereServiceLoader.register(MigrationJobPublicAPI.class);
ShardingSphereServiceLoader.register(PipelineJobPublicAPI.class);
+ ShardingSphereServiceLoader.register(MigrationJobPublicAPI.class);
}
/**
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
index 1c88196bab9..ad2f8365d7e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
@@ -98,13 +98,13 @@ public interface PipelineSQLBuilder extends TypedSPI, RequiredSPI {
String buildDeleteSQL(String schemaName, DataRecord dataRecord, Collection<Column> conditionColumns);
/**
- * Build truncate SQL.
+ * Build drop SQL.
*
* @param schemaName schema name
* @param tableName table name
- * @return truncate SQL
+ * @return drop SQL
*/
- String buildTruncateSQL(String schemaName, String tableName);
+ String buildDropSQL(String schemaName, String tableName);
/**
* Build count SQL.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeWatcher.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeWatcher.java
index 18dec825008..70326d46e58 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeWatcher.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeWatcher.java
@@ -20,8 +20,8 @@ package org.apache.shardingsphere.data.pipeline.core.metadata.node;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
-import org.apache.shardingsphere.data.pipeline.core.spi.handler.PipelineMetaDataChangedHandler;
-import org.apache.shardingsphere.data.pipeline.core.spi.handler.PipelineMetaDataChangedHandlerFactory;
+import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineMetaDataChangedEventHandler;
+import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineMetaDataChangedEventHandlerFactory;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
import java.util.Collection;
@@ -31,25 +31,25 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
/**
- * Pipeline metaData nodeWatcher.
+ * Pipeline meta data node watcher.
*/
@Slf4j
public final class PipelineMetaDataNodeWatcher {
private static final PipelineMetaDataNodeWatcher INSTANCE = new PipelineMetaDataNodeWatcher();
- private final Map<Pattern, PipelineMetaDataChangedHandler> listenerMap = new ConcurrentHashMap<>();
+ private final Map<Pattern, PipelineMetaDataChangedEventHandler> listenerMap = new ConcurrentHashMap<>();
private PipelineMetaDataNodeWatcher() {
- Collection<PipelineMetaDataChangedHandler> instances = PipelineMetaDataChangedHandlerFactory.findAllInstances();
- for (PipelineMetaDataChangedHandler each : instances) {
+ Collection<PipelineMetaDataChangedEventHandler> instances = PipelineMetaDataChangedEventHandlerFactory.findAllInstances();
+ for (PipelineMetaDataChangedEventHandler each : instances) {
listenerMap.put(each.getKeyPattern(), each);
}
PipelineAPIFactory.getGovernanceRepositoryAPI().watch(DataPipelineConstants.DATA_PIPELINE_ROOT, this::dispatchEvent);
}
private void dispatchEvent(final DataChangedEvent event) {
- for (Entry<Pattern, PipelineMetaDataChangedHandler> entry : listenerMap.entrySet()) {
+ for (Entry<Pattern, PipelineMetaDataChangedEventHandler> entry : listenerMap.entrySet()) {
if (entry.getKey().matcher(event.getKey()).matches()) {
entry.getValue().handle(event);
return;
@@ -58,9 +58,9 @@ public final class PipelineMetaDataNodeWatcher {
}
/**
- * Get pipeline metaData nodeWatcher instance.
+ * Get instance.
*
- * @return pipeline metaData nodeWatcher
+ * @return instance
*/
public static PipelineMetaDataNodeWatcher getInstance() {
return INSTANCE;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/process/PipelineJobConfigurationChangedEventProcessor.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessor.java
similarity index 71%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/process/PipelineJobConfigurationChangedEventProcessor.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessor.java
index 02ef7febb6f..ea8edfdbb57 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/process/PipelineJobConfigurationChangedEventProcessor.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessor.java
@@ -15,22 +15,22 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.spi.process;
+package org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
/**
- * Pipeline job config changed event processor.
+ * Pipeline changed job configuration processor.
*/
-public interface PipelineJobConfigurationChangedEventProcessor extends TypedSPI {
+public interface PipelineChangedJobConfigurationProcessor extends TypedSPI {
/**
- * Process data changed event.
+ * Process changed job configuration.
*
- * @param jobConfigPOJO job config
- * @param event event
+ * @param eventType event type
+ * @param jobConfigPOJO job configuration pojo
*/
- void process(DataChangedEvent event, JobConfigurationPOJO jobConfigPOJO);
+ void process(DataChangedEvent.Type eventType, JobConfigurationPOJO jobConfigPOJO);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/process/PipelineJobConfigurationChangedEventProcessorFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactory.java
similarity index 66%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/process/PipelineJobConfigurationChangedEventProcessorFactory.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactory.java
index 2ae73d39051..71a5a4febc6 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/process/PipelineJobConfigurationChangedEventProcessorFactory.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactory.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.spi.process;
+package org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
@@ -24,22 +24,22 @@ import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry;
/**
- * Pipeline job config changed event processor factory.
+ * Pipeline changed job configuration processor factory.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class PipelineJobConfigurationChangedEventProcessorFactory {
+public final class PipelineChangedJobConfigurationProcessorFactory {
static {
- ShardingSphereServiceLoader.register(PipelineJobConfigurationChangedEventProcessor.class);
+ ShardingSphereServiceLoader.register(PipelineChangedJobConfigurationProcessor.class);
}
/**
- * Get job changed event processor instance.
+ * Get instance.
*
* @param jobType job type
- * @return pipeline event process
+ * @return instance
*/
- public static PipelineJobConfigurationChangedEventProcessor getInstance(final JobType jobType) {
- return TypedSPIRegistry.getRegisteredService(PipelineJobConfigurationChangedEventProcessor.class, jobType.getTypeName());
+ public static PipelineChangedJobConfigurationProcessor getInstance(final JobType jobType) {
+ return TypedSPIRegistry.getRegisteredService(PipelineChangedJobConfigurationProcessor.class, jobType.getTypeName());
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/PipelineMetaDataChangedHandler.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineMetaDataChangedEventHandler.java
similarity index 84%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/PipelineMetaDataChangedHandler.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineMetaDataChangedEventHandler.java
index 9b963e76a82..4b8bde727c7 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/PipelineMetaDataChangedHandler.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineMetaDataChangedEventHandler.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.spi.handler;
+package org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler;
import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
@@ -23,10 +23,10 @@ import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEve
import java.util.regex.Pattern;
/**
- * Pipeline meta data changed handler.
+ * Pipeline meta data changed event handler.
*/
@SingletonSPI
-public interface PipelineMetaDataChangedHandler {
+public interface PipelineMetaDataChangedEventHandler {
/**
* Get key pattern.
@@ -36,7 +36,7 @@ public interface PipelineMetaDataChangedHandler {
Pattern getKeyPattern();
/**
- * Event changed handler.
+ * Handle meta data changed event.
*
* @param event changed event
*/
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/PipelineMetaDataChangedHandlerFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineMetaDataChangedEventHandlerFactory.java
similarity index 73%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/PipelineMetaDataChangedHandlerFactory.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineMetaDataChangedEventHandlerFactory.java
index 0d746d5db8c..21d0d94f514 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/PipelineMetaDataChangedHandlerFactory.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineMetaDataChangedEventHandlerFactory.java
@@ -15,27 +15,27 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.spi.handler;
+package org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler;
import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
import java.util.Collection;
/**
- * Pipeline meta data listener factory.
+ * Pipeline meta data changed event handler factory.
*/
-public final class PipelineMetaDataChangedHandlerFactory {
+public final class PipelineMetaDataChangedEventHandlerFactory {
static {
- ShardingSphereServiceLoader.register(PipelineMetaDataChangedHandler.class);
+ ShardingSphereServiceLoader.register(PipelineMetaDataChangedEventHandler.class);
}
/**
- * Get pipeline meta data listener instance.
+ * Get instances.
*
- * @return pipeline meta data listener
+ * @return instances
*/
- public static Collection<PipelineMetaDataChangedHandler> findAllInstances() {
- return ShardingSphereServiceLoader.getServiceInstances(PipelineMetaDataChangedHandler.class);
+ public static Collection<PipelineMetaDataChangedEventHandler> findAllInstances() {
+ return ShardingSphereServiceLoader.getServiceInstances(PipelineMetaDataChangedEventHandler.class);
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/BarrierMetaDataChangedHandler.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/BarrierMetaDataChangedEventHandler.java
similarity index 81%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/BarrierMetaDataChangedHandler.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/BarrierMetaDataChangedEventHandler.java
index 676baa8efa4..95d32aa87b4 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/BarrierMetaDataChangedHandler.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/BarrierMetaDataChangedEventHandler.java
@@ -15,10 +15,11 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.spi.handler;
+package org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.impl;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
+import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineMetaDataChangedEventHandler;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
@@ -26,10 +27,10 @@ import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEve
import java.util.regex.Pattern;
/**
- * Barrier pipeline meta data handler, .
+ * Barrier meta data changed event handler.
*/
@Slf4j
-public final class BarrierMetaDataChangedHandler implements PipelineMetaDataChangedHandler {
+public final class BarrierMetaDataChangedEventHandler implements PipelineMetaDataChangedEventHandler {
@Override
public Pattern getKeyPattern() {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/JobConfigurationChangedHandler.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/ChangedJobConfigurationDispatcher.java
similarity index 63%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/JobConfigurationChangedHandler.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/ChangedJobConfigurationDispatcher.java
index f6e849f3902..18da7257ad5 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/JobConfigurationChangedHandler.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/ChangedJobConfigurationDispatcher.java
@@ -15,13 +15,14 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.spi.handler;
+package org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.impl;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
-import org.apache.shardingsphere.data.pipeline.core.spi.process.PipelineJobConfigurationChangedEventProcessor;
-import org.apache.shardingsphere.data.pipeline.core.spi.process.PipelineJobConfigurationChangedEventProcessorFactory;
+import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor;
+import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessorFactory;
+import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineMetaDataChangedEventHandler;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
@@ -29,10 +30,10 @@ import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEve
import java.util.regex.Pattern;
/**
- * Job config changed handler.
+ * Changed job configuration dispatcher.
*/
@Slf4j
-public final class JobConfigurationChangedHandler implements PipelineMetaDataChangedHandler {
+public final class ChangedJobConfigurationDispatcher implements PipelineMetaDataChangedEventHandler {
@Override
public Pattern getKeyPattern() {
@@ -41,17 +42,17 @@ public final class JobConfigurationChangedHandler implements PipelineMetaDataCha
@Override
public void handle(final DataChangedEvent event) {
- log.info("{} job config: {}", event.getType(), event.getKey());
JobConfigurationPOJO jobConfigPOJO;
try {
jobConfigPOJO = YamlEngine.unmarshal(event.getValue(), JobConfigurationPOJO.class, true);
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
- log.error("analyze job config pojo failed.", ex);
+ log.error("unmarshal job configuration pojo failed.", ex);
return;
}
- PipelineJobConfigurationChangedEventProcessor processor = PipelineJobConfigurationChangedEventProcessorFactory.getInstance(PipelineJobIdUtils.parseJobType(jobConfigPOJO.getJobName()));
- processor.process(event, jobConfigPOJO);
+ log.info("{} job configuration: {}, disabled={}", event.getType(), event.getKey(), jobConfigPOJO.isDisabled());
+ PipelineChangedJobConfigurationProcessor processor = PipelineChangedJobConfigurationProcessorFactory.getInstance(PipelineJobIdUtils.parseJobType(jobConfigPOJO.getJobName()));
+ processor.process(event.getType(), jobConfigPOJO);
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/QPSJobRateLimitAlgorithm.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ratelimit/QPSJobRateLimitAlgorithm.java
similarity index 96%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/QPSJobRateLimitAlgorithm.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ratelimit/QPSJobRateLimitAlgorithm.java
index 5a095585b94..8aad098b1c4 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/QPSJobRateLimitAlgorithm.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ratelimit/QPSJobRateLimitAlgorithm.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.spi.ratelimit;
+package org.apache.shardingsphere.data.pipeline.core.ratelimit;
import com.google.common.base.Strings;
import com.google.common.util.concurrent.RateLimiter;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/TPSJobRateLimitAlgorithm.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ratelimit/TPSJobRateLimitAlgorithm.java
similarity index 97%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/TPSJobRateLimitAlgorithm.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ratelimit/TPSJobRateLimitAlgorithm.java
index 8c3c4fc52b4..4a5cb6b426c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/TPSJobRateLimitAlgorithm.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ratelimit/TPSJobRateLimitAlgorithm.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.spi.ratelimit;
+package org.apache.shardingsphere.data.pipeline.core.ratelimit;
import com.google.common.base.Strings;
import com.google.common.util.concurrent.RateLimiter;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
index 3914d0294ab..dca4aefb36a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
@@ -162,8 +162,8 @@ public abstract class AbstractPipelineSQLBuilder implements PipelineSQLBuilder {
}
@Override
- public String buildTruncateSQL(final String schemaName, final String tableName) {
- return String.format("TRUNCATE TABLE %s", decorate(schemaName, tableName));
+ public String buildDropSQL(final String schemaName, final String tableName) {
+ return String.format("DROP TABLE IF EXISTS %s", decorate(schemaName, tableName));
}
private String buildDeleteSQLInternal(final String schemaName, final String tableName, final Collection<Column> conditionColumns) {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineSchemaTableUtil.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineSchemaTableUtil.java
index c911d5481fa..08618150c19 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineSchemaTableUtil.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineSchemaTableUtil.java
@@ -17,60 +17,24 @@
package org.apache.shardingsphere.data.pipeline.core.util;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.ObjectUtils;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.ResultSet;
import java.sql.SQLException;
-import java.time.LocalDateTime;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
/**
* Pipeline schema table util.
*/
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
@Slf4j
public final class PipelineSchemaTableUtil {
- private PipelineSchemaTableUtil() {
- }
-
- /**
- * Get schema tables map from actual data source.
- *
- * @param pipelineDataSourceConfig pipeline data source config
- * @param schemaName schema name,
- * @param tableName table name
- * @return schema tables map
- */
- public static Map<String, List<String>> getSchemaTablesMapFromActual(final PipelineDataSourceConfiguration pipelineDataSourceConfig, final String schemaName, final String tableName) {
- log.info("start get schema tables from actual, begin:{}", LocalDateTime.now());
- Map<String, List<String>> result = new HashMap<>();
- try (PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(pipelineDataSourceConfig)) {
- try (Connection connection = dataSource.getConnection()) {
- DatabaseMetaData metaData = connection.getMetaData();
- String targetSchema = ObjectUtils.defaultIfNull(schemaName, connection.getSchema());
- ResultSet resultSet = metaData.getTables(connection.getCatalog(), targetSchema, tableName, new String[]{"TABLE"});
- while (resultSet.next()) {
- result.computeIfAbsent(targetSchema, k -> new ArrayList<>()).add(resultSet.getString("TABLE_NAME"));
- }
- log.info("get schema tables success, catalog:{}, schema:{}, table:{}, result:{}, end:{}", targetSchema, connection.getCatalog(), tableName, resultSet, LocalDateTime.now());
- }
- } catch (final SQLException ex) {
- log.error("get schema name map error", ex);
- throw new RuntimeException(ex.getMessage());
- }
- return result;
- }
-
/**
* Get default schema by connection.getSchema().
*
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineTableMetaDataUtil.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineTableMetaDataUtil.java
index eb8b74d6f87..26bc40b2c27 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineTableMetaDataUtil.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineTableMetaDataUtil.java
@@ -17,6 +17,8 @@
package org.apache.shardingsphere.data.pipeline.core.util;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
import lombok.SneakyThrows;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
@@ -36,6 +38,7 @@ import java.util.List;
/**
* Pipeline table meta data util.
*/
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class PipelineTableMetaDataUtil {
/**
@@ -44,7 +47,7 @@ public final class PipelineTableMetaDataUtil {
* @param schemaName schema name
* @param tableName table name
* @param dataSourceConfig source configuration
- * @param loader pipeline table meta data loader* @return pipeline table meta data
+ * @param loader pipeline table meta data loader
* @return pipeline table meta data
*/
@SneakyThrows(SQLException.class)
@@ -74,7 +77,7 @@ public final class PipelineTableMetaDataUtil {
}
/**
- * Get unique key column, if primary key exists, return primary key, otherwise return the first unique key.
+ * Get unique key column.
*
* @param schemaName schema name
* @param tableName table name
@@ -89,7 +92,7 @@ public final class PipelineTableMetaDataUtil {
}
/**
- * Get unique key column, if primary key exists, return primary key, otherwise return the first unique key.
+ * Get unique key column.
*
* @param schemaName schema name
* @param tableName table name
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/process/MigrationJobConfigurationChangedEventProcessor.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
similarity index 78%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/process/MigrationJobConfigurationChangedEventProcessor.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
index 75e931ba70f..b54a5a493c8 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/process/MigrationJobConfigurationChangedEventProcessor.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.spi.process;
+package org.apache.shardingsphere.data.pipeline.scenario.migration;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
@@ -24,8 +24,7 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobPreparer;
+import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
@@ -33,25 +32,26 @@ import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEve
import java.util.concurrent.CompletableFuture;
/**
- * Migration job config event processor.
+ * Migration job configuration changed processor.
*/
@Slf4j
-public final class MigrationJobConfigurationChangedEventProcessor implements PipelineJobConfigurationChangedEventProcessor {
+public final class MigrationChangedJobConfigurationProcessor implements PipelineChangedJobConfigurationProcessor {
@Override
- public void process(final DataChangedEvent event, final JobConfigurationPOJO jobConfigPOJO) {
+ public void process(final DataChangedEvent.Type eventType, final JobConfigurationPOJO jobConfigPOJO) {
String jobId = jobConfigPOJO.getJobName();
if (jobConfigPOJO.isDisabled()) {
+ log.info("{} is disabled", jobId);
PipelineJobCenter.stop(jobId);
return;
}
- switch (event.getType()) {
+ switch (eventType) {
case ADDED:
case UPDATED:
- if (PipelineJobCenter.isJobExisting(jobConfigPOJO.getJobName())) {
- log.info("{} added to executing jobs failed since it already exists", jobConfigPOJO.getJobName());
+ if (PipelineJobCenter.isJobExisting(jobId)) {
+ log.info("{} added to executing jobs failed since it already exists", jobId);
} else {
- log.info("{} executing jobs", jobConfigPOJO.getJobName());
+ log.info("{} executing jobs", jobId);
CompletableFuture.runAsync(() -> execute(jobConfigPOJO), PipelineContext.getEventListenerExecutor());
}
break;
@@ -59,7 +59,7 @@ public final class MigrationJobConfigurationChangedEventProcessor implements Pip
log.info("deleted jobId={}", jobId);
MigrationJobConfiguration jobConfig = YamlMigrationJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
new MigrationJobPreparer().cleanup(jobConfig);
- PipelineJobCenter.stop(jobConfigPOJO.getJobName());
+ PipelineJobCenter.stop(jobId);
break;
default:
break;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index e68b75c7a1f..bc2dc0c57ae 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -362,7 +362,7 @@ public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implem
try (
PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(jobConfig.getTarget());
Connection connection = dataSource.getConnection()) {
- String sql = pipelineSQLBuilder.buildTruncateSQL(targetSchemaName, targetTableName);
+ String sql = pipelineSQLBuilder.buildDropSQL(targetSchemaName, targetTableName);
log.info("cleanTempTableOnRollback, targetSchemaName={}, targetTableName={}, sql={}", targetSchemaName, targetTableName, sql);
try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.execute();
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.process.PipelineJobConfigurationChangedEventProcessor b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor
similarity index 88%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.process.PipelineJobConfigurationChangedEventProcessor
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor
index 55caeea6a01..89ef92e051e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.process.PipelineJobConfigurationChangedEventProcessor
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor
@@ -15,4 +15,4 @@
# limitations under the License.
#
-org.apache.shardingsphere.data.pipeline.core.spi.process.MigrationJobConfigurationChangedEventProcessor
+org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationChangedJobConfigurationProcessor
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.handler.PipelineMetaDataChangedHandler b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineMetaDataChangedEventHandler
similarity index 77%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.handler.PipelineMetaDataChangedHandler
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineMetaDataChangedEventHandler
index 2d7ae078cf4..56b2c72dd8a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.handler.PipelineMetaDataChangedHandler
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineMetaDataChangedEventHandler
@@ -15,5 +15,5 @@
# limitations under the License.
#
-org.apache.shardingsphere.data.pipeline.core.spi.handler.JobConfigurationChangedHandler
-org.apache.shardingsphere.data.pipeline.core.spi.handler.BarrierMetaDataChangedHandler
+org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.impl.ChangedJobConfigurationDispatcher
+org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.impl.BarrierMetaDataChangedEventHandler
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm
index ed125966a01..a5df6f2d7a4 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm
@@ -15,5 +15,5 @@
# limitations under the License.
#
-org.apache.shardingsphere.data.pipeline.core.spi.ratelimit.QPSJobRateLimitAlgorithm
-org.apache.shardingsphere.data.pipeline.core.spi.ratelimit.TPSJobRateLimitAlgorithm
+org.apache.shardingsphere.data.pipeline.core.ratelimit.QPSJobRateLimitAlgorithm
+org.apache.shardingsphere.data.pipeline.core.ratelimit.TPSJobRateLimitAlgorithm
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java
index b72cfd6d331..f58081c17f8 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java
@@ -57,7 +57,7 @@ public final class FixturePipelineSQLBuilder implements PipelineSQLBuilder {
}
@Override
- public String buildTruncateSQL(final String schemaName, final String tableName) {
+ public String buildDropSQL(final String schemaName, final String tableName) {
return "";
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/PipelineMetaDataChangedHandlerFactoryTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineMetaDataChangedEventHandlerFactoryTest.java
similarity index 63%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/PipelineMetaDataChangedHandlerFactoryTest.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineMetaDataChangedEventHandlerFactoryTest.java
index d9a449bdb2b..0ace293e4c5 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/PipelineMetaDataChangedHandlerFactoryTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineMetaDataChangedEventHandlerFactoryTest.java
@@ -15,27 +15,29 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.spi.handler;
+package org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler;
+import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.impl.BarrierMetaDataChangedEventHandler;
+import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.impl.ChangedJobConfigurationDispatcher;
import org.junit.Test;
import java.util.Collection;
import static org.junit.Assert.assertTrue;
-public final class PipelineMetaDataChangedHandlerFactoryTest {
+public final class PipelineMetaDataChangedEventHandlerFactoryTest {
@Test
public void assertFindInstance() {
- Collection<PipelineMetaDataChangedHandler> actual = PipelineMetaDataChangedHandlerFactory.findAllInstances();
+ Collection<PipelineMetaDataChangedEventHandler> actual = PipelineMetaDataChangedEventHandlerFactory.findAllInstances();
boolean isContainMigration = false;
boolean isContainBarrier = false;
- for (PipelineMetaDataChangedHandler each : actual) {
- if (each instanceof JobConfigurationChangedHandler) {
+ for (PipelineMetaDataChangedEventHandler each : actual) {
+ if (each instanceof ChangedJobConfigurationDispatcher) {
isContainMigration = true;
continue;
}
- if (each instanceof BarrierMetaDataChangedHandler) {
+ if (each instanceof BarrierMetaDataChangedEventHandler) {
isContainBarrier = true;
}
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/logback.xml b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/logback.xml
index efeb323b7d0..bb213a9e6d7 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/logback.xml
+++ b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/logback.xml
@@ -32,6 +32,8 @@
<logger name="io.netty" level="error" />
+ <logger name="org.apache.shardingsphere.elasticjob" level="debug" />
+
<root>
<level value="info" />
<appender-ref ref="console" />
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactoryTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactoryTest.java
index 1060131fd4e..6a3e1edc364 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactoryTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactoryTest.java
@@ -17,14 +17,28 @@
package org.apache.shardingsphere.data.pipeline.api;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIImpl;
import org.junit.Test;
+import java.util.Collection;
+import java.util.LinkedList;
+
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertThat;
public final class PipelineJobPublicAPIFactoryTest {
+ @Test
+ public void assertGetPipelineJobPublicAPI() {
+ Collection<Pair<JobType, Class<? extends PipelineJobPublicAPI>>> paramResult = new LinkedList<>();
+ paramResult.add(Pair.of(JobType.MIGRATION, MigrationJobAPIImpl.class));
+ for (Pair<JobType, Class<? extends PipelineJobPublicAPI>> each : paramResult) {
+ assertThat(PipelineJobPublicAPIFactory.getPipelineJobPublicAPI(each.getKey().getTypeName()), instanceOf(each.getValue()));
+ }
+ }
+
@Test
public void assertGetMigrationJobPublicAPI() {
assertThat(PipelineJobPublicAPIFactory.getMigrationJobPublicAPI(), instanceOf(MigrationJobAPIImpl.class));