You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/09/09 10:45:18 UTC

[shardingsphere] branch master updated: #6872, concurrent insert (#7356)

This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 4df81a7  #6872, concurrent insert (#7356)
4df81a7 is described below

commit 4df81a714c966f9362226008ef29b8c0f59da830
Author: Zhang Yonglun <zh...@apache.org>
AuthorDate: Wed Sep 9 18:44:58 2020 +0800

    #6872, concurrent insert (#7356)
---
 .../env/dataset/DataSetEnvironmentManager.java     | 91 +++++++++++++---------
 1 file changed, 56 insertions(+), 35 deletions(-)

diff --git a/shardingsphere-integration-test/shardingsphere-test-suite/src/test/java/org/apache/shardingsphere/dbtest/env/dataset/DataSetEnvironmentManager.java b/shardingsphere-integration-test/shardingsphere-test-suite/src/test/java/org/apache/shardingsphere/dbtest/env/dataset/DataSetEnvironmentManager.java
index 1ba6408..01510f4 100644
--- a/shardingsphere-integration-test/shardingsphere-test-suite/src/test/java/org/apache/shardingsphere/dbtest/env/dataset/DataSetEnvironmentManager.java
+++ b/shardingsphere-integration-test/shardingsphere-test-suite/src/test/java/org/apache/shardingsphere/dbtest/env/dataset/DataSetEnvironmentManager.java
@@ -66,6 +66,21 @@ public final class DataSetEnvironmentManager {
         this.dataSourceMap = dataSourceMap;
     }
     
+    private static String generateTableName(final String tableName, final DatabaseType databaseType) {
+        switch (databaseType.getName()) {
+            case "H2":
+            case "PostgreSQL":
+            case "Oracle":
+                return "\"" + tableName + "\"";
+            case "MySQL":
+                return "`" + tableName + "`";
+            case "SQLServer":
+                return "[" + tableName + "]";
+            default:
+                throw new UnsupportedOperationException(String.format("Cannot support database [%s].", databaseType));
+        }
+    }
+    
     /**
      * Initialize data.
      * 
@@ -74,6 +89,7 @@ public final class DataSetEnvironmentManager {
      */
     public void initialize() throws SQLException, ParseException {
         Map<DataNode, List<DataSetRow>> dataNodeListMap = getDataSetRowMap();
+        List<Callable<Void>> insertTasks = new LinkedList<>();
         for (Entry<DataNode, List<DataSetRow>> entry : dataNodeListMap.entrySet()) {
             DataNode dataNode = entry.getKey();
             List<DataSetRow> dataSetRows = entry.getValue();
@@ -82,11 +98,17 @@ public final class DataSetEnvironmentManager {
             for (DataSetRow row : dataSetRows) {
                 sqlValueGroups.add(new SQLValueGroup(dataSetMetadata, row.getValues()));
             }
+            String insertSQL;
             try (Connection connection = dataSourceMap.get(dataNode.getDataSourceName()).getConnection()) {
-                String insertSQL = generateInsertSQL(generateTableName(dataNode.getTableName(), 
-                        DatabaseTypes.getDatabaseTypeByURL(connection.getMetaData().getURL())), dataSetMetadata.getColumns());
-                executeBatch(connection, insertSQL, sqlValueGroups);
+                insertSQL = generateInsertSQL(generateTableName(dataNode.getTableName(), DatabaseTypes.getDatabaseTypeByURL(connection.getMetaData().getURL())), dataSetMetadata.getColumns());
             }
+            insertTasks.add(new InsertTask(dataSourceMap.get(dataNode.getDataSourceName()), insertSQL, sqlValueGroups));
+        }
+        try {
+            SHARDING_SPHERE_EXECUTOR_SERVICE.getExecutorService().invokeAll(insertTasks);
+            // CHECKSTYLE:OFF
+        } catch (final Exception ex) {
+            // CHECKSTYLE:ON
         }
     }
     
@@ -112,33 +134,17 @@ public final class DataSetEnvironmentManager {
         return String.format("INSERT INTO %s (%s) VALUES (%s)", tableName, Joiner.on(",").join(columnNames), Joiner.on(",").join(placeholders));
     }
     
-    private void executeBatch(final Connection connection, final String sql, final List<SQLValueGroup> sqlValueGroups) throws SQLException {
-        try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
-            for (SQLValueGroup each : sqlValueGroups) {
-                setParameters(preparedStatement, each);
-                preparedStatement.addBatch();
-            }
-            preparedStatement.executeBatch();
-        }
-    }
-    
-    private void setParameters(final PreparedStatement preparedStatement, final SQLValueGroup sqlValueGroup) throws SQLException {
-        for (SQLValue each : sqlValueGroup.getSqlValues()) {
-            preparedStatement.setObject(each.getIndex(), each.getValue());
-        }
-    }
-    
     /**
      * Clear data.
      * 
      */
     public void clear() {
-        List<Callable<Void>> clearTasks = new LinkedList<>();
+        List<Callable<Void>> deleteTasks = new LinkedList<>();
         for (Entry<String, Collection<String>> entry : getDataNodeMap().entrySet()) {
-            clearTasks.add(new ClearTask(dataSourceMap.get(entry.getKey()), entry.getValue()));
+            deleteTasks.add(new DeleteTask(dataSourceMap.get(entry.getKey()), entry.getValue()));
         }
         try {
-            SHARDING_SPHERE_EXECUTOR_SERVICE.getExecutorService().invokeAll(clearTasks);
+            SHARDING_SPHERE_EXECUTOR_SERVICE.getExecutorService().invokeAll(deleteTasks);
             // CHECKSTYLE:OFF
         } catch (final Exception ex) {
             // CHECKSTYLE:ON
@@ -170,23 +176,38 @@ public final class DataSetEnvironmentManager {
         return result;
     }
     
-    private static String generateTableName(final String tableName, final DatabaseType databaseType) {
-        switch (databaseType.getName()) {
-            case "H2":
-            case "PostgreSQL":
-            case "Oracle":
-                return "\"" + tableName + "\"";
-            case "MySQL":
-                return "`" + tableName + "`";
-            case "SQLServer":
-                return "[" + tableName + "]";
-            default:
-                throw new UnsupportedOperationException(String.format("Cannot support database [%s].", databaseType));
+    @RequiredArgsConstructor
+    private static class InsertTask implements Callable<Void> {
+        
+        private final DataSource dataSource;
+        
+        private final String insertSQL;
+        
+        private final Collection<SQLValueGroup> sqlValueGroups;
+        
+        @Override
+        public Void call() throws SQLException {
+            try (Connection connection = dataSource.getConnection()) {
+                try (PreparedStatement preparedStatement = connection.prepareStatement(insertSQL)) {
+                    for (SQLValueGroup each : sqlValueGroups) {
+                        setParameters(preparedStatement, each);
+                        preparedStatement.addBatch();
+                    }
+                    preparedStatement.executeBatch();
+                }
+            }
+            return null;
+        }
+        
+        private void setParameters(final PreparedStatement preparedStatement, final SQLValueGroup sqlValueGroup) throws SQLException {
+            for (SQLValue each : sqlValueGroup.getSqlValues()) {
+                preparedStatement.setObject(each.getIndex(), each.getValue());
+            }
         }
     }
     
     @RequiredArgsConstructor
-    private static class ClearTask implements Callable<Void> {
+    private static class DeleteTask implements Callable<Void> {
         
         private final DataSource dataSource;