You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/06/21 08:46:28 UTC
[shardingsphere] branch master updated: add sharding conditions in
update or delete sql (#6150)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 4762a94 add sharding conditions in update or delete sql (#6150)
4762a94 is described below
commit 4762a94fd3ada8f64b37367b64015aa3c548b83b
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Sun Jun 21 16:46:04 2020 +0800
add sharding conditions in update or delete sql (#6150)
* add sharding conditions in update or delete sql
* the SELECT privilege on *.* is not required, so remove it from the privilege check.
Co-authored-by: Lucas <qi...@jd.com>
---
.../scaling/core/config/RdbmsConfiguration.java | 3 ++
.../core/config/utils/SyncConfigurationUtil.java | 43 +++++++++++++++++++---
.../executor/importer/AbstractJDBCImporter.java | 13 ++++---
.../executor/importer/AbstractSqlBuilder.java | 34 ++++++++---------
.../core/execute/executor/record/RecordUtil.java | 18 +++++++++
.../job/preparer/ShardingScalingJobPreparer.java | 2 +-
.../splitter/InventoryDataTaskSplitter.java | 2 +-
.../importer/AbstractJDBCImporterTest.java | 23 ++++++++++--
.../executor/importer/AbstractSqlBuilderTest.java | 33 ++++++++++++++---
.../scaling/mysql/MySQLDataSourceChecker.java | 4 +-
.../scaling/mysql/MySQLDataSourceCheckerTest.java | 4 +-
11 files changed, 135 insertions(+), 44 deletions(-)
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/RdbmsConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/RdbmsConfiguration.java
index 6933eb1..675c1e8 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/RdbmsConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/RdbmsConfiguration.java
@@ -23,6 +23,7 @@ import lombok.Setter;
import lombok.SneakyThrows;
import java.util.Map;
+import java.util.Set;
/**
* Relational database management system configuration.
@@ -36,6 +37,8 @@ public final class RdbmsConfiguration implements Cloneable {
private String tableName;
+ private Map<String, Set<String>> shardingColumnsMap;
+
private String whereCondition;
private int spiltNum;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/utils/SyncConfigurationUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/utils/SyncConfigurationUtil.java
index 745b471..3f7c5e3 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/utils/SyncConfigurationUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/utils/SyncConfigurationUtil.java
@@ -17,21 +17,29 @@
package org.apache.shardingsphere.scaling.core.config.utils;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
-import org.apache.shardingsphere.sharding.rule.ShardingRule;
-import org.apache.shardingsphere.sharding.rule.TableRule;
+import org.apache.shardingsphere.infra.config.DataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
-import org.apache.shardingsphere.infra.config.DataSourceConfiguration;
+import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
+import org.apache.shardingsphere.sharding.api.config.ShardingTableRuleConfiguration;
+import org.apache.shardingsphere.sharding.api.config.strategy.ComplexShardingStrategyConfiguration;
+import org.apache.shardingsphere.sharding.api.config.strategy.ShardingStrategyConfiguration;
+import org.apache.shardingsphere.sharding.api.config.strategy.StandardShardingStrategyConfiguration;
+import org.apache.shardingsphere.sharding.rule.ShardingRule;
+import org.apache.shardingsphere.sharding.rule.TableRule;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
+import java.util.Set;
/**
* Sync configuration Util.
@@ -52,7 +60,7 @@ public final class SyncConfigurationUtil {
Map<String, Map<String, String>> dataSourceTableNameMap = toDataSourceTableNameMap(sourceRule, sourceDatasource.keySet());
for (String each : dataSourceTableNameMap.keySet()) {
RdbmsConfiguration dumperConfiguration = createDumperConfiguration(sourceDatasource.get(each));
- RdbmsConfiguration importerConfiguration = createImporterConfiguration(scalingConfiguration);
+ RdbmsConfiguration importerConfiguration = createImporterConfiguration(scalingConfiguration, sourceRule);
Map<String, String> tableNameMap = dataSourceTableNameMap.get(each);
result.add(new SyncConfiguration(scalingConfiguration.getJobConfiguration().getConcurrency(), tableNameMap, dumperConfiguration, importerConfiguration));
}
@@ -110,13 +118,36 @@ public final class SyncConfigurationUtil {
return result;
}
- private static RdbmsConfiguration createImporterConfiguration(final ScalingConfiguration scalingConfiguration) {
+ private static RdbmsConfiguration createImporterConfiguration(final ScalingConfiguration scalingConfiguration, final ShardingRuleConfiguration shardingRuleConfig) {
RdbmsConfiguration result = new RdbmsConfiguration();
JDBCDataSourceConfiguration importerDataSourceConfiguration = new JDBCDataSourceConfiguration(
scalingConfiguration.getRuleConfiguration().getDestinationDataSources().getUrl(),
scalingConfiguration.getRuleConfiguration().getDestinationDataSources().getUsername(),
scalingConfiguration.getRuleConfiguration().getDestinationDataSources().getPassword());
result.setDataSourceConfiguration(importerDataSourceConfiguration);
+ result.setShardingColumnsMap(toShardingColumnsMap(shardingRuleConfig));
return result;
}
+
+ private static Map<String, Set<String>> toShardingColumnsMap(final ShardingRuleConfiguration shardingRuleConfig) {
+ Map<String, Set<String>> result = Maps.newConcurrentMap();
+ for (ShardingTableRuleConfiguration each : shardingRuleConfig.getTables()) {
+ Set<String> shardingColumns = Sets.newHashSet();
+ shardingColumns.addAll(extractShardingColumns(each.getDatabaseShardingStrategy()));
+ shardingColumns.addAll(extractShardingColumns(each.getTableShardingStrategy()));
+ result.put(each.getLogicTable(), shardingColumns);
+ }
+ return result;
+ }
+
+ private static Set<String> extractShardingColumns(final ShardingStrategyConfiguration shardingStrategy) {
+ if (shardingStrategy instanceof StandardShardingStrategyConfiguration) {
+ return Sets.newHashSet(((StandardShardingStrategyConfiguration) shardingStrategy).getShardingColumn());
+ }
+ if (shardingStrategy instanceof ComplexShardingStrategyConfiguration) {
+ return Sets.newHashSet(((ComplexShardingStrategyConfiguration) shardingStrategy).getShardingColumns().split(","));
+ }
+ return Collections.EMPTY_SET;
+ }
+
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java
index d688785..5cbfbb7 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java
@@ -131,10 +131,11 @@ public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecut
}
private void executeUpdate(final Connection connection, final DataRecord record) throws SQLException {
+ List<Column> conditionColumns = RecordUtil.extractConditionColumns(record, rdbmsConfiguration.getShardingColumnsMap().get(record.getTableName()));
List<Column> values = new ArrayList<>();
values.addAll(RecordUtil.extractUpdatedColumns(record));
- values.addAll(RecordUtil.extractPrimaryColumns(record));
- String updateSql = sqlBuilder.buildUpdateSQL(record);
+ values.addAll(conditionColumns);
+ String updateSql = sqlBuilder.buildUpdateSQL(record, conditionColumns);
PreparedStatement ps = connection.prepareStatement(updateSql);
for (int i = 0; i < values.size(); i++) {
ps.setObject(i + 1, values.get(i).getValue());
@@ -143,11 +144,11 @@ public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecut
}
private void executeDelete(final Connection connection, final DataRecord record) throws SQLException {
- String deleteSql = sqlBuilder.buildDeleteSQL(record);
- List<Column> primaryKeys = RecordUtil.extractPrimaryColumns(record);
+ List<Column> conditionColumns = RecordUtil.extractConditionColumns(record, rdbmsConfiguration.getShardingColumnsMap().get(record.getTableName()));
+ String deleteSql = sqlBuilder.buildDeleteSQL(record, conditionColumns);
PreparedStatement ps = connection.prepareStatement(deleteSql);
- for (int i = 0; i < primaryKeys.size(); i++) {
- ps.setObject(i + 1, primaryKeys.get(i).getValue());
+ for (int i = 0; i < conditionColumns.size(); i++) {
+ ps.setObject(i + 1, conditionColumns.get(i).getValue());
}
ps.execute();
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSqlBuilder.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSqlBuilder.java
index e1d858b..391ab09 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSqlBuilder.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSqlBuilder.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.scaling.core.execute.executor.importer;
import com.google.common.collect.Collections2;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
-import org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
import java.util.Collection;
import java.util.List;
@@ -84,12 +83,13 @@ public abstract class AbstractSqlBuilder {
* Build update SQL.
*
* @param dataRecord data record
+ * @param conditionColumns condition columns
* @return update SQL
*/
- public String buildUpdateSQL(final DataRecord dataRecord) {
+ public String buildUpdateSQL(final DataRecord dataRecord, final Collection<Column> conditionColumns) {
String sqlCacheKey = UPDATE_SQL_CACHE_KEY_PREFIX + dataRecord.getTableName();
if (!sqlCacheMap.containsKey(sqlCacheKey)) {
- sqlCacheMap.put(sqlCacheKey, buildUpdateSQLInternal(dataRecord.getTableName(), RecordUtil.extractPrimaryColumns(dataRecord)));
+ sqlCacheMap.put(sqlCacheKey, buildUpdateSQLInternal(dataRecord.getTableName(), conditionColumns));
}
StringBuilder updatedColumnString = new StringBuilder();
for (Column each : extractUpdatedColumns(dataRecord.getColumns())) {
@@ -99,13 +99,8 @@ public abstract class AbstractSqlBuilder {
return String.format(sqlCacheMap.get(sqlCacheKey), updatedColumnString.toString());
}
- private String buildUpdateSQLInternal(final String tableName, final Collection<Column> extractPrimaryColumns) {
- StringBuilder where = new StringBuilder();
- for (Column each : extractPrimaryColumns) {
- where.append(String.format("%s%s%s = ?,", getLeftIdentifierQuoteString(), each.getName(), getRightIdentifierQuoteString()));
- }
- where.setLength(where.length() - 1);
- return String.format("UPDATE %s%s%s SET %%s WHERE %s", getLeftIdentifierQuoteString(), tableName, getRightIdentifierQuoteString(), where.toString());
+ private String buildUpdateSQLInternal(final String tableName, final Collection<Column> conditionColumns) {
+ return String.format("UPDATE %s%s%s SET %%s WHERE %s", getLeftIdentifierQuoteString(), tableName, getRightIdentifierQuoteString(), buildWhereSQL(conditionColumns));
}
private Collection<Column> extractUpdatedColumns(final Collection<Column> columns) {
@@ -116,22 +111,27 @@ public abstract class AbstractSqlBuilder {
* Build delete SQL.
*
* @param dataRecord data record
+ * @param conditionColumns condition columns
* @return delete SQL
*/
- public String buildDeleteSQL(final DataRecord dataRecord) {
+ public String buildDeleteSQL(final DataRecord dataRecord, final Collection<Column> conditionColumns) {
String sqlCacheKey = DELETE_SQL_CACHE_KEY_PREFIX + dataRecord.getTableName();
if (!sqlCacheMap.containsKey(sqlCacheKey)) {
- sqlCacheMap.put(sqlCacheKey, buildDeleteSQLInternal(dataRecord.getTableName(), RecordUtil.extractPrimaryColumns(dataRecord)));
+ sqlCacheMap.put(sqlCacheKey, buildDeleteSQLInternal(dataRecord.getTableName(), conditionColumns));
}
return sqlCacheMap.get(sqlCacheKey);
}
- private String buildDeleteSQLInternal(final String tableName, final Collection<Column> primaryColumns) {
+ private String buildDeleteSQLInternal(final String tableName, final Collection<Column> conditionColumns) {
+ return String.format("DELETE FROM %s%s%s WHERE %s", getLeftIdentifierQuoteString(), tableName, getRightIdentifierQuoteString(), buildWhereSQL(conditionColumns));
+ }
+
+ private String buildWhereSQL(final Collection<Column> conditionColumns) {
StringBuilder where = new StringBuilder();
- for (Column each : primaryColumns) {
- where.append(String.format("%s%s%s = ?,", getLeftIdentifierQuoteString(), each.getName(), getRightIdentifierQuoteString()));
+ for (Column each : conditionColumns) {
+ where.append(String.format("%s%s%s = ? and ", getLeftIdentifierQuoteString(), each.getName(), getRightIdentifierQuoteString()));
}
- where.setLength(where.length() - 1);
- return String.format("DELETE FROM %s%s%s WHERE %s", getLeftIdentifierQuoteString(), tableName, getRightIdentifierQuoteString(), where.toString());
+ where.setLength(where.length() - 5);
+ return where.toString();
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/RecordUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/RecordUtil.java
index 7f9ac78..9c39c7b 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/RecordUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/RecordUtil.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.scaling.core.execute.executor.record;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
/**
* Record util.
@@ -42,6 +43,23 @@ public final class RecordUtil {
}
/**
+ * Extract condition columns (primary key columns and sharding columns) from data record.
+ *
+ * @param dataRecord data record
+ * @param shardingColumns sharding columns
+ * @return condition columns
+ */
+ public static List<Column> extractConditionColumns(final DataRecord dataRecord, final Set<String> shardingColumns) {
+ List<Column> result = new ArrayList<>();
+ for (Column each : dataRecord.getColumns()) {
+ if (each.isPrimaryKey() || shardingColumns.contains(each.getName())) {
+ result.add(each);
+ }
+ }
+ return result;
+ }
+
+ /**
* Extract updated columns from data record.
*
* @param dataRecord data record
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
index 88704ff..fd0f55a 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
@@ -58,8 +58,8 @@ public final class ShardingScalingJobPreparer {
String databaseType = shardingScalingJob.getSyncConfigurations().get(0).getDumperConfiguration().getDataSourceConfiguration().getDatabaseType().getName();
try (DataSourceManager dataSourceManager = new DataSourceManager(shardingScalingJob.getSyncConfigurations())) {
checkDatasources(databaseType, dataSourceManager);
- splitInventoryDataTasks(shardingScalingJob, dataSourceManager);
initIncrementalDataTasks(databaseType, shardingScalingJob, dataSourceManager);
+ splitInventoryDataTasks(shardingScalingJob, dataSourceManager);
} catch (PrepareFailedException ex) {
log.warn("Preparing sharding scaling job {} : {} failed", shardingScalingJob.getJobId(), shardingScalingJob.getJobName(), ex);
shardingScalingJob.setStatus(SyncTaskControlStatus.PREPARING_FAILURE.name());
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java
index 3232338..883af8f 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java
@@ -123,7 +123,7 @@ public final class InventoryDataTaskSplitter {
long min = rs.getLong(1);
long max = rs.getLong(2);
long step = (max - min) / concurrency;
- for (int i = 0; i < concurrency; i++) {
+ for (int i = 0; i < concurrency && min <= max; i++) {
RdbmsConfiguration splitDumperConfig = RdbmsConfiguration.clone(dumperConfiguration);
if (i < concurrency - 1) {
splitDumperConfig.setWhereCondition(String.format("WHERE %s BETWEEN %d AND %d", primaryKey, min, min + step));
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java
index 54c721f..5355fc8 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java
@@ -17,11 +17,14 @@
package org.apache.shardingsphere.scaling.core.execute.executor.importer;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import lombok.SneakyThrows;
import org.apache.shardingsphere.scaling.core.config.DataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.execute.executor.channel.Channel;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
import org.apache.shardingsphere.scaling.core.job.position.NopLogPosition;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
@@ -35,8 +38,11 @@ import org.mockito.junit.MockitoJUnitRunner;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
+import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import javax.sql.DataSource;
import static org.mockito.Mockito.verify;
@@ -49,9 +55,9 @@ public final class AbstractJDBCImporterTest {
private static final String INSERT_SQL = "INSERT INTO test_table (id,user,status) VALUES(?,?,?)";
- private static final String DELETE_SQL = "DELETE FROM test_table WHERE id = ?";
+ private static final String DELETE_SQL = "DELETE FROM test_table WHERE id = ? and user = ?";
- private static final String UPDATE_SQL = "UPDATE test_table SET user = ?,status = ? WHERE id = ?";
+ private static final String UPDATE_SQL = "UPDATE test_table SET user = ?,status = ? WHERE id = ? and user = ?";
@Mock
private DataSourceManager dataSourceManager;
@@ -107,27 +113,33 @@ public final class AbstractJDBCImporterTest {
@Test
public void assertDeleteDataRecord() throws SQLException {
DataRecord deleteRecord = getDataRecord("DELETE");
- when(sqlBuilder.buildDeleteSQL(deleteRecord)).thenReturn(DELETE_SQL);
+ when(sqlBuilder.buildDeleteSQL(deleteRecord, mockConditionColumns(deleteRecord))).thenReturn(DELETE_SQL);
when(connection.prepareStatement(DELETE_SQL)).thenReturn(preparedStatement);
when(channel.fetchRecords(100, 3)).thenReturn(mockRecords(deleteRecord));
jdbcImporter.run();
verify(preparedStatement).setObject(1, 1);
+ verify(preparedStatement).setObject(2, 10);
verify(preparedStatement).execute();
}
@Test
public void assertUpdateDataRecord() throws SQLException {
DataRecord updateRecord = getDataRecord("UPDATE");
- when(sqlBuilder.buildUpdateSQL(updateRecord)).thenReturn(UPDATE_SQL);
+ when(sqlBuilder.buildUpdateSQL(updateRecord, mockConditionColumns(updateRecord))).thenReturn(UPDATE_SQL);
when(connection.prepareStatement(UPDATE_SQL)).thenReturn(preparedStatement);
when(channel.fetchRecords(100, 3)).thenReturn(mockRecords(updateRecord));
jdbcImporter.run();
verify(preparedStatement).setObject(1, 10);
verify(preparedStatement).setObject(2, "UPDATE");
verify(preparedStatement).setObject(3, 1);
+ verify(preparedStatement).setObject(4, 10);
verify(preparedStatement).execute();
}
+ private Collection<Column> mockConditionColumns(final DataRecord dataRecord) {
+ return RecordUtil.extractConditionColumns(dataRecord, Sets.newHashSet("user"));
+ }
+
private List<Record> mockRecords(final DataRecord dataRecord) {
List<Record> result = new LinkedList<>();
result.add(dataRecord);
@@ -149,6 +161,9 @@ public final class AbstractJDBCImporterTest {
RdbmsConfiguration result = new RdbmsConfiguration();
result.setTableName(TABLE_NAME);
result.setDataSourceConfiguration(dataSourceConfiguration);
+ Map<String, Set<String>> shardingColumnsMap = Maps.newHashMap();
+ shardingColumnsMap.put("test_table", Sets.newHashSet("user"));
+ result.setShardingColumnsMap(shardingColumnsMap);
return result;
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSqlBuilderTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSqlBuilderTest.java
index 875df99..22f9ea8 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSqlBuilderTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSqlBuilderTest.java
@@ -17,13 +17,17 @@
package org.apache.shardingsphere.scaling.core.execute.executor.importer;
+import com.google.common.collect.Sets;
import lombok.SneakyThrows;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
import org.apache.shardingsphere.scaling.core.job.position.NopLogPosition;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
import org.junit.Before;
import org.junit.Test;
+import java.util.Collection;
+
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
@@ -51,25 +55,44 @@ public class AbstractSqlBuilderTest {
@Test
public void assertBuildInsertSql() {
String actual = sqlBuilder.buildInsertSQL(mockDataRecord("t1"));
- assertThat(actual, is("INSERT INTO `t1`(`id`,`c1`,`c2`,`c3`) VALUES(?,?,?,?)"));
+ assertThat(actual, is("INSERT INTO `t1`(`id`,`sc`,`c1`,`c2`,`c3`) VALUES(?,?,?,?,?)"));
}
@Test
- public void assertBuildUpdateSql() {
- String actual = sqlBuilder.buildUpdateSQL(mockDataRecord("t2"));
+ public void assertBuildUpdateSqlWithPrimaryKey() {
+ String actual = sqlBuilder.buildUpdateSQL(mockDataRecord("t2"), RecordUtil.extractPrimaryColumns(mockDataRecord("t2")));
assertThat(actual, is("UPDATE `t2` SET `c1` = ?,`c2` = ?,`c3` = ? WHERE `id` = ?"));
}
@Test
- public void assertBuildDeleteSql() {
- String actual = sqlBuilder.buildDeleteSQL(mockDataRecord("t3"));
+ public void assertBuildUpdateSqlWithShardingColumns() {
+ DataRecord dataRecord = mockDataRecord("t2");
+ String actual = sqlBuilder.buildUpdateSQL(dataRecord, mockConditionColumns(dataRecord));
+ assertThat(actual, is("UPDATE `t2` SET `c1` = ?,`c2` = ?,`c3` = ? WHERE `id` = ? and `sc` = ?"));
+ }
+
+ @Test
+ public void assertBuildDeleteSqlWithPrimaryKey() {
+ String actual = sqlBuilder.buildDeleteSQL(mockDataRecord("t3"), RecordUtil.extractPrimaryColumns(mockDataRecord("t3")));
assertThat(actual, is("DELETE FROM `t3` WHERE `id` = ?"));
}
+ @Test
+ public void assertBuildDeleteSqlWithConditionColumns() {
+ DataRecord dataRecord = mockDataRecord("t3");
+ String actual = sqlBuilder.buildDeleteSQL(dataRecord, mockConditionColumns(dataRecord));
+ assertThat(actual, is("DELETE FROM `t3` WHERE `id` = ? and `sc` = ?"));
+ }
+
+ private Collection<Column> mockConditionColumns(final DataRecord dataRecord) {
+ return RecordUtil.extractConditionColumns(dataRecord, Sets.newHashSet("sc"));
+ }
+
private DataRecord mockDataRecord(final String tableName) {
DataRecord result = new DataRecord(new NopLogPosition(), 4);
result.setTableName(tableName);
result.addColumn(new Column("id", "", false, true));
+ result.addColumn(new Column("sc", "", false, false));
result.addColumn(new Column("c1", "", true, false));
result.addColumn(new Column("c2", "", true, false));
result.addColumn(new Column("c3", "", true, false));
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLDataSourceChecker.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLDataSourceChecker.java
index 8bac86f..684937d 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLDataSourceChecker.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLDataSourceChecker.java
@@ -37,7 +37,7 @@ public final class MySQLDataSourceChecker extends AbstractDataSourceChecker {
private static final String SHOW_GRANTS_SQL = "SHOW GRANTS";
- private static final String[][] REQUIRED_PRIVILEGES = {{"ALL PRIVILEGES", "ON *.*"}, {"SELECT", "REPLICATION SLAVE", "REPLICATION CLIENT", "ON *.*"}};
+ private static final String[][] REQUIRED_PRIVILEGES = {{"ALL PRIVILEGES", "ON *.*"}, {"REPLICATION SLAVE", "REPLICATION CLIENT", "ON *.*"}};
private static final String SHOW_VARIABLES_SQL = "SHOW VARIABLES LIKE '%s'";
@@ -68,7 +68,7 @@ public final class MySQLDataSourceChecker extends AbstractDataSourceChecker {
} catch (SQLException e) {
throw new PrepareFailedException("Source datasource check privileges failed.");
}
- throw new PrepareFailedException("Source datasource is lack of SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* privileges.");
+ throw new PrepareFailedException("Source datasource is lack of REPLICATION SLAVE, REPLICATION CLIENT ON *.* privileges.");
}
private boolean matchPrivileges(final String privilege) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLDataSourceCheckerTest.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLDataSourceCheckerTest.java
index cb2835a..f4f58f7 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLDataSourceCheckerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLDataSourceCheckerTest.java
@@ -70,7 +70,7 @@ public class MySQLDataSourceCheckerTest {
@Test
public void assertCheckPrivilegeWithParticularSuccess() throws SQLException {
when(resultSet.next()).thenReturn(true);
- when(resultSet.getString(1)).thenReturn("GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO '%'@'%'");
+ when(resultSet.getString(1)).thenReturn("GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO '%'@'%'");
dataSourceChecker.checkPrivilege(dataSources);
verify(preparedStatement, Mockito.times(1)).executeQuery();
}
@@ -89,7 +89,7 @@ public class MySQLDataSourceCheckerTest {
try {
dataSourceChecker.checkPrivilege(dataSources);
} catch (PrepareFailedException checkFailedEx) {
- assertThat(checkFailedEx.getMessage(), is("Source datasource is lack of SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* privileges."));
+ assertThat(checkFailedEx.getMessage(), is("Source datasource is lack of REPLICATION SLAVE, REPLICATION CLIENT ON *.* privileges."));
}
}