You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/09/09 10:45:18 UTC
[shardingsphere] branch master updated: #6872,
concurrent insert (#7356)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 4df81a7 #6872, concurrent insert (#7356)
4df81a7 is described below
commit 4df81a714c966f9362226008ef29b8c0f59da830
Author: Zhang Yonglun <zh...@apache.org>
AuthorDate: Wed Sep 9 18:44:58 2020 +0800
#6872, concurrent insert (#7356)
---
.../env/dataset/DataSetEnvironmentManager.java | 91 +++++++++++++---------
1 file changed, 56 insertions(+), 35 deletions(-)
diff --git a/shardingsphere-integration-test/shardingsphere-test-suite/src/test/java/org/apache/shardingsphere/dbtest/env/dataset/DataSetEnvironmentManager.java b/shardingsphere-integration-test/shardingsphere-test-suite/src/test/java/org/apache/shardingsphere/dbtest/env/dataset/DataSetEnvironmentManager.java
index 1ba6408..01510f4 100644
--- a/shardingsphere-integration-test/shardingsphere-test-suite/src/test/java/org/apache/shardingsphere/dbtest/env/dataset/DataSetEnvironmentManager.java
+++ b/shardingsphere-integration-test/shardingsphere-test-suite/src/test/java/org/apache/shardingsphere/dbtest/env/dataset/DataSetEnvironmentManager.java
@@ -66,6 +66,21 @@ public final class DataSetEnvironmentManager {
this.dataSourceMap = dataSourceMap;
}
+ private static String generateTableName(final String tableName, final DatabaseType databaseType) {
+ switch (databaseType.getName()) {
+ case "H2":
+ case "PostgreSQL":
+ case "Oracle":
+ return "\"" + tableName + "\"";
+ case "MySQL":
+ return "`" + tableName + "`";
+ case "SQLServer":
+ return "[" + tableName + "]";
+ default:
+ throw new UnsupportedOperationException(String.format("Cannot support database [%s].", databaseType));
+ }
+ }
+
/**
* Initialize data.
*
@@ -74,6 +89,7 @@ public final class DataSetEnvironmentManager {
*/
public void initialize() throws SQLException, ParseException {
Map<DataNode, List<DataSetRow>> dataNodeListMap = getDataSetRowMap();
+ List<Callable<Void>> insertTasks = new LinkedList<>();
for (Entry<DataNode, List<DataSetRow>> entry : dataNodeListMap.entrySet()) {
DataNode dataNode = entry.getKey();
List<DataSetRow> dataSetRows = entry.getValue();
@@ -82,11 +98,17 @@ public final class DataSetEnvironmentManager {
for (DataSetRow row : dataSetRows) {
sqlValueGroups.add(new SQLValueGroup(dataSetMetadata, row.getValues()));
}
+ String insertSQL;
try (Connection connection = dataSourceMap.get(dataNode.getDataSourceName()).getConnection()) {
- String insertSQL = generateInsertSQL(generateTableName(dataNode.getTableName(),
- DatabaseTypes.getDatabaseTypeByURL(connection.getMetaData().getURL())), dataSetMetadata.getColumns());
- executeBatch(connection, insertSQL, sqlValueGroups);
+ insertSQL = generateInsertSQL(generateTableName(dataNode.getTableName(), DatabaseTypes.getDatabaseTypeByURL(connection.getMetaData().getURL())), dataSetMetadata.getColumns());
}
+ insertTasks.add(new InsertTask(dataSourceMap.get(dataNode.getDataSourceName()), insertSQL, sqlValueGroups));
+ }
+ try {
+ SHARDING_SPHERE_EXECUTOR_SERVICE.getExecutorService().invokeAll(insertTasks);
+ // CHECKSTYLE:OFF
+ } catch (final Exception ex) {
+ // CHECKSTYLE:ON
}
}
@@ -112,33 +134,17 @@ public final class DataSetEnvironmentManager {
return String.format("INSERT INTO %s (%s) VALUES (%s)", tableName, Joiner.on(",").join(columnNames), Joiner.on(",").join(placeholders));
}
- private void executeBatch(final Connection connection, final String sql, final List<SQLValueGroup> sqlValueGroups) throws SQLException {
- try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
- for (SQLValueGroup each : sqlValueGroups) {
- setParameters(preparedStatement, each);
- preparedStatement.addBatch();
- }
- preparedStatement.executeBatch();
- }
- }
-
- private void setParameters(final PreparedStatement preparedStatement, final SQLValueGroup sqlValueGroup) throws SQLException {
- for (SQLValue each : sqlValueGroup.getSqlValues()) {
- preparedStatement.setObject(each.getIndex(), each.getValue());
- }
- }
-
/**
* Clear data.
*
*/
public void clear() {
- List<Callable<Void>> clearTasks = new LinkedList<>();
+ List<Callable<Void>> deleteTasks = new LinkedList<>();
for (Entry<String, Collection<String>> entry : getDataNodeMap().entrySet()) {
- clearTasks.add(new ClearTask(dataSourceMap.get(entry.getKey()), entry.getValue()));
+ deleteTasks.add(new DeleteTask(dataSourceMap.get(entry.getKey()), entry.getValue()));
}
try {
- SHARDING_SPHERE_EXECUTOR_SERVICE.getExecutorService().invokeAll(clearTasks);
+ SHARDING_SPHERE_EXECUTOR_SERVICE.getExecutorService().invokeAll(deleteTasks);
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
@@ -170,23 +176,38 @@ public final class DataSetEnvironmentManager {
return result;
}
- private static String generateTableName(final String tableName, final DatabaseType databaseType) {
- switch (databaseType.getName()) {
- case "H2":
- case "PostgreSQL":
- case "Oracle":
- return "\"" + tableName + "\"";
- case "MySQL":
- return "`" + tableName + "`";
- case "SQLServer":
- return "[" + tableName + "]";
- default:
- throw new UnsupportedOperationException(String.format("Cannot support database [%s].", databaseType));
+ @RequiredArgsConstructor
+ private static class InsertTask implements Callable<Void> {
+
+ private final DataSource dataSource;
+
+ private final String insertSQL;
+
+ private final Collection<SQLValueGroup> sqlValueGroups;
+
+ @Override
+ public Void call() throws SQLException {
+ try (Connection connection = dataSource.getConnection()) {
+ try (PreparedStatement preparedStatement = connection.prepareStatement(insertSQL)) {
+ for (SQLValueGroup each : sqlValueGroups) {
+ setParameters(preparedStatement, each);
+ preparedStatement.addBatch();
+ }
+ preparedStatement.executeBatch();
+ }
+ }
+ return null;
+ }
+
+ private void setParameters(final PreparedStatement preparedStatement, final SQLValueGroup sqlValueGroup) throws SQLException {
+ for (SQLValue each : sqlValueGroup.getSqlValues()) {
+ preparedStatement.setObject(each.getIndex(), each.getValue());
+ }
}
}
@RequiredArgsConstructor
- private static class ClearTask implements Callable<Void> {
+ private static class DeleteTask implements Callable<Void> {
private final DataSource dataSource;