You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/09/04 14:50:28 UTC
[shardingsphere] branch master updated: Extract PipelineTableMetaDataLoader interface; Move dumper creator SPI to api module (#20739)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 769ff9079b7 Extract PipelineTableMetaDataLoader interface; Move dumper creator SPI to api module (#20739)
769ff9079b7 is described below
commit 769ff9079b79e67030f68d52222de6305cfaab7c
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Sun Sep 4 22:50:20 2022 +0800
Extract PipelineTableMetaDataLoader interface; Move dumper creator SPI to api module (#20739)
* Extract PipelineTableMetaDataLoader as interface and move to api module
* Move dumper creator SPI to api module
* Move out dumper interface from spi package
* Improve code style
---
.../DataConsistencyCalculateParameter.java | 2 +-
.../{spi => api}/ingest/dumper/Dumper.java | 2 +-
.../ingest/dumper/IncrementalDumper.java | 2 +-
.../ingest/dumper/InventoryDumper.java | 2 +-
.../loader/PipelineTableMetaDataLoader.java} | 27 +++++------
.../{ => model}/PipelineColumnMetaData.java | 2 +-
.../api}/metadata/model/PipelineIndexMetaData.java | 3 +-
.../api}/metadata/model/PipelineTableMetaData.java | 3 +-
.../ingest/dumper/IncrementalDumperCreator.java | 12 ++---
.../dumper/IncrementalDumperCreatorFactory.java | 4 +-
.../spi}/ingest/dumper/InventoryDumperCreator.java | 23 +++++-----
.../dumper/InventoryDumperCreatorFactory.java | 14 +++---
.../ingest/dumper/AbstractIncrementalDumper.java | 4 +-
.../ingest/dumper/AbstractInventoryDumper.java | 6 +--
.../core/ingest/dumper/DefaultInventoryDumper.java | 2 +-
.../dumper/DefaultInventoryDumperCreator.java | 10 ++++-
...va => StandardPipelineTableMetaDataLoader.java} | 22 ++++-----
.../core/prepare/InventoryTaskSplitter.java | 8 ++--
.../data/pipeline/core/task/IncrementalTask.java | 6 +--
.../data/pipeline/core/task/InventoryTask.java | 6 +--
.../migration/MigrationDataConsistencyChecker.java | 8 ++--
.../migration/MigrationJobItemContext.java | 5 ++-
.../scenario/migration/MigrationJobPreparer.java | 2 +-
...MatchDataConsistencyCalculateAlgorithmTest.java | 2 +-
.../mysql/MySQLIncrementalDumperCreator.java | 6 +--
.../mysql/MySqlInventoryDumperCreator.java | 9 ++--
.../mysql/ingest/MySQLIncrementalDumper.java | 6 +--
.../mysql/ingest/MySQLInventoryDumper.java | 2 +-
...ine.spi.ingest.dumper.IncrementalDumperCreator} | 0
...eline.spi.ingest.dumper.InventoryDumperCreator} | 0
.../mysql/ingest/MySQLIncrementalDumperTest.java | 9 ++--
.../mysql/ingest/MySQLInventoryDumperTest.java | 4 +-
.../OpenGaussIncrementalDumperCreator.java | 6 +--
.../opengauss/ingest/OpenGaussWalDumper.java | 2 +-
...ine.spi.ingest.dumper.IncrementalDumperCreator} | 0
.../PostgreSQLIncrementalDumperCreator.java | 6 +--
.../PostgreSQLInventoryDumperCreator.java | 6 +--
.../ingest/PostgreSQLInventoryDumper.java | 2 +-
.../postgresql/ingest/PostgreSQLWalDumper.java | 2 +-
.../postgresql/ingest/wal/WalEventConverter.java | 4 +-
...ine.spi.ingest.dumper.IncrementalDumperCreator} | 0
...eline.spi.ingest.dumper.InventoryDumperCreator} | 0
.../ingest/PostgreSQLJdbcDumperTest.java | 4 +-
.../postgresql/ingest/PostgreSQLWalDumperTest.java | 5 ++-
.../ingest/wal/WalEventConverterTest.java | 4 +-
.../api/impl/GovernanceRepositoryAPIImplTest.java | 7 +--
.../IncrementalDumperCreatorFactoryTest.java | 52 +++++++++-------------
.../dumper/InventoryDumperCreatorFactoryTest.java | 43 +++++++-----------
.../core/fixture/FixtureIncrementalDumper.java | 2 +-
.../fixture/FixtureIncrementalDumperCreator.java | 13 +++---
.../core/fixture/FixtureInventoryDumper.java | 2 +-
.../fixture/FixtureInventoryDumperCreator.java | 13 +++---
...> StandardPipelineTableMetaDataLoaderTest.java} | 13 +++---
.../metadata/model/PipelineTableMetaDataTest.java | 3 +-
.../pipeline/core/task/IncrementalTaskTest.java | 5 ++-
.../data/pipeline/core/task/InventoryTaskTest.java | 7 +--
...ine.spi.ingest.dumper.IncrementalDumperCreator} | 0
...eline.spi.ingest.dumper.InventoryDumperCreator} | 0
58 files changed, 200 insertions(+), 214 deletions(-)
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java
index 6d6cb7ecb18..7463fadae10 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java
@@ -24,7 +24,7 @@ import lombok.Setter;
import lombok.ToString;
import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
-import org.apache.shardingsphere.data.pipeline.api.metadata.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
import java.util.Collection;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/Dumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/Dumper.java
similarity index 93%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/Dumper.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/Dumper.java
index bd4e101fb61..79724699ab6 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/Dumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/Dumper.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.spi.ingest.dumper;
+package org.apache.shardingsphere.data.pipeline.api.ingest.dumper;
import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/IncrementalDumper.java
similarity index 93%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumper.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/IncrementalDumper.java
index 6871815663a..db292c1b1bf 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/IncrementalDumper.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.spi.ingest.dumper;
+package org.apache.shardingsphere.data.pipeline.api.ingest.dumper;
/**
* Incremental dumper.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/InventoryDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/InventoryDumper.java
similarity index 93%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/InventoryDumper.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/InventoryDumper.java
index cd79d9b89d9..9405e29878b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/InventoryDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/InventoryDumper.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.spi.ingest.dumper;
+package org.apache.shardingsphere.data.pipeline.api.ingest.dumper;
/**
* Inventory dumper.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineIndexMetaData.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/loader/PipelineTableMetaDataLoader.java
similarity index 60%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineIndexMetaData.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/loader/PipelineTableMetaDataLoader.java
index 759ce69a0fe..5fff7bb307c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineIndexMetaData.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/loader/PipelineTableMetaDataLoader.java
@@ -15,24 +15,21 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.metadata.model;
+package org.apache.shardingsphere.data.pipeline.api.metadata.loader;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import lombok.ToString;
-import org.apache.shardingsphere.data.pipeline.api.metadata.PipelineColumnMetaData;
-
-import java.util.List;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
/**
- * Pipeline meta data of index.
+ * Pipeline table metadata loader.
*/
-@RequiredArgsConstructor
-@Getter
-@ToString
-public final class PipelineIndexMetaData {
-
- private final String name;
+public interface PipelineTableMetaDataLoader {
- private final List<PipelineColumnMetaData> columns;
+ /**
+ * Get table metadata, load if it does not exist.
+ *
+ * @param schemaName schema name. nullable
+ * @param tableName dedicated table name, not table name pattern
+ * @return table metadata
+ */
+ PipelineTableMetaData getTableMetaData(String schemaName, String tableName);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/PipelineColumnMetaData.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/model/PipelineColumnMetaData.java
similarity index 96%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/PipelineColumnMetaData.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/model/PipelineColumnMetaData.java
index d4476eeb53f..ccc001cf6c1 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/PipelineColumnMetaData.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/model/PipelineColumnMetaData.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.metadata;
+package org.apache.shardingsphere.data.pipeline.api.metadata.model;
import lombok.Getter;
import lombok.NonNull;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineIndexMetaData.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/model/PipelineIndexMetaData.java
similarity index 88%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineIndexMetaData.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/model/PipelineIndexMetaData.java
index 759ce69a0fe..2a1713efeac 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineIndexMetaData.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/model/PipelineIndexMetaData.java
@@ -15,12 +15,11 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.metadata.model;
+package org.apache.shardingsphere.data.pipeline.api.metadata.model;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
-import org.apache.shardingsphere.data.pipeline.api.metadata.PipelineColumnMetaData;
import java.util.List;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/model/PipelineTableMetaData.java
similarity index 96%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/model/PipelineTableMetaData.java
index 7a1ed2d9789..c35eb8f463f 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/model/PipelineTableMetaData.java
@@ -15,13 +15,12 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.metadata.model;
+package org.apache.shardingsphere.data.pipeline.api.metadata.model;
import lombok.Getter;
import lombok.NonNull;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.metadata.PipelineColumnMetaData;
import java.util.ArrayList;
import java.util.Collection;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/IncrementalDumperCreator.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumperCreator.java
similarity index 82%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/IncrementalDumperCreator.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumperCreator.java
index 5583e056e1e..6eb1b956145 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/IncrementalDumperCreator.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumperCreator.java
@@ -15,13 +15,13 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.ingest.dumper;
+package org.apache.shardingsphere.data.pipeline.spi.ingest.dumper;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
@@ -34,11 +34,11 @@ public interface IncrementalDumperCreator<P> extends TypedSPI {
/**
* Create incremental dumper.
*
- * @param dumperConfig dumperConfig
+ * @param dumperConfig dumper configuration
* @param position position
* @param channel channel
- * @param metaDataLoader metaDataLoader
- * @return IncrementalDumper
+ * @param metaDataLoader meta data loader
+ * @return incremental dumper
*/
IncrementalDumper createIncrementalDumper(DumperConfiguration dumperConfig, IngestPosition<P> position,
PipelineChannel channel, PipelineTableMetaDataLoader metaDataLoader);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/IncrementalDumperCreatorFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumperCreatorFactory.java
similarity index 93%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/IncrementalDumperCreatorFactory.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumperCreatorFactory.java
index 4db34d487ef..f81e9fbcd61 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/IncrementalDumperCreatorFactory.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumperCreatorFactory.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.ingest.dumper;
+package org.apache.shardingsphere.data.pipeline.spi.ingest.dumper;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
@@ -36,7 +36,7 @@ public class IncrementalDumperCreatorFactory {
* Incremental dumper creator.
*
* @param databaseType database type
- * @return Incremental dumper creator
+ * @return incremental dumper creator
*/
public static IncrementalDumperCreator getInstance(final String databaseType) {
return TypedSPIRegistry.getRegisteredService(IncrementalDumperCreator.class, databaseType);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumperCreator.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/InventoryDumperCreator.java
similarity index 70%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumperCreator.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/InventoryDumperCreator.java
index 0bc3df83de2..496e9bb7a66 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumperCreator.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/InventoryDumperCreator.java
@@ -15,29 +15,32 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.ingest.dumper;
+package org.apache.shardingsphere.data.pipeline.spi.ingest.dumper;
-import javax.sql.DataSource;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.InventoryDumper;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
+import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
+import javax.sql.DataSource;
+
/**
* Inventory dumper creator.
*/
@SingletonSPI
-public interface InventoryDumperCreator extends TypedSPI {
+public interface InventoryDumperCreator extends TypedSPI, RequiredSPI {
/**
- * Create Inventory Dumper.
- * @param inventoryDumperConfig inventoryDumperConfig
+ * Create inventory dumper.
+ *
+ * @param inventoryDumperConfig inventory dumper configuration
* @param channel channel
- * @param sourceDataSource sourceDataSource
- * @param sourceMetaDataLoader sourceMetaDataLoader
- * @return InventoryDumper
+ * @param sourceDataSource source data source
+ * @param sourceMetaDataLoader source meta data loader
+ * @return inventory dumper
*/
InventoryDumper createInventoryDumper(InventoryDumperConfiguration inventoryDumperConfig, PipelineChannel channel,
DataSource sourceDataSource, PipelineTableMetaDataLoader sourceMetaDataLoader);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumperCreatorFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/InventoryDumperCreatorFactory.java
similarity index 74%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumperCreatorFactory.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/InventoryDumperCreatorFactory.java
index 6c218b56391..6c7b2120630 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumperCreatorFactory.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/InventoryDumperCreatorFactory.java
@@ -15,15 +15,14 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.ingest.dumper;
+package org.apache.shardingsphere.data.pipeline.spi.ingest.dumper;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
+import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPIRegistry;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry;
-import java.util.Optional;
-
/**
* Inventory dumper creator factory.
*/
@@ -35,13 +34,12 @@ public class InventoryDumperCreatorFactory {
}
/**
- * Get inventoryDumper creator instance.
+ * Get inventory dumper creator instance.
*
- * @param databaseType databaseType
- * @return InventoryDumperCreator
+ * @param databaseType database type
+ * @return inventory dumper creator
*/
public static InventoryDumperCreator getInstance(final String databaseType) {
- Optional<InventoryDumperCreator> result = TypedSPIRegistry.findRegisteredService(InventoryDumperCreator.class, databaseType);
- return result.orElseGet(DefaultInventoryDumperCreator::new);
+ return TypedSPIRegistry.findRegisteredService(InventoryDumperCreator.class, databaseType).orElseGet(() -> RequiredSPIRegistry.getRegisteredService(InventoryDumperCreator.class));
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractIncrementalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractIncrementalDumper.java
index f36e16e6f2d..bd396dac9cb 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractIncrementalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractIncrementalDumper.java
@@ -20,9 +20,9 @@ package org.apache.shardingsphere.data.pipeline.core.ingest.dumper;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
/**
* Abstract incremental dumper.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
index 34f0ebee193..78de64e85dd 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
@@ -27,6 +27,7 @@ import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumper
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.InventoryDumper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
@@ -38,14 +39,13 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/DefaultInventoryDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/DefaultInventoryDumper.java
index 22e9c343dc8..596f740a609 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/DefaultInventoryDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/DefaultInventoryDumper.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.data.pipeline.core.ingest.dumper;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import javax.sql.DataSource;
import java.sql.Connection;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/DefaultInventoryDumperCreator.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/DefaultInventoryDumperCreator.java
index 335b989d454..2860dcb14f0 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/DefaultInventoryDumperCreator.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/DefaultInventoryDumperCreator.java
@@ -19,8 +19,9 @@ package org.apache.shardingsphere.data.pipeline.core.ingest.dumper;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.InventoryDumper;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumperCreator;
import javax.sql.DataSource;
@@ -34,4 +35,9 @@ public final class DefaultInventoryDumperCreator implements InventoryDumperCreat
final DataSource sourceDataSource, final PipelineTableMetaDataLoader sourceMetaDataLoader) {
return new DefaultInventoryDumper(inventoryDumperConfig, channel, sourceDataSource, sourceMetaDataLoader);
}
+
+ @Override
+ public boolean isDefault() {
+ return true;
+ }
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataLoader.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java
similarity index 91%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataLoader.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java
index 2b5f943091f..dcc6ed43e6a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataLoader.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java
@@ -21,9 +21,10 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.metadata.TableName;
-import org.apache.shardingsphere.data.pipeline.api.metadata.PipelineColumnMetaData;
-import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineIndexMetaData;
-import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineIndexMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
import java.sql.Connection;
@@ -41,25 +42,18 @@ import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
/**
- * Pipeline table metadata loader.
+ * Standard pipeline table metadata loader.
*/
@RequiredArgsConstructor
@Slf4j
-// TODO PipelineTableMetaDataLoader SPI
-public final class PipelineTableMetaDataLoader {
+public final class StandardPipelineTableMetaDataLoader implements PipelineTableMetaDataLoader {
- // TODO it doesn't support ShardingSphereDataSource
+ // It doesn't support ShardingSphereDataSource
private final PipelineDataSourceWrapper dataSource;
private final Map<TableName, PipelineTableMetaData> tableMetaDataMap = new ConcurrentHashMap<>();
- /**
- * Get table metadata, load if it does not exist.
- *
- * @param schemaName schema name. nullable
- * @param tableName dedicated table name, not table name pattern
- * @return table metadata
- */
+ @Override
public PipelineTableMetaData getTableMetaData(final String schemaName, final String tableName) {
PipelineTableMetaData result = tableMetaDataMap.get(new TableName(tableName));
if (null != result) {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index c5d10c6f5b4..f2e3a02064b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -35,14 +35,14 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.position.StringPrimary
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineIndexMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.DefaultPipelineJobProgressListener;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.api.metadata.PipelineColumnMetaData;
-import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineIndexMetaData;
-import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index df9150dff41..d6aad311a44 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -26,19 +26,19 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreatorFactory;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreatorFactory;
import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.Dumper;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreatorFactory;
import java.util.Collection;
import java.util.LinkedList;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index 23dbf5ffc77..ba1f187c65d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -26,19 +26,19 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreatorFactory;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreatorFactory;
import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.Dumper;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumperCreatorFactory;
import javax.sql.DataSource;
import java.util.List;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
index 201422f5ed0..3c14a669de0 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
@@ -28,11 +28,11 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
-import org.apache.shardingsphere.data.pipeline.api.metadata.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineDataConsistencyCheckFailedException;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
@@ -174,7 +174,7 @@ public final class MigrationDataConsistencyChecker {
String sourceDatabaseType = sourceDataSourceConfig.getDatabaseType().getType();
String targetDatabaseType = targetDataSourceConfig.getDatabaseType().getType();
for (String each : Collections.singletonList(sourceTableName)) {
- PipelineTableMetaData tableMetaData = new PipelineTableMetaDataLoader(sourceDataSource).getTableMetaData(tableNameSchemaNameMapping.getSchemaName(each), each);
+ PipelineTableMetaData tableMetaData = new StandardPipelineTableMetaDataLoader(sourceDataSource).getTableMetaData(tableNameSchemaNameMapping.getSchemaName(each), each);
if (null == tableMetaData) {
throw new PipelineDataConsistencyCheckFailedException("Can not get metadata for table " + each);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
index c5a4470921b..9a72f14c16c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
@@ -29,8 +29,9 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
@@ -79,7 +80,7 @@ public final class MigrationJobItemContext implements InventoryIncrementalJobIte
@Override
protected PipelineTableMetaDataLoader initialize() throws ConcurrentException {
- return new PipelineTableMetaDataLoader(sourceDataSourceLazyInitializer.get());
+ return new StandardPipelineTableMetaDataLoader(sourceDataSourceLazyInitializer.get());
}
};
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
index c3cd181a04d..c02cd51ea0f 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
@@ -28,12 +28,12 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineIgnoredException;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.DefaultPipelineJobProgressListener;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.prepare.InventoryTaskSplitter;
import org.apache.shardingsphere.data.pipeline.core.prepare.PipelineJobPreparerUtils;
import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetSchemasParameter;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithmTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithmTest.java
index 04c6b47d9a9..d4240d77e4a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithmTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithmTest.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
-import org.apache.shardingsphere.data.pipeline.api.metadata.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineDataConsistencyCheckFailedException;
import org.junit.Before;
import org.junit.Test;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLIncrementalDumperCreator.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLIncrementalDumperCreator.java
index 6fe23ba7a34..076c700b23b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLIncrementalDumperCreator.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLIncrementalDumperCreator.java
@@ -19,12 +19,12 @@ package org.apache.shardingsphere.data.pipeline.mysql;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLIncrementalDumper;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPosition;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator;
/**
* MySql incremental dumper creator.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySqlInventoryDumperCreator.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySqlInventoryDumperCreator.java
index 7697ee5d13c..225fbeb9e9e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySqlInventoryDumperCreator.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySqlInventoryDumperCreator.java
@@ -17,13 +17,14 @@
package org.apache.shardingsphere.data.pipeline.mysql;
-import javax.sql.DataSource;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreator;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.InventoryDumper;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLInventoryDumper;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumperCreator;
+
+import javax.sql.DataSource;
public class MySqlInventoryDumperCreator implements InventoryDumperCreator {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 40727cb4ac3..b6f0e056f20 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -31,11 +31,11 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractIncrementalDumper;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.api.metadata.PipelineColumnMetaData;
-import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPosition;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractRowsEvent;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java
index 2fde33ba951..6fd46b68b3a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java
@@ -19,8 +19,8 @@ package org.apache.shardingsphere.data.pipeline.mysql.ingest;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractInventoryDumper;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import javax.sql.DataSource;
import java.sql.Connection;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator
similarity index 100%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreator b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumperCreator
similarity index 100%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreator
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumperCreator
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
index 3ee5618cd76..002201f3ec3 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
@@ -27,12 +27,13 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderReco
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MultiplexMemoryPipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.api.metadata.PipelineColumnMetaData;
-import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPosition;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent;
@@ -82,7 +83,7 @@ public final class MySQLIncrementalDumperTest {
initTableData(dumperConfig);
dumperConfig.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:mysql://127.0.0.1:3306/ds_0", "root", "root"));
channel = new MultiplexMemoryPipelineChannel();
- PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()));
+ PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()));
incrementalDumper = new MySQLIncrementalDumper(dumperConfig, new BinlogPosition("binlog-000001", 4L), channel, metaDataLoader);
PipelineColumnMetaData column = new PipelineColumnMetaData(1, "test", Types.INTEGER, "INTEGER", true, true);
when(pipelineTableMetaData.getColumnMetaData(anyInt())).thenReturn(column);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumperTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumperTest.java
index 0d8bc4f31fc..d8aaf019c89 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumperTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumperTest.java
@@ -25,7 +25,7 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.SimpleMemoryPipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.junit.Before;
import org.junit.Test;
@@ -49,7 +49,7 @@ public final class MySQLInventoryDumperTest {
PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
InventoryDumperConfiguration dumperConfig = mockInventoryDumperConfiguration();
PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
- mysqlJdbcDumper = new MySQLInventoryDumper(mockInventoryDumperConfiguration(), new SimpleMemoryPipelineChannel(100), dataSource, new PipelineTableMetaDataLoader(dataSource));
+ mysqlJdbcDumper = new MySQLInventoryDumper(mockInventoryDumperConfiguration(), new SimpleMemoryPipelineChannel(100), dataSource, new StandardPipelineTableMetaDataLoader(dataSource));
initTableData(dataSource);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussIncrementalDumperCreator.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussIncrementalDumperCreator.java
index fce078e1591..4083a4cd690 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussIncrementalDumperCreator.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussIncrementalDumperCreator.java
@@ -19,12 +19,12 @@ package org.apache.shardingsphere.data.pipeline.opengauss;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.opengauss.ingest.OpenGaussWalDumper;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator;
/**
* OpenGauss incremental dumper creator.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
index 8362bed849d..84123622fb8 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
@@ -23,9 +23,9 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.Standa
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractIncrementalDumper;
import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.OpenGaussLogicalReplication;
import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.MppdbDecodingPlugin;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator
similarity index 100%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLIncrementalDumperCreator.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLIncrementalDumperCreator.java
index 684f191b380..4b067556abc 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLIncrementalDumperCreator.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLIncrementalDumperCreator.java
@@ -19,12 +19,12 @@ package org.apache.shardingsphere.data.pipeline.postgresql;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLWalDumper;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator;
/**
* PostgreSQL incremental dumper creator.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLInventoryDumperCreator.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLInventoryDumperCreator.java
index aedabf2fd75..6394e2e9cb2 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLInventoryDumperCreator.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLInventoryDumperCreator.java
@@ -19,10 +19,10 @@ package org.apache.shardingsphere.data.pipeline.postgresql;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreator;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.InventoryDumper;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLInventoryDumper;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumperCreator;
import javax.sql.DataSource;
import java.util.Collection;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLInventoryDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLInventoryDumper.java
index 7f56c647281..3cbf9a158b5 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLInventoryDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLInventoryDumper.java
@@ -19,8 +19,8 @@ package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractInventoryDumper;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import javax.sql.DataSource;
import java.sql.Connection;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
index 67f181511ab..c304037e96d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
@@ -23,9 +23,9 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.Standa
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractIncrementalDumper;
import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.LogicalReplication;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalEventConverter;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java
index cc9e26117d4..4ffeabcc458 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java
@@ -23,9 +23,9 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWalEvent;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.DeleteRowEvent;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator
similarity index 100%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreator b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumperCreator
similarity index 100%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreator
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumperCreator
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java
index d4388d1473f..2ed82a1616e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java
@@ -25,7 +25,7 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.SimpleMemoryPipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.junit.Before;
import org.junit.Test;
@@ -50,7 +50,7 @@ public final class PostgreSQLJdbcDumperTest {
InventoryDumperConfiguration dumperConfig = mockInventoryDumperConfiguration();
dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
jdbcDumper = new PostgreSQLInventoryDumper(mockInventoryDumperConfiguration(), new SimpleMemoryPipelineChannel(100),
- dataSource, new PipelineTableMetaDataLoader(dataSource));
+ dataSource, new StandardPipelineTableMetaDataLoader(dataSource));
initTableData(dataSource);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
index 7e686c000ec..2ecb055f62d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
@@ -24,10 +24,11 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDat
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MultiplexMemoryPipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.LogicalReplication;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
@@ -84,7 +85,7 @@ public final class PostgreSQLWalDumperTest {
position = new WalPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L)));
channel = new MultiplexMemoryPipelineChannel();
dumperConfig = mockDumperConfiguration();
- PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()));
+ PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()));
walDumper = new PostgreSQLWalDumper(dumperConfig, position, channel, metaDataLoader);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverterTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverterTest.java
index 16681b8b044..dd93c267854 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverterTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverterTest.java
@@ -29,7 +29,7 @@ import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.DeleteRowEvent;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.PlaceholderEvent;
@@ -59,7 +59,7 @@ public final class WalEventConverterTest {
@Before
public void setUp() {
DumperConfiguration dumperConfig = mockDumperConfiguration();
- walEventConverter = new WalEventConverter(dumperConfig, new PipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig())));
+ walEventConverter = new WalEventConverter(dumperConfig, new StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig())));
initTableData(dumperConfig);
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
index f340101601c..f64fde18e31 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
@@ -22,12 +22,13 @@ import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfigura
import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
@@ -141,7 +142,7 @@ public final class GovernanceRepositoryAPIImplTest {
dumperConfig.setUniqueKeyDataType(Types.INTEGER);
dumperConfig.setShardingItem(0);
PipelineDataSourceWrapper dataSource = mock(PipelineDataSourceWrapper.class);
- PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(dataSource);
+ PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(dataSource);
return new InventoryTask(dumperConfig, taskConfig.getImporterConfig(), PipelineContextUtil.getPipelineChannelCreator(),
new DefaultPipelineDataSourceManager(), dataSource, metaDataLoader, PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobProgressListener());
}
@@ -149,7 +150,7 @@ public final class GovernanceRepositoryAPIImplTest {
private IncrementalTask mockIncrementalTask(final TaskConfiguration taskConfig) {
DumperConfiguration dumperConfig = taskConfig.getDumperConfig();
dumperConfig.setPosition(new PlaceholderPosition());
- PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
+ PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
return new IncrementalTask(3, dumperConfig, taskConfig.getImporterConfig(), PipelineContextUtil.getPipelineChannelCreator(),
new DefaultPipelineDataSourceManager(), metaDataLoader, PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobProgressListener());
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/dumper/IncrementalDumperCreatorFactoryTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/dumper/IncrementalDumperCreatorFactoryTest.java
index 6352be60159..b9b1477b6b8 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/dumper/IncrementalDumperCreatorFactoryTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/dumper/IncrementalDumperCreatorFactoryTest.java
@@ -17,80 +17,68 @@
package org.apache.shardingsphere.data.pipeline.core.dumper;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.junit.Assert.assertThat;
-import java.util.Collections;
import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
+import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
-import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.fixture.FixtureIncrementalDumper;
import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.SimpleMemoryPipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreatorFactory;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLIncrementalDumper;
-import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPosition;
import org.apache.shardingsphere.data.pipeline.opengauss.ingest.OpenGaussWalDumper;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLWalDumper;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
-import org.junit.Before;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreatorFactory;
import org.junit.Test;
import org.mockito.Mock;
+import java.util.Collections;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertThat;
+
public final class IncrementalDumperCreatorFactoryTest {
- private PipelineDataSourceWrapper dataSource;
-
@Mock
- private WalPosition walPosition;
-
- @Before
- public void setUp() {
- PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
- DumperConfiguration dumperConfig = mockDumperConfiguration();
- dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
- }
+ private IngestPosition<?> ingestPosition;
@Test
public void assertIncrementalDumperCreatorForMysql() {
- IncrementalDumper actual = IncrementalDumperCreatorFactory.getInstance("MySQL")
- .createIncrementalDumper(mockDumperConfiguration(), new BinlogPosition("binlog-000001", 4L), new SimpleMemoryPipelineChannel(100), new PipelineTableMetaDataLoader(dataSource));
+ IncrementalDumper actual = createIncrementalDumper("MySQL");
assertThat(actual, instanceOf(MySQLIncrementalDumper.class));
}
@Test
public void assertIncrementalDumperCreatorForPostgreSQL() {
- IncrementalDumper actual = IncrementalDumperCreatorFactory.getInstance("PostgreSQL")
- .createIncrementalDumper(mockDumperConfiguration(), walPosition, new SimpleMemoryPipelineChannel(100), new PipelineTableMetaDataLoader(dataSource));
+ IncrementalDumper actual = createIncrementalDumper("PostgreSQL");
assertThat(actual, instanceOf(PostgreSQLWalDumper.class));
}
@Test
public void assertIncrementalDumperCreatorForOpenGauss() {
- IncrementalDumper actual = IncrementalDumperCreatorFactory.getInstance("openGauss")
- .createIncrementalDumper(mockDumperConfiguration(), walPosition, new SimpleMemoryPipelineChannel(100), new PipelineTableMetaDataLoader(dataSource));
+ IncrementalDumper actual = createIncrementalDumper("openGauss");
assertThat(actual, instanceOf(OpenGaussWalDumper.class));
}
@Test
public void assertIncrementalDumperCreatorForFixture() {
- IncrementalDumper actual = IncrementalDumperCreatorFactory.getInstance("Fixture")
- .createIncrementalDumper(mockDumperConfiguration(), walPosition, new SimpleMemoryPipelineChannel(100), new PipelineTableMetaDataLoader(dataSource));
+ IncrementalDumper actual = createIncrementalDumper("Fixture");
assertThat(actual, instanceOf(FixtureIncrementalDumper.class));
}
@Test
public void assertIncrementalDumperCreatorForH2() {
- IncrementalDumper actual = IncrementalDumperCreatorFactory.getInstance("H2")
- .createIncrementalDumper(mockDumperConfiguration(), walPosition, new SimpleMemoryPipelineChannel(100), new PipelineTableMetaDataLoader(dataSource));
+ IncrementalDumper actual = createIncrementalDumper("H2");
assertThat(actual, instanceOf(FixtureIncrementalDumper.class));
}
+ private IncrementalDumper createIncrementalDumper(final String databaseType) {
+ return IncrementalDumperCreatorFactory.getInstance(databaseType)
+ .createIncrementalDumper(mockDumperConfiguration(), ingestPosition, new SimpleMemoryPipelineChannel(100), new StandardPipelineTableMetaDataLoader(null));
+ }
+
private DumperConfiguration mockDumperConfiguration() {
DumperConfiguration result = new DumperConfiguration();
result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:mysql://127.0.0.1:3306/ds_0", "root", "root"));
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumperCreatorFactoryTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumperCreatorFactoryTest.java
index 3a9c90b14d8..d8bcbf36379 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumperCreatorFactoryTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumperCreatorFactoryTest.java
@@ -17,63 +17,54 @@
package org.apache.shardingsphere.data.pipeline.core.dumper;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.junit.Assert.assertThat;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.InventoryDumper;
import org.apache.shardingsphere.data.pipeline.core.fixture.FixtureInventoryDumper;
import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.SimpleMemoryPipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreatorFactory;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLInventoryDumper;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLInventoryDumper;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
-import org.junit.Before;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumperCreatorFactory;
import org.junit.Test;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertThat;
+
public final class InventoryDumperCreatorFactoryTest {
- private PipelineDataSourceWrapper dataSource;
-
- @Before
- public void setUp() {
- PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
- InventoryDumperConfiguration dumperConfig = mockInventoryDumperConfiguration();
- dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
- }
-
@Test
- public void assertInventoryDumperCreatorForMysql() {
- InventoryDumper actual = InventoryDumperCreatorFactory.getInstance("MySql")
- .createInventoryDumper(mockInventoryDumperConfiguration(), new SimpleMemoryPipelineChannel(100), dataSource, new PipelineTableMetaDataLoader(dataSource));
+ public void assertInventoryDumperCreatorForMySQL() {
+ InventoryDumper actual = createInventoryDumper("MySQL");
assertThat(actual, instanceOf(MySQLInventoryDumper.class));
}
@Test
public void assertInventoryDumperCreatorForPostgreSQL() {
- InventoryDumper actual = InventoryDumperCreatorFactory.getInstance("PostgreSQL")
- .createInventoryDumper(mockInventoryDumperConfiguration(), new SimpleMemoryPipelineChannel(100), dataSource, new PipelineTableMetaDataLoader(dataSource));
+ InventoryDumper actual = createInventoryDumper("PostgreSQL");
assertThat(actual, instanceOf(PostgreSQLInventoryDumper.class));
}
@Test
public void assertInventoryDumperCreatorForOpenGauss() {
- InventoryDumper actual = InventoryDumperCreatorFactory.getInstance("openGauss")
- .createInventoryDumper(mockInventoryDumperConfiguration(), new SimpleMemoryPipelineChannel(100), dataSource, new PipelineTableMetaDataLoader(dataSource));
+ InventoryDumper actual = createInventoryDumper("openGauss");
assertThat(actual, instanceOf(PostgreSQLInventoryDumper.class));
}
@Test
public void assertInventoryDumperCreatorForFixture() {
- InventoryDumper actual = InventoryDumperCreatorFactory.getInstance("Fixture")
- .createInventoryDumper(mockInventoryDumperConfiguration(), new SimpleMemoryPipelineChannel(100), dataSource, new PipelineTableMetaDataLoader(dataSource));
+ InventoryDumper actual = createInventoryDumper("Fixture");
assertThat(actual, instanceOf(FixtureInventoryDumper.class));
}
+ private InventoryDumper createInventoryDumper(final String databaseType) {
+ PipelineDataSourceWrapper dataSource = null;
+ return InventoryDumperCreatorFactory.getInstance(databaseType)
+ .createInventoryDumper(mockInventoryDumperConfiguration(), new SimpleMemoryPipelineChannel(100), dataSource, new StandardPipelineTableMetaDataLoader(dataSource));
+ }
+
private InventoryDumperConfiguration mockInventoryDumperConfiguration() {
DumperConfiguration dumperConfig = mockDumperConfiguration();
InventoryDumperConfiguration result = new InventoryDumperConfiguration(dumperConfig);
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java
index b97eebb219a..c613f92e5b7 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java
@@ -21,8 +21,8 @@ import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfigura
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractIncrementalDumper;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
public final class FixtureIncrementalDumper extends AbstractIncrementalDumper<FinishedPosition> {
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumperCreator.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumperCreator.java
index 7ca4d4d49eb..b0152f36f90 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumperCreator.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumperCreator.java
@@ -17,16 +17,17 @@
package org.apache.shardingsphere.data.pipeline.core.fixture;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
/**
* Fixture incremental dumper creator.
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureInventoryDumper.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureInventoryDumper.java
index 3da47584451..dfa4d701596 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureInventoryDumper.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureInventoryDumper.java
@@ -19,8 +19,8 @@ package org.apache.shardingsphere.data.pipeline.core.fixture;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractInventoryDumper;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import javax.sql.DataSource;
import java.sql.Connection;
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureInventoryDumperCreator.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureInventoryDumperCreator.java
index d099e0ab3a9..d41cde2b172 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureInventoryDumperCreator.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureInventoryDumperCreator.java
@@ -17,15 +17,16 @@
package org.apache.shardingsphere.data.pipeline.core.fixture;
+import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.InventoryDumper;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumperCreator;
+
+import javax.sql.DataSource;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import javax.sql.DataSource;
-import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreator;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
/**
* Fixture inventory dumper creator.
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataLoaderTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoaderTest.java
similarity index 91%
rename from shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataLoaderTest.java
rename to shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoaderTest.java
index 3c59c49838e..a6223c54031 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataLoaderTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoaderTest.java
@@ -18,9 +18,10 @@
package org.apache.shardingsphere.data.pipeline.core.metadata.loader;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
-import org.apache.shardingsphere.data.pipeline.api.metadata.PipelineColumnMetaData;
-import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineIndexMetaData;
-import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineIndexMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
import org.apache.shardingsphere.infra.database.type.dialect.H2DatabaseType;
import org.apache.shardingsphere.test.mock.MockedDataSource;
import org.junit.Before;
@@ -43,7 +44,7 @@ import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
// TODO use H2 to do real test
-public final class PipelineTableMetaDataLoaderTest {
+public final class StandardPipelineTableMetaDataLoaderTest {
private static final String TEST_CATALOG = "catalog";
@@ -115,7 +116,7 @@ public final class PipelineTableMetaDataLoaderTest {
@Test
public void assertGetTableMetaData() {
- PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(dataSource);
+ PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(dataSource);
PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(null, TEST_TABLE);
assertColumnMetaData(tableMetaData);
assertPrimaryKeys(tableMetaData.getPrimaryKeyColumns());
@@ -151,6 +152,6 @@ public final class PipelineTableMetaDataLoaderTest {
@Test(expected = RuntimeException.class)
public void assertGetTableMetaDataFailure() throws SQLException {
when(dataSource.getConnection()).thenThrow(new SQLException(""));
- new PipelineTableMetaDataLoader(dataSource).getTableMetaData(null, TEST_TABLE);
+ new StandardPipelineTableMetaDataLoader(dataSource).getTableMetaData(null, TEST_TABLE);
}
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaDataTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaDataTest.java
index 53638b69e49..59b1449474f 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaDataTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaDataTest.java
@@ -17,7 +17,8 @@
package org.apache.shardingsphere.data.pipeline.core.metadata.model;
-import org.apache.shardingsphere.data.pipeline.api.metadata.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
import org.junit.Before;
import org.junit.Test;
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
index e5818fb9667..c0097e72f04 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
@@ -20,9 +20,10 @@ package org.apache.shardingsphere.data.pipeline.core.task;
import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
import org.junit.After;
@@ -48,7 +49,7 @@ public final class IncrementalTaskTest {
public void setUp() {
TaskConfiguration taskConfig = PipelineContextUtil.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration()).getTaskConfig();
taskConfig.getDumperConfig().setPosition(new PlaceholderPosition());
- PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
+ PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
incrementalTask = new IncrementalTask(3, taskConfig.getDumperConfig(), taskConfig.getImporterConfig(),
PipelineContextUtil.getPipelineChannelCreator(),
new DefaultPipelineDataSourceManager(), metaDataLoader, PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobProgressListener());
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
index d1a6d3b28e2..03096ef1cad 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
@@ -23,10 +23,11 @@ import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumper
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
import org.junit.AfterClass;
@@ -67,7 +68,7 @@ public final class InventoryTaskTest {
public void assertStartWithGetEstimatedRowsFailure() {
InventoryDumperConfiguration inventoryDumperConfig = createInventoryDumperConfiguration("t_non_exist", "t_non_exist");
PipelineDataSourceWrapper dataSource = DATA_SOURCE_MANAGER.getDataSource(inventoryDumperConfig.getDataSourceConfig());
- PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(dataSource);
+ PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(dataSource);
try (
InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
PipelineContextUtil.getPipelineChannelCreator(),
@@ -82,7 +83,7 @@ public final class InventoryTaskTest {
// TODO use t_order_0, and also others
InventoryDumperConfiguration inventoryDumperConfig = createInventoryDumperConfiguration("t_order", "t_order");
PipelineDataSourceWrapper dataSource = DATA_SOURCE_MANAGER.getDataSource(inventoryDumperConfig.getDataSourceConfig());
- PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(dataSource);
+ PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(dataSource);
try (
InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
PipelineContextUtil.getPipelineChannelCreator(),
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator
similarity index 100%
rename from shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator
rename to shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreator b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumperCreator
similarity index 100%
rename from shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreator
rename to shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumperCreator