You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/04/30 11:35:39 UTC
[shardingsphere] branch master updated: Refactor Importer and PipelineSQLBuilder (#17232)
This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 6a56b13a705 Refactor Importer and PipelineSQLBuilder (#17232)
6a56b13a705 is described below
commit 6a56b13a70554ff965eaea6ac527dfa8fb3bf1b4
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Sat Apr 30 19:35:22 2022 +0800
Refactor Importer and PipelineSQLBuilder (#17232)
* Refactor Importer and PipelineSQLBuilder, use shardingColumnsMap on demand
* Mark PipelineSQLBuilder as stateful since it holds sqlCacheMap
* Unit test
---
.../spi/sqlbuilder/PipelineSQLBuilder.java | 20 +++----
.../pipeline/core/importer/AbstractImporter.java | 18 ++----
.../sqlbuilder/AbstractPipelineSQLBuilder.java | 22 ++------
.../core/sqlbuilder/PipelineSQLBuilderFactory.java | 2 +-
.../fixture/FixturePipelineSQLBuilder.java | 8 ++-
.../pipeline/mysql/importer/MySQLImporter.java | 9 ---
.../mysql/sqlbuilder/MySQLPipelineSQLBuilder.java | 14 ++---
.../pipeline/mysql/importer/MySQLImporterTest.java | 66 ----------------------
.../sqlbuilder/MySQLPipelineSQLBuilderTest.java | 9 ++-
.../opengauss/importer/OpenGaussImporter.java | 10 ----
.../sqlbuilder/OpenGaussPipelineSQLBuilder.java | 22 +++-----
.../postgresql/importer/PostgreSQLImporter.java | 10 ----
.../sqlbuilder/PostgreSQLPipelineSQLBuilder.java | 14 ++---
.../importer/PostgreSQLImporterTest.java | 41 --------------
.../PostgreSQLPipelineSQLBuilderTest.java | 14 ++++-
.../core/fixture/FixturePipelineSQLBuilder.java | 10 ----
.../core/importer/AbstractImporterTest.java | 38 +++----------
.../spi/sqlbuilder/PipelineSQLBuilderTest.java | 13 +++--
18 files changed, 75 insertions(+), 265 deletions(-)
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
index 015ee23813b..75f6dd4744d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
@@ -19,46 +19,46 @@ package org.apache.shardingsphere.data.pipeline.spi.sqlbuilder;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import org.apache.shardingsphere.spi.type.typed.StatelessTypedSPI;
+import org.apache.shardingsphere.spi.type.typed.StatefulTypedSPI;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
+import java.util.Set;
/**
* Pipeline SQL builder.
- * It's singleton when it's used as SPI, else not.
*/
-// TODO refactor to stateless for SPI usage, it's confusing now
-public interface PipelineSQLBuilder extends StatelessTypedSPI {
+public interface PipelineSQLBuilder extends StatefulTypedSPI {
/**
* Build insert SQL.
- * Used in {@linkplain org.apache.shardingsphere.data.pipeline.spi.importer.Importer}.
*
* @param dataRecord data record
+ * @param shardingColumnsMap sharding columns map
* @return insert SQL
*/
- String buildInsertSQL(DataRecord dataRecord);
+ String buildInsertSQL(DataRecord dataRecord, Map<String, Set<String>> shardingColumnsMap);
/**
* Build update SQL.
*
* @param dataRecord data record
* @param conditionColumns condition columns
+ * @param shardingColumnsMap sharding columns map
* @return update SQL
*/
- String buildUpdateSQL(DataRecord dataRecord, Collection<Column> conditionColumns);
+ String buildUpdateSQL(DataRecord dataRecord, Collection<Column> conditionColumns, Map<String, Set<String>> shardingColumnsMap);
/**
* Extract updated columns.
- * Used in {@linkplain org.apache.shardingsphere.data.pipeline.spi.importer.Importer}.
*
- * @param columns columns
* @param record data record
+ * @param shardingColumnsMap sharding columns map
* @return filtered columns
*/
- List<Column> extractUpdatedColumns(Collection<Column> columns, DataRecord record);
+ List<Column> extractUpdatedColumns(DataRecord record, Map<String, Set<String>> shardingColumnsMap);
/**
* Build delete SQL.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java
index 187e2f93d0f..4b1ec0a6efa 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java
@@ -30,6 +30,7 @@ import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import org.apache.shardingsphere.data.pipeline.core.record.RecordUtil;
+import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
@@ -39,7 +40,6 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@@ -63,17 +63,9 @@ public abstract class AbstractImporter extends AbstractLifecycleExecutor impleme
this.importerConfig = importerConfig;
this.dataSourceManager = dataSourceManager;
this.channel = channel;
- pipelineSqlBuilder = createSQLBuilder(importerConfig.getShardingColumnsMap());
+ pipelineSqlBuilder = PipelineSQLBuilderFactory.newInstance(importerConfig.getDataSourceConfig().getDatabaseType().getName());
}
- /**
- * Create SQL builder.
- *
- * @param shardingColumnsMap sharding columns map
- * @return SQL builder
- */
- protected abstract PipelineSQLBuilder createSQLBuilder(Map<String, Set<String>> shardingColumnsMap);
-
@Override
protected void doStart() {
write();
@@ -158,7 +150,7 @@ public abstract class AbstractImporter extends AbstractLifecycleExecutor impleme
}
private void executeBatchInsert(final Connection connection, final List<DataRecord> dataRecords) throws SQLException {
- String insertSql = pipelineSqlBuilder.buildInsertSQL(dataRecords.get(0));
+ String insertSql = pipelineSqlBuilder.buildInsertSQL(dataRecords.get(0), importerConfig.getShardingColumnsMap());
try (PreparedStatement ps = connection.prepareStatement(insertSql)) {
ps.setQueryTimeout(30);
for (DataRecord each : dataRecords) {
@@ -183,8 +175,8 @@ public abstract class AbstractImporter extends AbstractLifecycleExecutor impleme
log.error("executeUpdate, could not get shardingColumns, tableName={}, shardingColumnsMap.keySet={}", record.getTableName(), importerConfig.getShardingColumnsMap().keySet());
}
List<Column> conditionColumns = RecordUtil.extractConditionColumns(record, shardingColumns);
- List<Column> updatedColumns = pipelineSqlBuilder.extractUpdatedColumns(record.getColumns(), record);
- String updateSql = pipelineSqlBuilder.buildUpdateSQL(record, conditionColumns);
+ List<Column> updatedColumns = pipelineSqlBuilder.extractUpdatedColumns(record, importerConfig.getShardingColumnsMap());
+ String updateSql = pipelineSqlBuilder.buildUpdateSQL(record, conditionColumns, importerConfig.getShardingColumnsMap());
try (PreparedStatement ps = connection.prepareStatement(updateSql)) {
for (int i = 0; i < updatedColumns.size(); i++) {
ps.setObject(i + 1, updatedColumns.get(i).getValue());
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
index 45f2a6f8188..1e61d39f0d2 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
@@ -18,8 +18,6 @@
package org.apache.shardingsphere.data.pipeline.core.sqlbuilder;
import com.google.common.base.Preconditions;
-import lombok.AccessLevel;
-import lombok.Getter;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.core.record.RecordUtil;
@@ -27,7 +25,6 @@ import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -47,17 +44,6 @@ public abstract class AbstractPipelineSQLBuilder implements PipelineSQLBuilder {
private final ConcurrentMap<String, String> sqlCacheMap = new ConcurrentHashMap<>();
- @Getter(AccessLevel.PROTECTED)
- private final Map<String, Set<String>> shardingColumnsMap;
-
- public AbstractPipelineSQLBuilder() {
- shardingColumnsMap = Collections.emptyMap();
- }
-
- public AbstractPipelineSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
- this.shardingColumnsMap = shardingColumnsMap;
- }
-
/**
* Get left identifier quote string.
*
@@ -83,7 +69,7 @@ public abstract class AbstractPipelineSQLBuilder implements PipelineSQLBuilder {
}
@Override
- public String buildInsertSQL(final DataRecord dataRecord) {
+ public String buildInsertSQL(final DataRecord dataRecord, final Map<String, Set<String>> shardingColumnsMap) {
String sqlCacheKey = INSERT_SQL_CACHE_KEY_PREFIX + dataRecord.getTableName();
if (!sqlCacheMap.containsKey(sqlCacheKey)) {
sqlCacheMap.put(sqlCacheKey, buildInsertSQLInternal(dataRecord.getTableName(), dataRecord.getColumns()));
@@ -109,13 +95,13 @@ public abstract class AbstractPipelineSQLBuilder implements PipelineSQLBuilder {
}
@Override
- public String buildUpdateSQL(final DataRecord dataRecord, final Collection<Column> conditionColumns) {
+ public String buildUpdateSQL(final DataRecord dataRecord, final Collection<Column> conditionColumns, final Map<String, Set<String>> shardingColumnsMap) {
String sqlCacheKey = UPDATE_SQL_CACHE_KEY_PREFIX + dataRecord.getTableName();
if (!sqlCacheMap.containsKey(sqlCacheKey)) {
sqlCacheMap.put(sqlCacheKey, buildUpdateSQLInternal(dataRecord.getTableName(), conditionColumns));
}
StringBuilder updatedColumnString = new StringBuilder();
- for (Column each : extractUpdatedColumns(dataRecord.getColumns(), dataRecord)) {
+ for (Column each : extractUpdatedColumns(dataRecord, shardingColumnsMap)) {
updatedColumnString.append(String.format("%s = ?,", quote(each.getName())));
}
updatedColumnString.setLength(updatedColumnString.length() - 1);
@@ -127,7 +113,7 @@ public abstract class AbstractPipelineSQLBuilder implements PipelineSQLBuilder {
}
@Override
- public List<Column> extractUpdatedColumns(final Collection<Column> columns, final DataRecord record) {
+ public List<Column> extractUpdatedColumns(final DataRecord record, final Map<String, Set<String>> shardingColumnsMap) {
return new ArrayList<>(RecordUtil.extractUpdatedColumns(record));
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLBuilderFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLBuilderFactory.java
index cbf686ac87a..86df193c0e8 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLBuilderFactory.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLBuilderFactory.java
@@ -40,6 +40,6 @@ public final class PipelineSQLBuilderFactory {
* @return new instance of pipeline SQL builder
*/
public static PipelineSQLBuilder newInstance(final String databaseType) {
- return TypedSPIRegistry.getRegisteredService(PipelineSQLBuilder.class, databaseType);
+ return TypedSPIRegistry.getRegisteredService(PipelineSQLBuilder.class, databaseType, null);
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java
index d76a21aaf8e..95fba711111 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java
@@ -24,22 +24,24 @@ import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
+import java.util.Set;
public final class FixturePipelineSQLBuilder implements PipelineSQLBuilder {
@Override
- public String buildInsertSQL(final DataRecord dataRecord) {
+ public String buildInsertSQL(final DataRecord dataRecord, final Map<String, Set<String>> shardingColumnsMap) {
return "";
}
@Override
- public String buildUpdateSQL(final DataRecord dataRecord, final Collection<Column> conditionColumns) {
+ public String buildUpdateSQL(final DataRecord dataRecord, final Collection<Column> conditionColumns, final Map<String, Set<String>> shardingColumnsMap) {
return "";
}
@Override
- public List<Column> extractUpdatedColumns(final Collection<Column> columns, final DataRecord record) {
+ public List<Column> extractUpdatedColumns(final DataRecord record, final Map<String, Set<String>> shardingColumnsMap) {
return Collections.emptyList();
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java
index 8db8f101394..24185e16a94 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java
@@ -21,12 +21,8 @@ import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterCo
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.importer.AbstractImporter;
-import org.apache.shardingsphere.data.pipeline.mysql.sqlbuilder.MySQLPipelineSQLBuilder;
-import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import java.util.Map;
import java.util.Properties;
-import java.util.Set;
/**
* MySQL importer.
@@ -39,9 +35,4 @@ public final class MySQLImporter extends AbstractImporter {
queryProps.setProperty("rewriteBatchedStatements", Boolean.TRUE.toString());
importerConfig.getDataSourceConfig().appendJDBCQueryProperties(queryProps);
}
-
- @Override
- protected PipelineSQLBuilder createSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
- return new MySQLPipelineSQLBuilder(shardingColumnsMap);
- }
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
index 22e7e444ba5..f186f8d8876 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.data.pipeline.mysql.sqlbuilder;
-import lombok.NoArgsConstructor;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.AbstractPipelineSQLBuilder;
@@ -29,13 +28,8 @@ import java.util.Set;
/**
* MySQL pipeline SQL builder.
*/
-@NoArgsConstructor
public final class MySQLPipelineSQLBuilder extends AbstractPipelineSQLBuilder {
- public MySQLPipelineSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
- super(shardingColumnsMap);
- }
-
@Override
public String getLeftIdentifierQuoteString() {
return "`";
@@ -47,15 +41,15 @@ public final class MySQLPipelineSQLBuilder extends AbstractPipelineSQLBuilder {
}
@Override
- public String buildInsertSQL(final DataRecord dataRecord) {
- return super.buildInsertSQL(dataRecord) + buildDuplicateUpdateSQL(dataRecord);
+ public String buildInsertSQL(final DataRecord dataRecord, final Map<String, Set<String>> shardingColumnsMap) {
+ return super.buildInsertSQL(dataRecord, shardingColumnsMap) + buildDuplicateUpdateSQL(dataRecord, shardingColumnsMap);
}
- private String buildDuplicateUpdateSQL(final DataRecord dataRecord) {
+ private String buildDuplicateUpdateSQL(final DataRecord dataRecord, final Map<String, Set<String>> shardingColumnsMap) {
StringBuilder result = new StringBuilder(" ON DUPLICATE KEY UPDATE ");
for (int i = 0; i < dataRecord.getColumnCount(); i++) {
Column column = dataRecord.getColumn(i);
- if (column.isPrimaryKey() || isShardingColumn(getShardingColumnsMap(), dataRecord.getTableName(), column.getName())) {
+ if (column.isPrimaryKey() || isShardingColumn(shardingColumnsMap, dataRecord.getTableName(), column.getName())) {
continue;
}
result.append(quote(column.getName())).append("=VALUES(").append(quote(column.getName())).append("),");
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporterTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporterTest.java
deleted file mode 100644
index 3bbddd2a4a9..00000000000
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporterTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.mysql.importer;
-
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPosition;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-import java.util.Collections;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-@RunWith(MockitoJUnitRunner.class)
-public final class MySQLImporterTest {
-
- @Mock
- private ImporterConfiguration importerConfig;
-
- @Mock
- private PipelineDataSourceManager dataSourceManager;
-
- @Mock
- private PipelineChannel channel;
-
- @Test
- public void assertCreateSqlBuilder() {
- when(importerConfig.getDataSourceConfig()).thenReturn(mock(PipelineDataSourceConfiguration.class));
- MySQLImporter mysqlImporter = new MySQLImporter(importerConfig, dataSourceManager, channel);
- String insertSQL = mysqlImporter.createSQLBuilder(Collections.emptyMap()).buildInsertSQL(mockDataRecord());
- assertThat(insertSQL, is("INSERT INTO `t_order`(`id`,`name`) VALUES(?,?) ON DUPLICATE KEY UPDATE `name`=VALUES(`name`)"));
- }
-
- private DataRecord mockDataRecord() {
- DataRecord result = new DataRecord(new BinlogPosition("binlog-000001", 4), 2);
- result.setTableName("t_order");
- result.addColumn(new Column("id", 1, true, true));
- result.addColumn(new Column("name", "", true, false));
- return result;
- }
-}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
index a8594a30119..8db2aca7cfb 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.junit.Test;
import java.util.Collections;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -33,17 +34,19 @@ import static org.junit.Assert.assertTrue;
public final class MySQLPipelineSQLBuilderTest {
- private final MySQLPipelineSQLBuilder sqlBuilder = new MySQLPipelineSQLBuilder(ImmutableMap.<String, Set<String>>builder().put("t2", Collections.singleton("sc")).build());
+ private final MySQLPipelineSQLBuilder sqlBuilder = new MySQLPipelineSQLBuilder();
+
+ private final Map<String, Set<String>> shardingColumnsMap = ImmutableMap.<String, Set<String>>builder().put("t2", Collections.singleton("sc")).build();
@Test
public void assertBuildInsertSQL() {
- String actual = sqlBuilder.buildInsertSQL(mockDataRecord("t1"));
+ String actual = sqlBuilder.buildInsertSQL(mockDataRecord("t1"), shardingColumnsMap);
assertThat(actual, is("INSERT INTO `t1`(`id`,`sc`,`c1`,`c2`,`c3`) VALUES(?,?,?,?,?) ON DUPLICATE KEY UPDATE `sc`=VALUES(`sc`),`c1`=VALUES(`c1`),`c2`=VALUES(`c2`),`c3`=VALUES(`c3`)"));
}
@Test
public void assertBuildInsertSQLHasShardingColumn() {
- String actual = sqlBuilder.buildInsertSQL(mockDataRecord("t2"));
+ String actual = sqlBuilder.buildInsertSQL(mockDataRecord("t2"), shardingColumnsMap);
assertThat(actual, is("INSERT INTO `t2`(`id`,`sc`,`c1`,`c2`,`c3`) VALUES(?,?,?,?,?) ON DUPLICATE KEY UPDATE `c1`=VALUES(`c1`),`c2`=VALUES(`c2`),`c3`=VALUES(`c3`)"));
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/importer/OpenGaussImporter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/importer/OpenGaussImporter.java
index 03a5c855b32..6b86b38ad08 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/importer/OpenGaussImporter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/importer/OpenGaussImporter.java
@@ -21,11 +21,6 @@ import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterCo
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.importer.AbstractImporter;
-import org.apache.shardingsphere.data.pipeline.opengauss.sqlbuilder.OpenGaussPipelineSQLBuilder;
-import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-
-import java.util.Map;
-import java.util.Set;
/**
* Importer of openGauss.
@@ -35,9 +30,4 @@ public final class OpenGaussImporter extends AbstractImporter {
public OpenGaussImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
super(importerConfig, dataSourceManager, channel);
}
-
- @Override
- protected PipelineSQLBuilder createSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
- return new OpenGaussPipelineSQLBuilder(shardingColumnsMap);
- }
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
index 281cc261b28..4ce83a09121 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
@@ -17,28 +17,20 @@
package org.apache.shardingsphere.data.pipeline.opengauss.sqlbuilder;
-import com.google.common.collect.Collections2;
-import lombok.NoArgsConstructor;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.AbstractPipelineSQLBuilder;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
/**
* Pipeline SQL builder of openGauss.
*/
-@NoArgsConstructor
public final class OpenGaussPipelineSQLBuilder extends AbstractPipelineSQLBuilder {
- public OpenGaussPipelineSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
- super(shardingColumnsMap);
- }
-
@Override
public String getLeftIdentifierQuoteString() {
return "";
@@ -50,17 +42,17 @@ public final class OpenGaussPipelineSQLBuilder extends AbstractPipelineSQLBuilde
}
@Override
- public String buildInsertSQL(final DataRecord dataRecord) {
- return super.buildInsertSQL(dataRecord) + buildConflictSQL();
+ public String buildInsertSQL(final DataRecord dataRecord, final Map<String, Set<String>> shardingColumnsMap) {
+ return super.buildInsertSQL(dataRecord, shardingColumnsMap) + buildConflictSQL(shardingColumnsMap);
}
@Override
- public List<Column> extractUpdatedColumns(final Collection<Column> columns, final DataRecord record) {
- return new ArrayList<>(Collections2.filter(columns, column -> !(column.isPrimaryKey() || isShardingColumn(getShardingColumnsMap(), record.getTableName(), column.getName()))));
+ public List<Column> extractUpdatedColumns(final DataRecord record, final Map<String, Set<String>> shardingColumnsMap) {
+ return record.getColumns().stream().filter(each -> !(each.isPrimaryKey() || isShardingColumn(shardingColumnsMap, record.getTableName(), each.getName()))).collect(Collectors.toList());
}
- private String buildConflictSQL() {
- // there need return ON DUPLICATE KEY UPDATE NOTHING after support this syntax.
+ private String buildConflictSQL(final Map<String, Set<String>> shardingColumnsMap) {
+ // TODO there need return ON DUPLICATE KEY UPDATE NOTHING after support this syntax.
return "";
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/importer/PostgreSQLImporter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/importer/PostgreSQLImporter.java
index 809424a9b7b..f044cba5690 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/importer/PostgreSQLImporter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/importer/PostgreSQLImporter.java
@@ -21,11 +21,6 @@ import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterCo
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.importer.AbstractImporter;
-import org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder.PostgreSQLPipelineSQLBuilder;
-import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-
-import java.util.Map;
-import java.util.Set;
/**
* PostgreSQL importer.
@@ -35,9 +30,4 @@ public final class PostgreSQLImporter extends AbstractImporter {
public PostgreSQLImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
super(importerConfig, dataSourceManager, channel);
}
-
- @Override
- protected PipelineSQLBuilder createSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
- return new PostgreSQLPipelineSQLBuilder(shardingColumnsMap);
- }
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
index cb1cdda89fe..0a37927d29a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder;
-import lombok.NoArgsConstructor;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.core.record.RecordUtil;
@@ -29,13 +28,8 @@ import java.util.Set;
/**
* PostgreSQL pipeline SQL builder.
*/
-@NoArgsConstructor
public final class PostgreSQLPipelineSQLBuilder extends AbstractPipelineSQLBuilder {
- public PostgreSQLPipelineSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
- super(shardingColumnsMap);
- }
-
@Override
public String getLeftIdentifierQuoteString() {
return "\"";
@@ -47,12 +41,12 @@ public final class PostgreSQLPipelineSQLBuilder extends AbstractPipelineSQLBuild
}
@Override
- public String buildInsertSQL(final DataRecord dataRecord) {
- return super.buildInsertSQL(dataRecord) + buildConflictSQL(dataRecord);
+ public String buildInsertSQL(final DataRecord dataRecord, final Map<String, Set<String>> shardingColumnsMap) {
+ return super.buildInsertSQL(dataRecord, shardingColumnsMap) + buildConflictSQL(dataRecord, shardingColumnsMap);
}
// Refer to https://www.postgresql.org/docs/current/sql-insert.html
- private String buildConflictSQL(final DataRecord dataRecord) {
+ private String buildConflictSQL(final DataRecord dataRecord, final Map<String, Set<String>> shardingColumnsMap) {
StringBuilder result = new StringBuilder(" ON CONFLICT (");
for (Column each : RecordUtil.extractPrimaryColumns(dataRecord)) {
result.append(each.getName()).append(",");
@@ -61,7 +55,7 @@ public final class PostgreSQLPipelineSQLBuilder extends AbstractPipelineSQLBuild
result.append(") DO UPDATE SET ");
for (int i = 0; i < dataRecord.getColumnCount(); i++) {
Column column = dataRecord.getColumn(i);
- if (column.isPrimaryKey() || isShardingColumn(getShardingColumnsMap(), dataRecord.getTableName(), column.getName())) {
+ if (column.isPrimaryKey() || isShardingColumn(shardingColumnsMap, dataRecord.getTableName(), column.getName())) {
continue;
}
result.append(quote(column.getName())).append("=EXCLUDED.").append(quote(column.getName())).append(",");
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/importer/PostgreSQLImporterTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/importer/PostgreSQLImporterTest.java
deleted file mode 100644
index edb2cbe4769..00000000000
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/importer/PostgreSQLImporterTest.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.postgresql.importer;
-
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder.PostgreSQLPipelineSQLBuilder;
-import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.junit.MockitoJUnitRunner;
-
-import java.util.Collections;
-
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
-@RunWith(MockitoJUnitRunner.class)
-public final class PostgreSQLImporterTest {
-
- @Test
- public void assertCreateSQLBuilder() {
- ImporterConfiguration importerConfig = mock(ImporterConfiguration.class);
- PipelineSQLBuilder actual = new PostgreSQLImporter(importerConfig, null, null).createSQLBuilder(Collections.emptyMap());
- assertTrue(actual instanceof PostgreSQLPipelineSQLBuilder);
- }
-}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
index e87046ab9f8..be560f9ca76 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
@@ -17,24 +17,32 @@
package org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
import org.junit.Test;
import org.postgresql.replication.LogSequenceNumber;
+import java.util.Map;
+import java.util.Set;
+
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
public final class PostgreSQLPipelineSQLBuilderTest {
+ private final PostgreSQLPipelineSQLBuilder sqlBuilder = new PostgreSQLPipelineSQLBuilder();
+
+ private final Map<String, Set<String>> shardingColumnsMap = ImmutableMap.<String, Set<String>>builder().put("t_order", Sets.newHashSet("order_id", "user_id")).build();
+
@Test
public void assertBuildInsertSQL() {
- String actual = PipelineSQLBuilderFactory.newInstance("PostgreSQL").buildInsertSQL(mockDataRecord());
+ String actual = sqlBuilder.buildInsertSQL(mockDataRecord(), shardingColumnsMap);
assertThat(actual, is("INSERT INTO \"t_order\"(\"order_id\",\"user_id\",\"status\") VALUES(?,?,?) ON CONFLICT (order_id)"
- + " DO UPDATE SET \"user_id\"=EXCLUDED.\"user_id\",\"status\"=EXCLUDED.\"status\""));
+ + " DO UPDATE SET \"status\"=EXCLUDED.\"status\""));
}
private DataRecord mockDataRecord() {
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineSQLBuilder.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineSQLBuilder.java
index 70fd92081e2..fb6f5d92440 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineSQLBuilder.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineSQLBuilder.java
@@ -19,18 +19,8 @@ package org.apache.shardingsphere.data.pipeline.core.fixture;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.AbstractPipelineSQLBuilder;
-import java.util.Map;
-import java.util.Set;
-
public final class FixturePipelineSQLBuilder extends AbstractPipelineSQLBuilder {
- public FixturePipelineSQLBuilder() {
- }
-
- public FixturePipelineSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
- super(shardingColumnsMap);
- }
-
@Override
protected String getLeftIdentifierQuoteString() {
return "`";
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporterTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporterTest.java
index 979fa005188..db7a53dd4b4 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporterTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporterTest.java
@@ -18,6 +18,9 @@
package org.apache.shardingsphere.data.pipeline.core.importer;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
@@ -25,10 +28,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.core.record.RecordUtil;
-import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -57,20 +57,11 @@ public final class AbstractImporterTest {
private static final String TABLE_NAME = "test_table";
- private static final String INSERT_SQL = "INSERT INTO test_table (id,user,status) VALUES(?,?,?)";
-
- private static final String DELETE_SQL = "DELETE FROM test_table WHERE id = ? and user = ?";
-
- private static final String UPDATE_SQL = "UPDATE test_table SET user = ?,status = ? WHERE id = ? and user = ?";
-
@Mock
private PipelineDataSourceManager dataSourceManager;
- @Mock
- private PipelineSQLBuilder pipelineSqlBuilder;
-
- @Mock
- private PipelineDataSourceConfiguration dataSourceConfig;
+ private final PipelineDataSourceConfiguration dataSourceConfig = new StandardPipelineDataSourceConfiguration(
+ "jdbc:h2:mem:test_db;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL;USER=root;PASSWORD=root", "root", "root");
@Mock
private PipelineChannel channel;
@@ -89,11 +80,6 @@ public final class AbstractImporterTest {
@Before
public void setUp() throws SQLException {
jdbcImporter = new AbstractImporter(mockImporterConfiguration(), dataSourceManager, channel) {
-
- @Override
- protected PipelineSQLBuilder createSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
- return pipelineSqlBuilder;
- }
};
when(dataSourceManager.getDataSource(dataSourceConfig)).thenReturn(dataSource);
when(dataSource.getConnection()).thenReturn(connection);
@@ -102,8 +88,7 @@ public final class AbstractImporterTest {
@Test
public void assertWriteInsertDataRecord() throws SQLException {
DataRecord insertRecord = getDataRecord("INSERT");
- when(pipelineSqlBuilder.buildInsertSQL(insertRecord)).thenReturn(INSERT_SQL);
- when(connection.prepareStatement(INSERT_SQL)).thenReturn(preparedStatement);
+ when(connection.prepareStatement(any())).thenReturn(preparedStatement);
when(channel.fetchRecords(anyInt(), anyInt())).thenReturn(mockRecords(insertRecord));
jdbcImporter.run();
verify(preparedStatement).setObject(1, 1);
@@ -115,8 +100,7 @@ public final class AbstractImporterTest {
@Test
public void assertDeleteDataRecord() throws SQLException {
DataRecord deleteRecord = getDataRecord("DELETE");
- when(pipelineSqlBuilder.buildDeleteSQL(deleteRecord, mockConditionColumns(deleteRecord))).thenReturn(DELETE_SQL);
- when(connection.prepareStatement(DELETE_SQL)).thenReturn(preparedStatement);
+ when(connection.prepareStatement(any())).thenReturn(preparedStatement);
when(channel.fetchRecords(anyInt(), anyInt())).thenReturn(mockRecords(deleteRecord));
jdbcImporter.run();
verify(preparedStatement).setObject(1, 1);
@@ -127,10 +111,8 @@ public final class AbstractImporterTest {
@Test
public void assertUpdateDataRecord() throws SQLException {
DataRecord updateRecord = getDataRecord("UPDATE");
- when(pipelineSqlBuilder.buildUpdateSQL(updateRecord, mockConditionColumns(updateRecord))).thenReturn(UPDATE_SQL);
- when(connection.prepareStatement(UPDATE_SQL)).thenReturn(preparedStatement);
+ when(connection.prepareStatement(any())).thenReturn(preparedStatement);
when(channel.fetchRecords(anyInt(), anyInt())).thenReturn(mockRecords(updateRecord));
- when(pipelineSqlBuilder.extractUpdatedColumns(any(), any())).thenReturn(RecordUtil.extractUpdatedColumns(updateRecord));
jdbcImporter.run();
verify(preparedStatement).setObject(1, 10);
verify(preparedStatement).setObject(2, "UPDATE");
@@ -142,10 +124,8 @@ public final class AbstractImporterTest {
@Test
public void assertUpdatePrimaryKeyDataRecord() throws SQLException {
DataRecord updateRecord = getUpdatePrimaryKeyDataRecord();
- when(pipelineSqlBuilder.buildUpdateSQL(updateRecord, mockConditionColumns(updateRecord))).thenReturn(UPDATE_SQL);
- when(connection.prepareStatement(UPDATE_SQL)).thenReturn(preparedStatement);
+ when(connection.prepareStatement(any())).thenReturn(preparedStatement);
when(channel.fetchRecords(anyInt(), anyInt())).thenReturn(mockRecords(updateRecord));
- when(pipelineSqlBuilder.extractUpdatedColumns(any(), any())).thenReturn(RecordUtil.extractUpdatedColumns(updateRecord));
jdbcImporter.run();
InOrder inOrder = inOrder(preparedStatement);
inOrder.verify(preparedStatement).setObject(1, 2);
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilderTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilderTest.java
index e93f4d27f2e..db519865515 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilderTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilderTest.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.data.pipeline.spi.sqlbuilder;
+import com.google.common.collect.ImmutableMap;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
@@ -26,6 +27,8 @@ import org.junit.Test;
import java.util.Collection;
import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
@@ -34,22 +37,24 @@ public final class PipelineSQLBuilderTest {
private final PipelineSQLBuilder pipelineSQLBuilder = new FixturePipelineSQLBuilder();
+ private final Map<String, Set<String>> shardingColumnsMap = ImmutableMap.<String, Set<String>>builder().put("t2", Collections.singleton("sc")).build();
+
@Test
public void assertBuildInsertSQL() {
- String actual = pipelineSQLBuilder.buildInsertSQL(mockDataRecord("t1"));
- assertThat(actual, is("INSERT INTO `t1`(`id`,`sc`,`c1`,`c2`,`c3`) VALUES(?,?,?,?,?)"));
+ String actual = pipelineSQLBuilder.buildInsertSQL(mockDataRecord("t2"), shardingColumnsMap);
+ assertThat(actual, is("INSERT INTO `t2`(`id`,`sc`,`c1`,`c2`,`c3`) VALUES(?,?,?,?,?)"));
}
@Test
public void assertBuildUpdateSQLWithPrimaryKey() {
- String actual = pipelineSQLBuilder.buildUpdateSQL(mockDataRecord("t2"), RecordUtil.extractPrimaryColumns(mockDataRecord("t2")));
+ String actual = pipelineSQLBuilder.buildUpdateSQL(mockDataRecord("t2"), RecordUtil.extractPrimaryColumns(mockDataRecord("t2")), shardingColumnsMap);
assertThat(actual, is("UPDATE `t2` SET `c1` = ?,`c2` = ?,`c3` = ? WHERE `id` = ?"));
}
@Test
public void assertBuildUpdateSQLWithShardingColumns() {
DataRecord dataRecord = mockDataRecord("t2");
- String actual = pipelineSQLBuilder.buildUpdateSQL(dataRecord, mockConditionColumns(dataRecord));
+ String actual = pipelineSQLBuilder.buildUpdateSQL(dataRecord, mockConditionColumns(dataRecord), shardingColumnsMap);
assertThat(actual, is("UPDATE `t2` SET `c1` = ?,`c2` = ?,`c3` = ? WHERE `id` = ? and `sc` = ?"));
}