You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/11/11 06:20:18 UTC
[shardingsphere] branch master updated: For optimization scaling
importer performance (#8127)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new e6f3153 For optimization scaling importer performance (#8127)
e6f3153 is described below
commit e6f3153f45a53e74daa82a00b5ac8d81e19a8986
Author: avalon5666 <64...@users.noreply.github.com>
AuthorDate: Wed Nov 11 14:08:20 2020 +0800
For optimization scaling importer performance (#8127)
* Revert "Use insert on duplicate key update in mysql insert (#8004)"
This reverts commit 7a2b03c48d0b8c5d967df1d43607484ecd49a6e2.
* Use insert on duplicate key update in mysql insert (#8004)
* Support update primary key column
---
.../executor/importer/AbstractJDBCImporter.java | 44 ++++++++----
.../executor/importer/AbstractSQLBuilder.java | 82 +++++++++-------------
.../execute/executor/importer/PreparedSQL.java | 39 ----------
.../core/execute/executor/record/Column.java | 9 +++
.../core/execute/executor/record/DataRecord.java | 6 +-
.../core/execute/executor/record/RecordUtil.java | 55 ++++++++-------
.../importer/AbstractJDBCImporterTest.java | 47 +++++++++++--
.../executor/importer/AbstractSqlBuilderTest.java | 74 +++++--------------
.../fixture/FixtureDataConsistencyChecker.java | 3 +-
.../scaling/mysql/MySQLBinlogDumper.java | 5 +-
.../scaling/mysql/MySQLDataConsistencyChecker.java | 3 +-
.../scaling/mysql/MySQLImporter.java | 7 +-
.../scaling/mysql/MySQLSQLBuilder.java | 27 +++----
.../scaling/mysql/MySQLImporterTest.java | 8 +--
...qlBuilderTest.java => MySQLSQLBuilderTest.java} | 29 ++++----
.../PostgreSQLDataConsistencyChecker.java | 3 +-
.../scaling/postgresql/PostgreSQLImporter.java | 7 +-
.../scaling/postgresql/PostgreSQLSQLBuilder.java | 20 ++----
.../scaling/postgresql/PostgreSQLImporterTest.java | 8 +--
.../postgresql/PostgreSQLSqlBuilderTest.java | 8 +--
20 files changed, 210 insertions(+), 274 deletions(-)
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java
index e36eabc..3ca1ba7 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java
@@ -25,9 +25,11 @@ import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.exception.SyncTaskExecuteException;
import org.apache.shardingsphere.scaling.core.execute.executor.AbstractShardingScalingExecutor;
import org.apache.shardingsphere.scaling.core.execute.executor.channel.Channel;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.record.FinishedRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
import org.apache.shardingsphere.scaling.core.job.position.IncrementalPosition;
import javax.sql.DataSource;
@@ -37,8 +39,6 @@ import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
/**
* Abstract JDBC importer implementation.
@@ -58,7 +58,7 @@ public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecut
protected AbstractJDBCImporter(final ImporterConfiguration importerConfig, final DataSourceManager dataSourceManager) {
this.importerConfig = importerConfig;
this.dataSourceManager = dataSourceManager;
- sqlBuilder = createSQLBuilder(importerConfig.getShardingColumnsMap());
+ sqlBuilder = createSQLBuilder();
}
/**
@@ -66,7 +66,7 @@ public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecut
*
* @return SQL builder
*/
- protected abstract AbstractSQLBuilder createSQLBuilder(Map<String, Set<String>> shardingColumnsMap);
+ protected abstract AbstractSQLBuilder createSQLBuilder();
@Override
public final void start() {
@@ -140,25 +140,41 @@ public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecut
}
private void executeInsert(final Connection connection, final DataRecord record) throws SQLException {
+ String insertSql = sqlBuilder.buildInsertSQL(record);
+ PreparedStatement ps = connection.prepareStatement(insertSql);
+ ps.setQueryTimeout(30);
try {
- executeSQL(connection, record, sqlBuilder.buildInsertSQL(record));
+ for (int i = 0; i < record.getColumnCount(); i++) {
+ ps.setObject(i + 1, record.getColumn(i).getValue());
+ }
+ ps.execute();
} catch (final SQLIntegrityConstraintViolationException ignored) {
}
}
private void executeUpdate(final Connection connection, final DataRecord record) throws SQLException {
- executeSQL(connection, record, sqlBuilder.buildUpdateSQL(record));
+ List<Column> conditionColumns = RecordUtil.extractConditionColumns(record, importerConfig.getShardingColumnsMap().get(record.getTableName()));
+ List<Column> updatedColumns = RecordUtil.extractUpdatedColumns(record);
+ String updateSql = sqlBuilder.buildUpdateSQL(record, conditionColumns);
+ PreparedStatement ps = connection.prepareStatement(updateSql);
+ for (int i = 0; i < updatedColumns.size(); i++) {
+ ps.setObject(i + 1, updatedColumns.get(i).getValue());
+ }
+ for (int i = 0; i < conditionColumns.size(); i++) {
+ Column keyColumn = conditionColumns.get(i);
+ ps.setObject(updatedColumns.size() + i + 1,
+ // sharding column can not be updated
+ (keyColumn.isPrimaryKey() && keyColumn.isUpdated()) ? keyColumn.getOldValue() : keyColumn.getValue());
+ }
+ ps.execute();
}
private void executeDelete(final Connection connection, final DataRecord record) throws SQLException {
- executeSQL(connection, record, sqlBuilder.buildDeleteSQL(record));
- }
-
- private void executeSQL(final Connection connection, final DataRecord record, final PreparedSQL preparedSQL) throws SQLException {
- PreparedStatement ps = connection.prepareStatement(preparedSQL.getSql());
- for (int i = 0; i < preparedSQL.getValuesIndex().size(); i++) {
- int columnIndex = preparedSQL.getValuesIndex().get(i);
- ps.setObject(i + 1, record.getColumn(columnIndex).getValue());
+ List<Column> conditionColumns = RecordUtil.extractConditionColumns(record, importerConfig.getShardingColumnsMap().get(record.getTableName()));
+ String deleteSql = sqlBuilder.buildDeleteSQL(record, conditionColumns);
+ PreparedStatement ps = connection.prepareStatement(deleteSql);
+ for (int i = 0; i < conditionColumns.size(); i++) {
+ ps.setObject(i + 1, conditionColumns.get(i).getValue());
}
ps.execute();
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSQLBuilder.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSQLBuilder.java
index fc191df..e102b05 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSQLBuilder.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSQLBuilder.java
@@ -17,21 +17,18 @@
package org.apache.shardingsphere.scaling.core.execute.executor.importer;
-import lombok.RequiredArgsConstructor;
+import com.google.common.collect.Collections2;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
-import org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
-import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* Abstract SQL builder.
*/
-@RequiredArgsConstructor
public abstract class AbstractSQLBuilder {
private static final String INSERT_SQL_CACHE_KEY_PREFIX = "INSERT_";
@@ -40,9 +37,7 @@ public abstract class AbstractSQLBuilder {
private static final String DELETE_SQL_CACHE_KEY_PREFIX = "DELETE_";
- private final Map<String, Set<String>> shardingColumnsMap;
-
- private final ConcurrentMap<String, PreparedSQL> sqlCacheMap = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, String> sqlCacheMap = new ConcurrentHashMap<>();
/**
* Get left identifier quote string.
@@ -72,90 +67,79 @@ public abstract class AbstractSQLBuilder {
* Build insert SQL.
*
* @param dataRecord data record
- * @return insert prepared SQL
+ * @return insert SQL
*/
- public PreparedSQL buildInsertSQL(final DataRecord dataRecord) {
+ public String buildInsertSQL(final DataRecord dataRecord) {
String sqlCacheKey = INSERT_SQL_CACHE_KEY_PREFIX + dataRecord.getTableName();
if (!sqlCacheMap.containsKey(sqlCacheKey)) {
- sqlCacheMap.put(sqlCacheKey, buildInsertSQLInternal(dataRecord));
+ sqlCacheMap.put(sqlCacheKey, buildInsertSQLInternal(dataRecord.getTableName(), dataRecord.getColumns()));
}
return sqlCacheMap.get(sqlCacheKey);
}
- protected PreparedSQL buildInsertSQLInternal(final DataRecord dataRecord) {
+ private String buildInsertSQLInternal(final String tableName, final List<Column> columns) {
StringBuilder columnsLiteral = new StringBuilder();
StringBuilder holder = new StringBuilder();
- List<Integer> valuesIndex = new ArrayList<>();
- for (int i = 0; i < dataRecord.getColumnCount(); i++) {
- columnsLiteral.append(String.format("%s,", quote(dataRecord.getColumn(i).getName())));
+ for (Column each : columns) {
+ columnsLiteral.append(String.format("%s,", quote(each.getName())));
holder.append("?,");
- valuesIndex.add(i);
}
columnsLiteral.setLength(columnsLiteral.length() - 1);
holder.setLength(holder.length() - 1);
- return new PreparedSQL(
- String.format("INSERT INTO %s(%s) VALUES(%s)", quote(dataRecord.getTableName()), columnsLiteral, holder),
- valuesIndex);
+ return String.format("INSERT INTO %s(%s) VALUES(%s)", quote(tableName), columnsLiteral, holder);
}
/**
* Build update SQL.
*
* @param dataRecord data record
- * @return update prepared SQL
+ * @param conditionColumns condition columns
+ * @return update SQL
*/
- public PreparedSQL buildUpdateSQL(final DataRecord dataRecord) {
+ public String buildUpdateSQL(final DataRecord dataRecord, final Collection<Column> conditionColumns) {
String sqlCacheKey = UPDATE_SQL_CACHE_KEY_PREFIX + dataRecord.getTableName();
if (!sqlCacheMap.containsKey(sqlCacheKey)) {
- sqlCacheMap.put(sqlCacheKey, buildUpdateSQLInternal(dataRecord));
+ sqlCacheMap.put(sqlCacheKey, buildUpdateSQLInternal(dataRecord.getTableName(), conditionColumns));
}
StringBuilder updatedColumnString = new StringBuilder();
- List<Integer> valuesIndex = new ArrayList<>();
- for (Integer each : RecordUtil.extractUpdatedColumns(dataRecord)) {
- updatedColumnString.append(String.format("%s = ?,", quote(dataRecord.getColumn(each).getName())));
- valuesIndex.add(each);
+ for (Column each : extractUpdatedColumns(dataRecord.getColumns())) {
+ updatedColumnString.append(String.format("%s = ?,", quote(each.getName())));
}
updatedColumnString.setLength(updatedColumnString.length() - 1);
- PreparedSQL preparedSQL = sqlCacheMap.get(sqlCacheKey);
- valuesIndex.addAll(preparedSQL.getValuesIndex());
- return new PreparedSQL(
- String.format(preparedSQL.getSql(), updatedColumnString),
- valuesIndex);
+ return String.format(sqlCacheMap.get(sqlCacheKey), updatedColumnString);
+ }
+
+ private String buildUpdateSQLInternal(final String tableName, final Collection<Column> conditionColumns) {
+ return String.format("UPDATE %s SET %%s WHERE %s", quote(tableName), buildWhereSQL(conditionColumns));
}
- private PreparedSQL buildUpdateSQLInternal(final DataRecord dataRecord) {
- List<Integer> valuesIndex = new ArrayList<>();
- return new PreparedSQL(
- String.format("UPDATE %s SET %%s WHERE %s", quote(dataRecord.getTableName()), buildWhereSQL(dataRecord, valuesIndex)),
- valuesIndex);
+ private Collection<Column> extractUpdatedColumns(final Collection<Column> columns) {
+ return Collections2.filter(columns, Column::isUpdated);
}
/**
* Build delete SQL.
*
* @param dataRecord data record
- * @return delete prepared SQL
+ * @param conditionColumns condition columns
+ * @return delete SQL
*/
- public PreparedSQL buildDeleteSQL(final DataRecord dataRecord) {
+ public String buildDeleteSQL(final DataRecord dataRecord, final Collection<Column> conditionColumns) {
String sqlCacheKey = DELETE_SQL_CACHE_KEY_PREFIX + dataRecord.getTableName();
if (!sqlCacheMap.containsKey(sqlCacheKey)) {
- sqlCacheMap.put(sqlCacheKey, buildDeleteSQLInternal(dataRecord));
+ sqlCacheMap.put(sqlCacheKey, buildDeleteSQLInternal(dataRecord.getTableName(), conditionColumns));
}
return sqlCacheMap.get(sqlCacheKey);
}
- private PreparedSQL buildDeleteSQLInternal(final DataRecord dataRecord) {
- List<Integer> columnsIndex = new ArrayList<>();
- return new PreparedSQL(
- String.format("DELETE FROM %s WHERE %s", quote(dataRecord.getTableName()), buildWhereSQL(dataRecord, columnsIndex)),
- columnsIndex);
+ private String buildDeleteSQLInternal(final String tableName, final Collection<Column> conditionColumns) {
+ return String.format("DELETE FROM %s WHERE %s", quote(tableName), buildWhereSQL(conditionColumns));
}
- private String buildWhereSQL(final DataRecord dataRecord, final List<Integer> valuesIndex) {
+ private String buildWhereSQL(final Collection<Column> conditionColumns) {
StringBuilder where = new StringBuilder();
- for (Integer each : RecordUtil.extractConditionColumns(dataRecord, shardingColumnsMap.get(dataRecord.getTableName()))) {
- where.append(String.format("%s = ? and ", quote(dataRecord.getColumn(each).getName())));
- valuesIndex.add(each);
+ for (Column each : conditionColumns) {
+ where.append(String.format("%s = ? and ", quote(each.getName())));
}
where.setLength(where.length() - 5);
return where.toString();
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/PreparedSQL.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/PreparedSQL.java
deleted file mode 100644
index fe07e10..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/PreparedSQL.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.core.execute.executor.importer;
-
-import lombok.Getter;
-
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Prepared SQL, include complete sql and complete values index list.
- */
-@Getter
-public class PreparedSQL {
-
- private final String sql;
-
- private final List<Integer> valuesIndex;
-
- public PreparedSQL(final String sql, final List<Integer> valuesIndex) {
- this.sql = sql;
- this.valuesIndex = Collections.unmodifiableList(valuesIndex);
- }
-}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/Column.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/Column.java
index 89d80b7..1c66a9f 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/Column.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/Column.java
@@ -29,12 +29,21 @@ public final class Column {
private final String name;
+ /**
+ * Value are available only when the primary key column is updated.
+ */
+ private final Object oldValue;
+
private final Object value;
private final boolean updated;
private final boolean primaryKey;
+ public Column(final String name, final Object value, final boolean updated, final boolean primaryKey) {
+ this(name, null, value, updated, primaryKey);
+ }
+
@Override
public String toString() {
return String.format("%s=%s", name, value);
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/DataRecord.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/DataRecord.java
index ffb88cd..e8594c0 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/DataRecord.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/DataRecord.java
@@ -31,6 +31,8 @@ import java.util.List;
/**
* Data record.
*/
+@Setter
+@Getter
@EqualsAndHashCode(of = {"tableName", "primaryKeyValue"}, callSuper = false)
@ToString
public final class DataRecord extends Record {
@@ -39,12 +41,8 @@ public final class DataRecord extends Record {
private final List<Object> primaryKeyValue = new LinkedList<>();
- @Setter
- @Getter
private String type;
- @Setter
- @Getter
private String tableName;
public DataRecord(final Position position, final int columnCount) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/RecordUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/RecordUtil.java
index 0a72b08..d9e38bb 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/RecordUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/RecordUtil.java
@@ -20,10 +20,9 @@ package org.apache.shardingsphere.scaling.core.execute.executor.record;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
+import java.util.ArrayList;
import java.util.List;
import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
/**
* Record utility.
@@ -32,45 +31,51 @@ import java.util.stream.IntStream;
public final class RecordUtil {
/**
- * Extract primary columns index from data record.
+ * Extract primary columns from data record.
*
* @param dataRecord data record
- * @return primary columns index
+ * @return primary columns
*/
- public static List<Integer> extractPrimaryColumns(final DataRecord dataRecord) {
- return IntStream.range(0, dataRecord.getColumnCount())
- .filter(each -> dataRecord.getColumn(each).isPrimaryKey())
- .mapToObj(each -> each)
- .collect(Collectors.toList());
+ public static List<Column> extractPrimaryColumns(final DataRecord dataRecord) {
+ List<Column> result = new ArrayList<>(dataRecord.getColumns().size());
+ for (Column each : dataRecord.getColumns()) {
+ if (each.isPrimaryKey()) {
+ result.add(each);
+ }
+ }
+ return result;
}
/**
- * Extract condition columns(include primary and sharding columns) index from data record.
+ * Extract condition columns(include primary and sharding columns) from data record.
*
* @param dataRecord data record
* @param shardingColumns sharding columns
- * @return condition columns index
+ * @return condition columns
*/
- public static List<Integer> extractConditionColumns(final DataRecord dataRecord, final Set<String> shardingColumns) {
- return IntStream.range(0, dataRecord.getColumnCount())
- .filter(each -> {
- Column column = dataRecord.getColumn(each);
- return column.isPrimaryKey() || shardingColumns.contains(column.getName());
- })
- .mapToObj(each -> each)
- .collect(Collectors.toList());
+ public static List<Column> extractConditionColumns(final DataRecord dataRecord, final Set<String> shardingColumns) {
+ List<Column> result = new ArrayList<>(dataRecord.getColumns().size());
+ for (Column each : dataRecord.getColumns()) {
+ if (each.isPrimaryKey() || shardingColumns.contains(each.getName())) {
+ result.add(each);
+ }
+ }
+ return result;
}
/**
* Extract updated columns from data record.
*
* @param dataRecord data record
- * @return updated columns index
+ * @return updated columns
*/
- public static List<Integer> extractUpdatedColumns(final DataRecord dataRecord) {
- return IntStream.range(0, dataRecord.getColumnCount())
- .filter(each -> dataRecord.getColumn(each).isUpdated())
- .mapToObj(each -> each)
- .collect(Collectors.toList());
+ public static List<Column> extractUpdatedColumns(final DataRecord dataRecord) {
+ List<Column> result = new ArrayList<>(dataRecord.getColumns().size());
+ for (Column each : dataRecord.getColumns()) {
+ if (each.isUpdated()) {
+ result.add(each);
+ }
+ }
+ return result;
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java
index 3283dd0..637131d 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java
@@ -17,21 +17,22 @@
package org.apache.shardingsphere.scaling.core.execute.executor.importer;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.config.ScalingDataSourceConfiguration;
+import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.execute.executor.channel.Channel;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.record.FinishedRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
import org.apache.shardingsphere.scaling.core.job.position.NopPosition;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@@ -39,11 +40,13 @@ import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
+import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -86,7 +89,7 @@ public final class AbstractJDBCImporterTest {
jdbcImporter = new AbstractJDBCImporter(getImporterConfiguration(), dataSourceManager) {
@Override
- protected AbstractSQLBuilder createSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
+ protected AbstractSQLBuilder createSQLBuilder() {
return sqlBuilder;
}
};
@@ -96,9 +99,9 @@ public final class AbstractJDBCImporterTest {
}
@Test
- public void assertInsertDataRecord() throws SQLException {
+ public void assertWriteInsertDataRecord() throws SQLException {
DataRecord insertRecord = getDataRecord("INSERT");
- when(sqlBuilder.buildInsertSQL(insertRecord)).thenReturn(new PreparedSQL(INSERT_SQL, Lists.newArrayList(0, 1, 2)));
+ when(sqlBuilder.buildInsertSQL(insertRecord)).thenReturn(INSERT_SQL);
when(connection.prepareStatement(INSERT_SQL)).thenReturn(preparedStatement);
when(channel.fetchRecords(100, 3)).thenReturn(mockRecords(insertRecord));
jdbcImporter.run();
@@ -111,7 +114,7 @@ public final class AbstractJDBCImporterTest {
@Test
public void assertDeleteDataRecord() throws SQLException {
DataRecord deleteRecord = getDataRecord("DELETE");
- when(sqlBuilder.buildDeleteSQL(deleteRecord)).thenReturn(new PreparedSQL(DELETE_SQL, Lists.newArrayList(0, 1)));
+ when(sqlBuilder.buildDeleteSQL(deleteRecord, mockConditionColumns(deleteRecord))).thenReturn(DELETE_SQL);
when(connection.prepareStatement(DELETE_SQL)).thenReturn(preparedStatement);
when(channel.fetchRecords(100, 3)).thenReturn(mockRecords(deleteRecord));
jdbcImporter.run();
@@ -123,7 +126,7 @@ public final class AbstractJDBCImporterTest {
@Test
public void assertUpdateDataRecord() throws SQLException {
DataRecord updateRecord = getDataRecord("UPDATE");
- when(sqlBuilder.buildUpdateSQL(updateRecord)).thenReturn(new PreparedSQL(UPDATE_SQL, Lists.newArrayList(1, 2, 0, 1)));
+ when(sqlBuilder.buildUpdateSQL(updateRecord, mockConditionColumns(updateRecord))).thenReturn(UPDATE_SQL);
when(connection.prepareStatement(UPDATE_SQL)).thenReturn(preparedStatement);
when(channel.fetchRecords(100, 3)).thenReturn(mockRecords(updateRecord));
jdbcImporter.run();
@@ -134,6 +137,36 @@ public final class AbstractJDBCImporterTest {
verify(preparedStatement).execute();
}
+ @Test
+ public void assertUpdatePrimaryKeyDataRecord() throws SQLException {
+ DataRecord updateRecord = getUpdatePrimaryKeyDataRecord();
+ when(sqlBuilder.buildUpdateSQL(updateRecord, mockConditionColumns(updateRecord))).thenReturn(UPDATE_SQL);
+ when(connection.prepareStatement(UPDATE_SQL)).thenReturn(preparedStatement);
+ when(channel.fetchRecords(100, 3)).thenReturn(mockRecords(updateRecord));
+ jdbcImporter.run();
+ InOrder inOrder = inOrder(preparedStatement);
+ inOrder.verify(preparedStatement).setObject(1, 2);
+ inOrder.verify(preparedStatement).setObject(2, 10);
+ inOrder.verify(preparedStatement).setObject(3, "UPDATE");
+ inOrder.verify(preparedStatement).setObject(4, 1);
+ inOrder.verify(preparedStatement).setObject(5, 10);
+ inOrder.verify(preparedStatement).execute();
+ }
+
+ private DataRecord getUpdatePrimaryKeyDataRecord() {
+ DataRecord result = new DataRecord(new NopPosition(), 3);
+ result.setTableName(TABLE_NAME);
+ result.setType("UPDATE");
+ result.addColumn(new Column("id", 1, 2, true, true));
+ result.addColumn(new Column("user", 10, true, false));
+ result.addColumn(new Column("status", "UPDATE", true, false));
+ return result;
+ }
+
+ private Collection<Column> mockConditionColumns(final DataRecord dataRecord) {
+ return RecordUtil.extractConditionColumns(dataRecord, Sets.newHashSet("user"));
+ }
+
private List<Record> mockRecords(final DataRecord dataRecord) {
List<Record> result = new LinkedList<>();
result.add(dataRecord);
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSqlBuilderTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSqlBuilderTest.java
index 4cc12e4..c61a481 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSqlBuilderTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSqlBuilderTest.java
@@ -20,37 +20,29 @@ package org.apache.shardingsphere.scaling.core.execute.executor.importer;
import com.google.common.collect.Sets;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
import org.apache.shardingsphere.scaling.core.job.position.NopPosition;
-import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-import java.util.Map;
+import java.util.Collection;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.when;
-@RunWith(MockitoJUnitRunner.class)
public class AbstractSqlBuilderTest {
- @Mock
- private Map shardingColumnsMap;
-
private AbstractSQLBuilder sqlBuilder;
@Before
public void setUp() {
- sqlBuilder = new AbstractSQLBuilder(shardingColumnsMap) {
+ sqlBuilder = new AbstractSQLBuilder() {
@Override
protected String getLeftIdentifierQuoteString() {
return "`";
}
-
+
@Override
protected String getRightIdentifierQuoteString() {
return "`";
@@ -60,66 +52,38 @@ public class AbstractSqlBuilderTest {
@Test
public void assertBuildInsertSQL() {
- PreparedSQL actual = sqlBuilder.buildInsertSQL(mockDataRecord("t1"));
- assertThat(actual.getSql(), is("INSERT INTO `t1`(`id`,`sc`,`c1`,`c2`,`c3`) VALUES(?,?,?,?,?)"));
- assertThat(actual.getValuesIndex().toArray(), Matchers.arrayContaining(0, 1, 2, 3, 4));
+ String actual = sqlBuilder.buildInsertSQL(mockDataRecord("t1"));
+ assertThat(actual, is("INSERT INTO `t1`(`id`,`sc`,`c1`,`c2`,`c3`) VALUES(?,?,?,?,?)"));
}
@Test
public void assertBuildUpdateSQLWithPrimaryKey() {
- when(shardingColumnsMap.get("t2")).thenReturn(Sets.newHashSet());
- PreparedSQL actual = sqlBuilder.buildUpdateSQL(mockDataRecord("t2"));
- assertThat(actual.getSql(), is("UPDATE `t2` SET `c1` = ?,`c2` = ?,`c3` = ? WHERE `id` = ?"));
- assertThat(actual.getValuesIndex().toArray(), Matchers.arrayContaining(2, 3, 4, 0));
+ String actual = sqlBuilder.buildUpdateSQL(mockDataRecord("t2"), RecordUtil.extractPrimaryColumns(mockDataRecord("t2")));
+ assertThat(actual, is("UPDATE `t2` SET `c1` = ?,`c2` = ?,`c3` = ? WHERE `id` = ?"));
}
@Test
public void assertBuildUpdateSQLWithShardingColumns() {
- when(shardingColumnsMap.get("t2")).thenReturn(Sets.newHashSet("sc"));
DataRecord dataRecord = mockDataRecord("t2");
- PreparedSQL actual = sqlBuilder.buildUpdateSQL(dataRecord);
- assertThat(actual.getSql(), is("UPDATE `t2` SET `c1` = ?,`c2` = ?,`c3` = ? WHERE `id` = ? and `sc` = ?"));
- assertThat(actual.getValuesIndex().toArray(), Matchers.arrayContaining(2, 3, 4, 0, 1));
- }
-
- @Test
- public void assertBuildUpdateSQLWithShardingColumnsUseCache() {
- when(shardingColumnsMap.get("t2")).thenReturn(Sets.newHashSet("sc"));
- DataRecord dataRecord = mockDataRecord("t2");
- PreparedSQL actual = sqlBuilder.buildUpdateSQL(dataRecord);
- assertThat(actual.getSql(), is("UPDATE `t2` SET `c1` = ?,`c2` = ?,`c3` = ? WHERE `id` = ? and `sc` = ?"));
- assertThat(actual.getValuesIndex().toArray(), Matchers.arrayContaining(2, 3, 4, 0, 1));
- actual = sqlBuilder.buildUpdateSQL(mockDataRecord2("t2"));
- assertThat(actual.getSql(), is("UPDATE `t2` SET `c1` = ?,`c3` = ? WHERE `id` = ? and `sc` = ?"));
- assertThat(actual.getValuesIndex().toArray(), Matchers.arrayContaining(2, 4, 0, 1));
- }
-
- private DataRecord mockDataRecord2(final String tableName) {
- DataRecord result = new DataRecord(new NopPosition(), 4);
- result.setTableName(tableName);
- result.addColumn(new Column("id", "", false, true));
- result.addColumn(new Column("sc", "", false, false));
- result.addColumn(new Column("c1", "", true, false));
- result.addColumn(new Column("c2", "", false, false));
- result.addColumn(new Column("c3", "", true, false));
- return result;
+ String actual = sqlBuilder.buildUpdateSQL(dataRecord, mockConditionColumns(dataRecord));
+ assertThat(actual, is("UPDATE `t2` SET `c1` = ?,`c2` = ?,`c3` = ? WHERE `id` = ? and `sc` = ?"));
}
@Test
public void assertBuildDeleteSQLWithPrimaryKey() {
- when(shardingColumnsMap.get("t3")).thenReturn(Sets.newHashSet());
- PreparedSQL actual = sqlBuilder.buildDeleteSQL(mockDataRecord("t3"));
- assertThat(actual.getSql(), is("DELETE FROM `t3` WHERE `id` = ?"));
- assertThat(actual.getValuesIndex().toArray(), Matchers.arrayContaining(0));
+ String actual = sqlBuilder.buildDeleteSQL(mockDataRecord("t3"), RecordUtil.extractPrimaryColumns(mockDataRecord("t3")));
+ assertThat(actual, is("DELETE FROM `t3` WHERE `id` = ?"));
}
@Test
- public void assertBuildDeleteSQLWithShardingColumns() {
- when(shardingColumnsMap.get("t3")).thenReturn(Sets.newHashSet("sc"));
+ public void assertBuildDeleteSQLWithConditionColumns() {
DataRecord dataRecord = mockDataRecord("t3");
- PreparedSQL actual = sqlBuilder.buildDeleteSQL(dataRecord);
- assertThat(actual.getSql(), is("DELETE FROM `t3` WHERE `id` = ? and `sc` = ?"));
- assertThat(actual.getValuesIndex().toArray(), Matchers.arrayContaining(0, 1));
+ String actual = sqlBuilder.buildDeleteSQL(dataRecord, mockConditionColumns(dataRecord));
+ assertThat(actual, is("DELETE FROM `t3` WHERE `id` = ? and `sc` = ?"));
+ }
+
+ private Collection<Column> mockConditionColumns(final DataRecord dataRecord) {
+ return RecordUtil.extractConditionColumns(dataRecord, Sets.newHashSet("sc"));
}
private DataRecord mockDataRecord(final String tableName) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureDataConsistencyChecker.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureDataConsistencyChecker.java
index 1c13d4e..f5f315e 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureDataConsistencyChecker.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureDataConsistencyChecker.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.scaling.core.fixture;
-import com.google.common.collect.Maps;
import org.apache.shardingsphere.scaling.core.check.AbstractDataConsistencyChecker;
import org.apache.shardingsphere.scaling.core.check.DataConsistencyCheckResult;
import org.apache.shardingsphere.scaling.core.check.DataConsistencyChecker;
@@ -45,7 +44,7 @@ public final class FixtureDataConsistencyChecker extends AbstractDataConsistency
@Override
protected AbstractSQLBuilder getSqlBuilder() {
- return new AbstractSQLBuilder(Maps.newHashMap()) {
+ return new AbstractSQLBuilder() {
@Override
protected String getLeftIdentifierQuoteString() {
return "`";
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumper.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumper.java
index b9eb885..1bb170c 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumper.java
@@ -138,7 +138,10 @@ public final class MySQLBinlogDumper extends AbstractShardingScalingExecutor<Bin
for (int j = 0; j < beforeValues.length; j++) {
Object oldValue = beforeValues[j];
Object newValue = afterValues[j];
- record.addColumn(new Column(tableMetaData.getColumnMetaData(j).getName(), newValue, !Objects.equals(newValue, oldValue), tableMetaData.isPrimaryKey(j)));
+ boolean updated = !Objects.equals(newValue, oldValue);
+ record.addColumn(new Column(tableMetaData.getColumnMetaData(j).getName(),
+ (tableMetaData.isPrimaryKey(j) && updated) ? oldValue : null,
+ newValue, updated, tableMetaData.isPrimaryKey(j)));
}
pushRecord(record);
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLDataConsistencyChecker.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLDataConsistencyChecker.java
index fb44038..41f9e61 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLDataConsistencyChecker.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLDataConsistencyChecker.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.scaling.mysql;
-import com.google.common.collect.Maps;
import org.apache.shardingsphere.scaling.core.check.AbstractDataConsistencyChecker;
import org.apache.shardingsphere.scaling.core.check.DataConsistencyChecker;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceWrapper;
@@ -99,6 +98,6 @@ public final class MySQLDataConsistencyChecker extends AbstractDataConsistencyCh
@Override
protected MySQLSQLBuilder getSqlBuilder() {
- return new MySQLSQLBuilder(Maps.newHashMap());
+ return new MySQLSQLBuilder();
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLImporter.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLImporter.java
index 97a971a..a22f5ac 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLImporter.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLImporter.java
@@ -22,9 +22,6 @@ import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractJDBCImporter;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSQLBuilder;
-import java.util.Map;
-import java.util.Set;
-
/**
* MySQL importer.
*/
@@ -35,7 +32,7 @@ public final class MySQLImporter extends AbstractJDBCImporter {
}
@Override
- protected AbstractSQLBuilder createSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
- return new MySQLSQLBuilder(shardingColumnsMap);
+ protected AbstractSQLBuilder createSQLBuilder() {
+ return new MySQLSQLBuilder();
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLSQLBuilder.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLSQLBuilder.java
index 5da75de..b7cd578 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLSQLBuilder.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLSQLBuilder.java
@@ -18,24 +18,14 @@
package org.apache.shardingsphere.scaling.mysql;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSQLBuilder;
-import org.apache.shardingsphere.scaling.core.execute.executor.importer.PreparedSQL;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
/**
* MySQL SQL builder.
*/
public final class MySQLSQLBuilder extends AbstractSQLBuilder {
- public MySQLSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
- super(shardingColumnsMap);
- }
-
@Override
public String getLeftIdentifierQuoteString() {
return "`";
@@ -47,19 +37,20 @@ public final class MySQLSQLBuilder extends AbstractSQLBuilder {
}
@Override
- protected PreparedSQL buildInsertSQLInternal(final DataRecord dataRecord) {
- PreparedSQL preparedSQL = super.buildInsertSQLInternal(dataRecord);
- StringBuilder insertSQL = new StringBuilder(preparedSQL.getSql() + " ON DUPLICATE KEY UPDATE ");
- List<Integer> valuesIndex = new ArrayList<>(preparedSQL.getValuesIndex());
+ public String buildInsertSQL(final DataRecord dataRecord) {
+ return super.buildInsertSQL(dataRecord) + buildDuplicateUpdateSQL(dataRecord);
+ }
+
+ private String buildDuplicateUpdateSQL(final DataRecord dataRecord) {
+ StringBuilder result = new StringBuilder(" ON DUPLICATE KEY UPDATE ");
for (int i = 0; i < dataRecord.getColumnCount(); i++) {
Column column = dataRecord.getColumn(i);
if (!dataRecord.getColumn(i).isPrimaryKey()) {
- insertSQL.append(quote(column.getName())).append("=?,");
- valuesIndex.add(i);
+ result.append(quote(column.getName())).append("=VALUES(").append(quote(column.getName())).append("),");
}
}
- insertSQL.setLength(insertSQL.length() - 1);
- return new PreparedSQL(insertSQL.toString(), valuesIndex);
+ result.setLength(result.length() - 1);
+ return result.toString();
}
/**
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLImporterTest.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLImporterTest.java
index 8b89a63..b1e950d 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLImporterTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLImporterTest.java
@@ -17,14 +17,11 @@
package org.apache.shardingsphere.scaling.mysql;
-import com.google.common.collect.Maps;
import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
-import org.apache.shardingsphere.scaling.core.execute.executor.importer.PreparedSQL;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
import org.apache.shardingsphere.scaling.mysql.binlog.BinlogPosition;
-import org.hamcrest.Matchers;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
@@ -45,9 +42,8 @@ public final class MySQLImporterTest {
@Test
public void assertCreateSqlBuilder() {
MySQLImporter mySQLImporter = new MySQLImporter(importerConfig, dataSourceManager);
- PreparedSQL insertSQL = mySQLImporter.createSQLBuilder(Maps.newHashMap()).buildInsertSQL(mockDataRecord());
- assertThat(insertSQL.getSql(), is("INSERT INTO `t_order`(`id`,`name`) VALUES(?,?) ON DUPLICATE KEY UPDATE `name`=?"));
- assertThat(insertSQL.getValuesIndex().toArray(), Matchers.arrayContaining(0, 1, 1));
+ String insertSQL = mySQLImporter.createSQLBuilder().buildInsertSQL(mockDataRecord());
+ assertThat(insertSQL, is("INSERT INTO `t_order`(`id`,`name`) VALUES(?,?) ON DUPLICATE KEY UPDATE `name`=VALUES(`name`)"));
}
private DataRecord mockDataRecord() {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLSqlBuilderTest.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLSQLBuilderTest.java
similarity index 57%
rename from shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLSqlBuilderTest.java
rename to shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLSQLBuilderTest.java
index 12adaa5..e74fe7e 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLSqlBuilderTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLSQLBuilderTest.java
@@ -17,32 +17,33 @@
package org.apache.shardingsphere.scaling.mysql;
-import com.google.common.collect.Maps;
-import org.apache.shardingsphere.scaling.core.execute.executor.importer.PreparedSQL;
+import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSQLBuilder;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
-import org.apache.shardingsphere.scaling.mysql.binlog.BinlogPosition;
-import org.hamcrest.Matchers;
+import org.apache.shardingsphere.scaling.core.job.position.NopPosition;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
-public final class MySQLSqlBuilderTest {
+public class MySQLSQLBuilderTest {
+
+ private AbstractSQLBuilder sqlBuilder = new MySQLSQLBuilder();
@Test
public void assertBuildInsertSQL() {
- PreparedSQL actual = new MySQLSQLBuilder(Maps.newHashMap()).buildInsertSQL(mockDataRecord());
- assertThat(actual.getSql(), is("INSERT INTO `t_order`(`id`,`name`,`age`) VALUES(?,?,?) ON DUPLICATE KEY UPDATE `name`=?,`age`=?"));
- assertThat(actual.getValuesIndex().toArray(), Matchers.arrayContaining(0, 1, 2, 1, 2));
+ String actual = sqlBuilder.buildInsertSQL(mockDataRecord("t1"));
+ assertThat(actual, is("INSERT INTO `t1`(`id`,`sc`,`c1`,`c2`,`c3`) VALUES(?,?,?,?,?) ON DUPLICATE KEY UPDATE `sc`=VALUES(`sc`),`c1`=VALUES(`c1`),`c2`=VALUES(`c2`),`c3`=VALUES(`c3`)"));
}
- private DataRecord mockDataRecord() {
- DataRecord result = new DataRecord(new BinlogPosition("", 1), 2);
- result.setTableName("t_order");
- result.addColumn(new Column("id", 1, true, true));
- result.addColumn(new Column("name", "", true, false));
- result.addColumn(new Column("age", 1, true, false));
+ private DataRecord mockDataRecord(final String tableName) {
+ DataRecord result = new DataRecord(new NopPosition(), 4);
+ result.setTableName(tableName);
+ result.addColumn(new Column("id", "", false, true));
+ result.addColumn(new Column("sc", "", false, false));
+ result.addColumn(new Column("c1", "", true, false));
+ result.addColumn(new Column("c2", "", true, false));
+ result.addColumn(new Column("c3", "", true, false));
return result;
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLDataConsistencyChecker.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLDataConsistencyChecker.java
index df1d0c3..97f2712 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLDataConsistencyChecker.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLDataConsistencyChecker.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.scaling.postgresql;
-import com.google.common.collect.Maps;
import org.apache.shardingsphere.scaling.core.check.AbstractDataConsistencyChecker;
import org.apache.shardingsphere.scaling.core.check.DataConsistencyChecker;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSQLBuilder;
@@ -42,6 +41,6 @@ public final class PostgreSQLDataConsistencyChecker extends AbstractDataConsiste
@Override
protected AbstractSQLBuilder getSqlBuilder() {
- return new PostgreSQLSQLBuilder(Maps.newHashMap());
+ return new PostgreSQLSQLBuilder();
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLImporter.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLImporter.java
index 8faa00c..0422d4b 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLImporter.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLImporter.java
@@ -22,9 +22,6 @@ import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractJDBCImporter;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSQLBuilder;
-import java.util.Map;
-import java.util.Set;
-
/**
* postgreSQL importer.
*/
@@ -35,8 +32,8 @@ public final class PostgreSQLImporter extends AbstractJDBCImporter {
}
@Override
- protected AbstractSQLBuilder createSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
- return new PostgreSQLSQLBuilder(shardingColumnsMap);
+ protected AbstractSQLBuilder createSQLBuilder() {
+ return new PostgreSQLSQLBuilder();
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLSQLBuilder.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLSQLBuilder.java
index 488a09e..e59fb36 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLSQLBuilder.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLSQLBuilder.java
@@ -17,23 +17,16 @@
package org.apache.shardingsphere.scaling.postgresql;
-import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSQLBuilder;
-import org.apache.shardingsphere.scaling.core.execute.executor.importer.PreparedSQL;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
-
-import java.util.Map;
-import java.util.Set;
+import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSQLBuilder;
/**
* PostgreSQL SQL builder.
*/
public final class PostgreSQLSQLBuilder extends AbstractSQLBuilder {
- public PostgreSQLSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
- super(shardingColumnsMap);
- }
-
@Override
public String getLeftIdentifierQuoteString() {
return "\"";
@@ -45,15 +38,14 @@ public final class PostgreSQLSQLBuilder extends AbstractSQLBuilder {
}
@Override
- public PreparedSQL buildInsertSQL(final DataRecord dataRecord) {
- PreparedSQL preparedSQL = super.buildInsertSQL(dataRecord);
- return new PreparedSQL(preparedSQL.getSql() + buildConflictSQL(dataRecord), preparedSQL.getValuesIndex());
+ public String buildInsertSQL(final DataRecord dataRecord) {
+ return super.buildInsertSQL(dataRecord) + buildConflictSQL(dataRecord);
}
private String buildConflictSQL(final DataRecord dataRecord) {
StringBuilder result = new StringBuilder(" ON CONFLICT (");
- for (Integer each : RecordUtil.extractPrimaryColumns(dataRecord)) {
- result.append(dataRecord.getColumn(each).getName()).append(",");
+ for (Column each : RecordUtil.extractPrimaryColumns(dataRecord)) {
+ result.append(each.getName()).append(",");
}
result.setLength(result.length() - 1);
result.append(") DO NOTHING");
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLImporterTest.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLImporterTest.java
index 19a1bf9..f7b4e68 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLImporterTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLImporterTest.java
@@ -17,14 +17,11 @@
package org.apache.shardingsphere.scaling.postgresql;
-import com.google.common.collect.Maps;
import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
-import org.apache.shardingsphere.scaling.core.execute.executor.importer.PreparedSQL;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
-import org.hamcrest.Matchers;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
@@ -46,9 +43,8 @@ public final class PostgreSQLImporterTest {
@Test
public void assertCreateSQLBuilder() {
PostgreSQLImporter postgreSQLImporter = new PostgreSQLImporter(importerConfig, dataSourceManager);
- PreparedSQL insertSQL = postgreSQLImporter.createSQLBuilder(Maps.newHashMap()).buildInsertSQL(mockDataRecord());
- assertThat(insertSQL.getSql(), is("INSERT INTO \"t_order\"(\"id\",\"name\") VALUES(?,?) ON CONFLICT (id) DO NOTHING"));
- assertThat(insertSQL.getValuesIndex().toArray(), Matchers.arrayContaining(0, 1));
+ String insertSQL = postgreSQLImporter.createSQLBuilder().buildInsertSQL(mockDataRecord());
+ assertThat(insertSQL, is("INSERT INTO \"t_order\"(\"id\",\"name\") VALUES(?,?) ON CONFLICT (id) DO NOTHING"));
}
private DataRecord mockDataRecord() {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLSqlBuilderTest.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLSqlBuilderTest.java
index 885c5f4..3f14f4c 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLSqlBuilderTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLSqlBuilderTest.java
@@ -17,12 +17,9 @@
package org.apache.shardingsphere.scaling.postgresql;
-import com.google.common.collect.Maps;
-import org.apache.shardingsphere.scaling.core.execute.executor.importer.PreparedSQL;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
-import org.hamcrest.Matchers;
import org.junit.Test;
import org.postgresql.replication.LogSequenceNumber;
@@ -33,9 +30,8 @@ public final class PostgreSQLSqlBuilderTest {
@Test
public void assertBuildInsertSQL() {
- PreparedSQL actual = new PostgreSQLSQLBuilder(Maps.newHashMap()).buildInsertSQL(mockDataRecord());
- assertThat(actual.getSql(), is("INSERT INTO \"t_order\"(\"id\",\"name\") VALUES(?,?) ON CONFLICT (id) DO NOTHING"));
- assertThat(actual.getValuesIndex().toArray(), Matchers.arrayContaining(0, 1));
+ String actual = new PostgreSQLSQLBuilder().buildInsertSQL(mockDataRecord());
+ assertThat(actual, is("INSERT INTO \"t_order\"(\"id\",\"name\") VALUES(?,?) ON CONFLICT (id) DO NOTHING"));
}
private DataRecord mockDataRecord() {