You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by xi...@apache.org on 2020/11/21 17:32:17 UTC

[shardingsphere] branch master updated: Move ExecutorJDBCManager impl to ShardingSphereConnection (#8277)

This is an automated email from the ASF dual-hosted git repository.

xiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new a07d5c4  Move ExecutorJDBCManager impl to ShardingSphereConnection (#8277)
a07d5c4 is described below

commit a07d5c4e96f4ee1eea91a6a58b4adf3541ac0750
Author: Liang Zhang <te...@163.com>
AuthorDate: Sun Nov 22 01:31:46 2020 +0800

    Move ExecutorJDBCManager impl to ShardingSphereConnection (#8277)
    
    * Rename ExecutorJDBCManager
    
    * Refactor ShardingSphereDatabaseMetaData
    
    * Move ExecutorJDBCManager impl to ShardingSphereConnection
---
 .../PreparedStatementExecutionGroupEngine.java     |   8 +-
 .../driver/jdbc/StatementExecutionGroupEngine.java |   8 +-
 .../PreparedStatementExecutionGroupEngineTest.java |   6 +-
 .../jdbc/StatementExecutionGroupEngineTest.java    |   6 +-
 .../jdbc/adapter/AbstractConnectionAdapter.java    | 141 +--------------------
 .../core/connection/ShardingSphereConnection.java  | 128 +++++++++++++++++--
 .../metadata/ShardingSphereDatabaseMetaData.java   |  10 +-
 7 files changed, 140 insertions(+), 167 deletions(-)

diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/group/driver/jdbc/PreparedStatementExecutionGroupEngine.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/group/driver/jdbc/PreparedStatementExecutionGroupEngine.java
index 751a741..6c82df0 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/group/driver/jdbc/PreparedStatementExecutionGroupEngine.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/group/driver/jdbc/PreparedStatementExecutionGroupEngine.java
@@ -35,8 +35,8 @@ import java.util.List;
 public final class PreparedStatementExecutionGroupEngine extends DriverExecutionGroupEngine<JDBCExecutionUnit, ExecutorJDBCManager, Connection, StatementOption> {
     
     public PreparedStatementExecutionGroupEngine(final int maxConnectionsSizePerQuery,
-                                                 final ExecutorJDBCManager executionConnection, final StatementOption option, final Collection<ShardingSphereRule> rules) {
-        super(maxConnectionsSizePerQuery, executionConnection, option, rules);
+                                                 final ExecutorJDBCManager executorJDBCManager, final StatementOption option, final Collection<ShardingSphereRule> rules) {
+        super(maxConnectionsSizePerQuery, executorJDBCManager, option, rules);
     }
     
     @Override
@@ -47,8 +47,8 @@ public final class PreparedStatementExecutionGroupEngine extends DriverExecution
         return new JDBCExecutionUnit(executionUnit, connectionMode, preparedStatement);
     }
     
-    private PreparedStatement createPreparedStatement(final String sql, final List<Object> parameters, final ExecutorJDBCManager executionConnection, final Connection connection,
+    private PreparedStatement createPreparedStatement(final String sql, final List<Object> parameters, final ExecutorJDBCManager executorJDBCManager, final Connection connection,
                                                       final ConnectionMode connectionMode, final StatementOption statementOption) throws SQLException {
-        return (PreparedStatement) executionConnection.createStorageResource(sql, parameters, connection, connectionMode, statementOption);
+        return (PreparedStatement) executorJDBCManager.createStorageResource(sql, parameters, connection, connectionMode, statementOption);
     }
 }
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/group/driver/jdbc/StatementExecutionGroupEngine.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/group/driver/jdbc/StatementExecutionGroupEngine.java
index f2a20a7..02353c6 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/group/driver/jdbc/StatementExecutionGroupEngine.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/group/driver/jdbc/StatementExecutionGroupEngine.java
@@ -34,8 +34,8 @@ import java.util.Collection;
 public final class StatementExecutionGroupEngine extends DriverExecutionGroupEngine<JDBCExecutionUnit, ExecutorJDBCManager, Connection, StatementOption> {
     
     public StatementExecutionGroupEngine(final int maxConnectionsSizePerQuery,
-                                         final ExecutorJDBCManager executionConnection, final StatementOption option, final Collection<ShardingSphereRule> rules) {
-        super(maxConnectionsSizePerQuery, executionConnection, option, rules);
+                                         final ExecutorJDBCManager executorJDBCManager, final StatementOption option, final Collection<ShardingSphereRule> rules) {
+        super(maxConnectionsSizePerQuery, executorJDBCManager, option, rules);
     }
     
     @Override
@@ -44,8 +44,8 @@ public final class StatementExecutionGroupEngine extends DriverExecutionGroupEng
         return new JDBCExecutionUnit(executionUnit, connectionMode, createStatement(executorManager, connection, connectionMode, option));
     }
     
-    private Statement createStatement(final ExecutorJDBCManager executionConnection, final Connection connection,
+    private Statement createStatement(final ExecutorJDBCManager executorJDBCManager, final Connection connection,
                                       final ConnectionMode connectionMode, final StatementOption option) throws SQLException {
-        return executionConnection.createStorageResource(connection, connectionMode, option);
+        return executorJDBCManager.createStorageResource(connection, connectionMode, option);
     }
 }
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/group/driver/jdbc/PreparedStatementExecutionGroupEngineTest.java b/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/group/driver/jdbc/PreparedStatementExecutionGroupEngineTest.java
index 1219479..909bd66 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/group/driver/jdbc/PreparedStatementExecutionGroupEngineTest.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/group/driver/jdbc/PreparedStatementExecutionGroupEngineTest.java
@@ -50,7 +50,7 @@ public final class PreparedStatementExecutionGroupEngineTest {
     @Test
     public void assertGetExecutionUnitGroupForOneShardMemoryStrictly() throws SQLException {
         groupEngine = new PreparedStatementExecutionGroupEngine(
-                2, mockExecutionConnection(1, ConnectionMode.MEMORY_STRICTLY), new StatementOption(true), Collections.singletonList(mock(ShardingSphereRule.class)));
+                2, mockExecutorJDBCManager(1, ConnectionMode.MEMORY_STRICTLY), new StatementOption(true), Collections.singletonList(mock(ShardingSphereRule.class)));
         Collection<ExecutionGroup<JDBCExecutionUnit>> actual = groupEngine.group(mock(RouteContext.class), mockShardRouteUnit(1, 1));
         assertThat(actual.size(), is(1));
         for (ExecutionGroup<JDBCExecutionUnit> each : actual) {
@@ -61,7 +61,7 @@ public final class PreparedStatementExecutionGroupEngineTest {
     @Test
     public void assertGetExecutionUnitGroupForMultiShardConnectionStrictly() throws SQLException {
         groupEngine = new PreparedStatementExecutionGroupEngine(
-                1, mockExecutionConnection(1, ConnectionMode.CONNECTION_STRICTLY), new StatementOption(true), Collections.singletonList(mock(ShardingSphereRule.class)));
+                1, mockExecutorJDBCManager(1, ConnectionMode.CONNECTION_STRICTLY), new StatementOption(true), Collections.singletonList(mock(ShardingSphereRule.class)));
         Collection<ExecutionGroup<JDBCExecutionUnit>> actual = groupEngine.group(mock(RouteContext.class), mockShardRouteUnit(10, 2));
         assertThat(actual.size(), is(10));
         for (ExecutionGroup<JDBCExecutionUnit> each : actual) {
@@ -69,7 +69,7 @@ public final class PreparedStatementExecutionGroupEngineTest {
         }
     }
     
-    private ExecutorJDBCManager mockExecutionConnection(final int size, final ConnectionMode connectionMode) throws SQLException {
+    private ExecutorJDBCManager mockExecutorJDBCManager(final int size, final ConnectionMode connectionMode) throws SQLException {
         List<Connection> connections = new ArrayList<>(size);
         for (int i = 0; i < size; i++) {
             connections.add(mock(Connection.class));
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/group/driver/jdbc/StatementExecutionGroupEngineTest.java b/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/group/driver/jdbc/StatementExecutionGroupEngineTest.java
index 68df290..4b65df6 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/group/driver/jdbc/StatementExecutionGroupEngineTest.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/group/driver/jdbc/StatementExecutionGroupEngineTest.java
@@ -50,7 +50,7 @@ public final class StatementExecutionGroupEngineTest {
     @Test
     public void assertGetExecutionUnitGroupForOneShardMemoryStrictly() throws SQLException {
         groupEngine = new StatementExecutionGroupEngine(
-                2, mockExecutionConnection(1, ConnectionMode.MEMORY_STRICTLY), new StatementOption(true), Collections.singletonList(mock(ShardingSphereRule.class)));
+                2, mockExecutorJDBCManager(1, ConnectionMode.MEMORY_STRICTLY), new StatementOption(true), Collections.singletonList(mock(ShardingSphereRule.class)));
         Collection<ExecutionGroup<JDBCExecutionUnit>> actual = groupEngine.group(mock(RouteContext.class), mockShardRouteUnit(1, 1));
         assertThat(actual.size(), is(1));
         for (ExecutionGroup<JDBCExecutionUnit> each : actual) {
@@ -61,7 +61,7 @@ public final class StatementExecutionGroupEngineTest {
     @Test
     public void assertGetExecutionUnitGroupForMultiShardConnectionStrictly() throws SQLException {
         groupEngine = new StatementExecutionGroupEngine(
-                1, mockExecutionConnection(1, ConnectionMode.CONNECTION_STRICTLY), new StatementOption(true), Collections.singletonList(mock(ShardingSphereRule.class)));
+                1, mockExecutorJDBCManager(1, ConnectionMode.CONNECTION_STRICTLY), new StatementOption(true), Collections.singletonList(mock(ShardingSphereRule.class)));
         Collection<ExecutionGroup<JDBCExecutionUnit>> actual = groupEngine.group(mock(RouteContext.class), mockShardRouteUnit(10, 2));
         assertThat(actual.size(), is(10));
         for (ExecutionGroup<JDBCExecutionUnit> each : actual) {
@@ -69,7 +69,7 @@ public final class StatementExecutionGroupEngineTest {
         }
     }
     
-    private ExecutorJDBCManager mockExecutionConnection(final int size, final ConnectionMode connectionMode) throws SQLException {
+    private ExecutorJDBCManager mockExecutorJDBCManager(final int size, final ConnectionMode connectionMode) throws SQLException {
         List<Connection> connections = new ArrayList<>(size);
         for (int i = 0; i < size; i++) {
             connections.add(mock(Connection.class));
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractConnectionAdapter.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractConnectionAdapter.java
index 50c1215..066ce3a 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractConnectionAdapter.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractConnectionAdapter.java
@@ -17,44 +17,25 @@
 
 package org.apache.shardingsphere.driver.jdbc.adapter;
 
-import com.google.common.base.Preconditions;
 import com.google.common.collect.LinkedHashMultimap;
 import com.google.common.collect.Multimap;
 import lombok.Getter;
 import org.apache.shardingsphere.driver.jdbc.adapter.executor.ForceExecuteTemplate;
 import org.apache.shardingsphere.driver.jdbc.unsupported.AbstractUnsupportedOperationConnection;
-import org.apache.shardingsphere.infra.context.metadata.MetaDataContexts;
-import org.apache.shardingsphere.infra.executor.sql.ConnectionMode;
-import org.apache.shardingsphere.infra.executor.sql.group.driver.jdbc.ExecutorJDBCManager;
-import org.apache.shardingsphere.infra.executor.sql.group.driver.jdbc.StatementOption;
 import org.apache.shardingsphere.infra.hook.RootInvokeHook;
 import org.apache.shardingsphere.infra.hook.SPIRootInvokeHook;
 import org.apache.shardingsphere.replicaquery.route.engine.impl.PrimaryVisitedManager;
 
-import javax.sql.DataSource;
 import java.sql.Connection;
-import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.SQLWarning;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
 import java.util.Map.Entry;
 
 /**
  * Adapter for {@code Connection}.
  */
-public abstract class AbstractConnectionAdapter extends AbstractUnsupportedOperationConnection implements ExecutorJDBCManager {
-    
-    @Getter
-    private final Map<String, DataSource> dataSourceMap;
-    
-    @Getter
-    private final MetaDataContexts metaDataContexts;
+public abstract class AbstractConnectionAdapter extends AbstractUnsupportedOperationConnection {
     
     @Getter
     private final Multimap<String, Connection> cachedConnections = LinkedHashMultimap.create();
@@ -62,137 +43,21 @@ public abstract class AbstractConnectionAdapter extends AbstractUnsupportedOpera
     @Getter
     private final ForceExecuteTemplate<Connection> forceExecuteTemplate = new ForceExecuteTemplate<>();
     
+    @Getter
     private final ForceExecuteTemplate<Entry<String, Connection>> forceExecuteTemplateForClose = new ForceExecuteTemplate<>();
     
     private final RootInvokeHook rootInvokeHook = new SPIRootInvokeHook();
     
-    private boolean autoCommit = true;
-    
     private boolean readOnly;
     
     private volatile boolean closed;
     
     private int transactionIsolation = TRANSACTION_READ_UNCOMMITTED;
     
-    protected AbstractConnectionAdapter(final Map<String, DataSource> dataSourceMap, final MetaDataContexts metaDataContexts) {
-        this.dataSourceMap = dataSourceMap;
-        this.metaDataContexts = metaDataContexts;
+    protected AbstractConnectionAdapter() {
         rootInvokeHook.start();
     }
     
-    /**
-     * Get database connection.
-     *
-     * @param dataSourceName data source name
-     * @return database connection
-     * @throws SQLException SQL exception
-     */
-    public final Connection getConnection(final String dataSourceName) throws SQLException {
-        return getConnections(dataSourceName, 1, ConnectionMode.MEMORY_STRICTLY).get(0);
-    }
-    
-    @Override
-    public final List<Connection> getConnections(final String dataSourceName, final int connectionSize, final ConnectionMode connectionMode) throws SQLException {
-        DataSource dataSource = dataSourceMap.get(dataSourceName);
-        Preconditions.checkState(null != dataSource, "Missing the data source name: '%s'", dataSourceName);
-        Collection<Connection> connections;
-        synchronized (cachedConnections) {
-            connections = cachedConnections.get(dataSourceName);
-        }
-        List<Connection> result;
-        if (connections.size() >= connectionSize) {
-            result = new ArrayList<>(connections).subList(0, connectionSize);
-        } else if (!connections.isEmpty()) {
-            result = new ArrayList<>(connectionSize);
-            result.addAll(connections);
-            List<Connection> newConnections = createConnections(dataSourceName, dataSource, connectionSize - connections.size(), connectionMode);
-            result.addAll(newConnections);
-            synchronized (cachedConnections) {
-                cachedConnections.putAll(dataSourceName, newConnections);
-            }
-        } else {
-            result = new ArrayList<>(createConnections(dataSourceName, dataSource, connectionSize, connectionMode));
-            synchronized (cachedConnections) {
-                cachedConnections.putAll(dataSourceName, result);
-            }
-        }
-        return result;
-    }
-    
-    @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
-    private List<Connection> createConnections(final String dataSourceName, final DataSource dataSource, final int connectionSize, final ConnectionMode connectionMode) throws SQLException {
-        if (1 == connectionSize) {
-            Connection connection = createConnection(dataSourceName, dataSource);
-            replayMethodsInvocation(connection);
-            return Collections.singletonList(connection);
-        }
-        if (ConnectionMode.CONNECTION_STRICTLY == connectionMode) {
-            return createConnections(dataSourceName, dataSource, connectionSize);
-        }
-        synchronized (dataSource) {
-            return createConnections(dataSourceName, dataSource, connectionSize);
-        }
-    }
-    
-    private List<Connection> createConnections(final String dataSourceName, final DataSource dataSource, final int connectionSize) throws SQLException {
-        List<Connection> result = new ArrayList<>(connectionSize);
-        for (int i = 0; i < connectionSize; i++) {
-            try {
-                Connection connection = createConnection(dataSourceName, dataSource);
-                replayMethodsInvocation(connection);
-                result.add(connection);
-            } catch (final SQLException ex) {
-                for (Connection each : result) {
-                    each.close();
-                }
-                throw new SQLException(String.format("Can not get %d connections one time, partition succeed connection(%d) have released!", connectionSize, result.size()), ex);
-            }
-        }
-        return result;
-    }
-    
-    protected abstract Connection createConnection(String dataSourceName, DataSource dataSource) throws SQLException;
-    
-    @SuppressWarnings("MagicConstant")
-    @Override
-    public final Statement createStorageResource(final Connection connection, final ConnectionMode connectionMode, final StatementOption option) throws SQLException {
-        return connection.createStatement(option.getResultSetType(), option.getResultSetConcurrency(), option.getResultSetHoldability());
-    }
-
-    @SuppressWarnings("MagicConstant")
-    @Override
-    public final PreparedStatement createStorageResource(final String sql, final List<Object> parameters, 
-                                                         final Connection connection, final ConnectionMode connectionMode, final StatementOption option) throws SQLException {
-        return option.isReturnGeneratedKeys() ? connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)
-                : connection.prepareStatement(sql, option.getResultSetType(), option.getResultSetConcurrency(), option.getResultSetHoldability());
-    }
-    
-    @Override
-    public final boolean getAutoCommit() {
-        return autoCommit;
-    }
-    
-    @Override
-    public void setAutoCommit(final boolean autoCommit) throws SQLException {
-        this.autoCommit = autoCommit;
-        setAutoCommitForLocalTransaction(autoCommit);
-    }
-    
-    private void setAutoCommitForLocalTransaction(final boolean autoCommit) throws SQLException {
-        recordMethodInvocation(Connection.class, "setAutoCommit", new Class[]{boolean.class}, new Object[]{autoCommit});
-        forceExecuteTemplate.execute(cachedConnections.values(), connection -> connection.setAutoCommit(autoCommit));
-    }
-    
-    @Override
-    public void commit() throws SQLException {
-        forceExecuteTemplate.execute(cachedConnections.values(), Connection::commit);
-    }
-    
-    @Override
-    public void rollback() throws SQLException {
-        forceExecuteTemplate.execute(cachedConnections.values(), Connection::rollback);
-    }
-    
     @Override
     public final void close() throws SQLException {
         closed = true;
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
index 5918192..54cc34f 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
@@ -17,12 +17,17 @@
 
 package org.apache.shardingsphere.driver.jdbc.core.connection;
 
+import com.google.common.base.Preconditions;
+import lombok.AccessLevel;
 import lombok.Getter;
 import org.apache.shardingsphere.driver.jdbc.adapter.AbstractConnectionAdapter;
 import org.apache.shardingsphere.driver.jdbc.core.datasource.metadata.ShardingSphereDatabaseMetaData;
 import org.apache.shardingsphere.driver.jdbc.core.statement.ShardingSpherePreparedStatement;
 import org.apache.shardingsphere.driver.jdbc.core.statement.ShardingSphereStatement;
 import org.apache.shardingsphere.infra.context.metadata.MetaDataContexts;
+import org.apache.shardingsphere.infra.executor.sql.ConnectionMode;
+import org.apache.shardingsphere.infra.executor.sql.group.driver.jdbc.ExecutorJDBCManager;
+import org.apache.shardingsphere.infra.executor.sql.group.driver.jdbc.StatementOption;
 import org.apache.shardingsphere.transaction.context.TransactionContexts;
 import org.apache.shardingsphere.transaction.core.TransactionType;
 import org.apache.shardingsphere.transaction.spi.ShardingTransactionManager;
@@ -33,36 +38,109 @@ import java.sql.DatabaseMetaData;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
 /**
  * ShardingSphere Connection.
  */
 @Getter
-public final class ShardingSphereConnection extends AbstractConnectionAdapter {
+public final class ShardingSphereConnection extends AbstractConnectionAdapter implements ExecutorJDBCManager {
+    
+    private final Map<String, DataSource> dataSourceMap;
+    
+    private final MetaDataContexts metaDataContexts;
     
     private final TransactionType transactionType;
     
     private final ShardingTransactionManager shardingTransactionManager;
     
+    @Getter(AccessLevel.NONE)
+    private boolean autoCommit = true;
+    
     public ShardingSphereConnection(final Map<String, DataSource> dataSourceMap,
                                     final MetaDataContexts metaDataContexts, final TransactionContexts transactionContexts, final TransactionType transactionType) {
-        super(dataSourceMap, metaDataContexts);
+        this.dataSourceMap = dataSourceMap;
+        this.metaDataContexts = metaDataContexts;
         this.transactionType = transactionType;
         shardingTransactionManager = transactionContexts.getDefaultTransactionManagerEngine().getTransactionManager(transactionType);
     }
     
     /**
-     * Whether hold transaction or not.
+     * Get database connection.
      *
-     * @return true or false
+     * @param dataSourceName data source name
+     * @return database connection
+     * @throws SQLException SQL exception
      */
-    public boolean isHoldTransaction() {
-        return (TransactionType.LOCAL == transactionType && !getAutoCommit()) || (TransactionType.XA == transactionType && isInShardingTransaction());
+    public Connection getConnection(final String dataSourceName) throws SQLException {
+        return getConnections(dataSourceName, 1, ConnectionMode.MEMORY_STRICTLY).get(0);
     }
     
     @Override
-    protected Connection createConnection(final String dataSourceName, final DataSource dataSource) throws SQLException {
+    public List<Connection> getConnections(final String dataSourceName, final int connectionSize, final ConnectionMode connectionMode) throws SQLException {
+        DataSource dataSource = dataSourceMap.get(dataSourceName);
+        Preconditions.checkState(null != dataSource, "Missing the data source name: '%s'", dataSourceName);
+        Collection<Connection> connections;
+        synchronized (getCachedConnections()) {
+            connections = getCachedConnections().get(dataSourceName);
+        }
+        List<Connection> result;
+        if (connections.size() >= connectionSize) {
+            result = new ArrayList<>(connections).subList(0, connectionSize);
+        } else if (!connections.isEmpty()) {
+            result = new ArrayList<>(connectionSize);
+            result.addAll(connections);
+            List<Connection> newConnections = createConnections(dataSourceName, dataSource, connectionSize - connections.size(), connectionMode);
+            result.addAll(newConnections);
+            synchronized (getCachedConnections()) {
+                getCachedConnections().putAll(dataSourceName, newConnections);
+            }
+        } else {
+            result = new ArrayList<>(createConnections(dataSourceName, dataSource, connectionSize, connectionMode));
+            synchronized (getCachedConnections()) {
+                getCachedConnections().putAll(dataSourceName, result);
+            }
+        }
+        return result;
+    }
+    
+    @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+    private List<Connection> createConnections(final String dataSourceName, final DataSource dataSource, final int connectionSize, final ConnectionMode connectionMode) throws SQLException {
+        if (1 == connectionSize) {
+            Connection connection = createConnection(dataSourceName, dataSource);
+            replayMethodsInvocation(connection);
+            return Collections.singletonList(connection);
+        }
+        if (ConnectionMode.CONNECTION_STRICTLY == connectionMode) {
+            return createConnections(dataSourceName, dataSource, connectionSize);
+        }
+        synchronized (dataSource) {
+            return createConnections(dataSourceName, dataSource, connectionSize);
+        }
+    }
+    
+    private List<Connection> createConnections(final String dataSourceName, final DataSource dataSource, final int connectionSize) throws SQLException {
+        List<Connection> result = new ArrayList<>(connectionSize);
+        for (int i = 0; i < connectionSize; i++) {
+            try {
+                Connection connection = createConnection(dataSourceName, dataSource);
+                replayMethodsInvocation(connection);
+                result.add(connection);
+            } catch (final SQLException ex) {
+                for (Connection each : result) {
+                    each.close();
+                }
+                throw new SQLException(String.format("Can not get %d connections one time, partition succeed connection(%d) have released!", connectionSize, result.size()), ex);
+            }
+        }
+        return result;
+    }
+    
+    private Connection createConnection(final String dataSourceName, final DataSource dataSource) throws SQLException {
         return isInShardingTransaction() ? shardingTransactionManager.getConnection(dataSourceName) : dataSource.getConnection();
     }
     
@@ -70,6 +148,29 @@ public final class ShardingSphereConnection extends AbstractConnectionAdapter {
         return null != shardingTransactionManager && shardingTransactionManager.isInTransaction();
     }
     
+    /**
+     * Whether hold transaction or not.
+     *
+     * @return true or false
+     */
+    public boolean isHoldTransaction() {
+        return (TransactionType.LOCAL == transactionType && !autoCommit) || (TransactionType.XA == transactionType && isInShardingTransaction());
+    }
+    
+    @SuppressWarnings("MagicConstant")
+    @Override
+    public Statement createStorageResource(final Connection connection, final ConnectionMode connectionMode, final StatementOption option) throws SQLException {
+        return connection.createStatement(option.getResultSetType(), option.getResultSetConcurrency(), option.getResultSetHoldability());
+    }
+
+    @SuppressWarnings("MagicConstant")
+    @Override
+    public PreparedStatement createStorageResource(final String sql, final List<Object> parameters,
+                                                         final Connection connection, final ConnectionMode connectionMode, final StatementOption option) throws SQLException {
+        return option.isReturnGeneratedKeys() ? connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)
+                : connection.prepareStatement(sql, option.getResultSetType(), option.getResultSetConcurrency(), option.getResultSetHoldability());
+    }
+    
     @Override
     public DatabaseMetaData getMetaData() {
         return new ShardingSphereDatabaseMetaData(this);
@@ -121,9 +222,16 @@ public final class ShardingSphereConnection extends AbstractConnectionAdapter {
     }
     
     @Override
+    public boolean getAutoCommit() {
+        return autoCommit;
+    }
+    
+    @Override
     public void setAutoCommit(final boolean autoCommit) throws SQLException {
         if (TransactionType.LOCAL == transactionType) {
-            super.setAutoCommit(autoCommit);
+            this.autoCommit = autoCommit;
+            recordMethodInvocation(Connection.class, "setAutoCommit", new Class[]{boolean.class}, new Object[]{autoCommit});
+            getForceExecuteTemplate().execute(getCachedConnections().values(), connection -> connection.setAutoCommit(autoCommit));
             return;
         }
         if (autoCommit != shardingTransactionManager.isInTransaction()) {
@@ -147,7 +255,7 @@ public final class ShardingSphereConnection extends AbstractConnectionAdapter {
     @Override
     public void commit() throws SQLException {
         if (TransactionType.LOCAL == transactionType) {
-            super.commit();
+            getForceExecuteTemplate().execute(getCachedConnections().values(), Connection::commit);
         } else {
             shardingTransactionManager.commit();
         }
@@ -156,7 +264,7 @@ public final class ShardingSphereConnection extends AbstractConnectionAdapter {
     @Override
     public void rollback() throws SQLException {
         if (TransactionType.LOCAL == transactionType) {
-            super.rollback();
+            getForceExecuteTemplate().execute(getCachedConnections().values(), Connection::rollback);
         } else {
             shardingTransactionManager.rollback();
         }
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/datasource/metadata/ShardingSphereDatabaseMetaData.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/datasource/metadata/ShardingSphereDatabaseMetaData.java
index 20bb6a1..cf16598 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/datasource/metadata/ShardingSphereDatabaseMetaData.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/datasource/metadata/ShardingSphereDatabaseMetaData.java
@@ -18,13 +18,13 @@
 package org.apache.shardingsphere.driver.jdbc.core.datasource.metadata;
 
 import lombok.Getter;
-import org.apache.shardingsphere.driver.jdbc.adapter.AbstractConnectionAdapter;
 import org.apache.shardingsphere.driver.jdbc.adapter.AdaptedDatabaseMetaData;
+import org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
 import org.apache.shardingsphere.driver.jdbc.core.resultset.DatabaseMetaDataResultSet;
 import org.apache.shardingsphere.infra.database.DefaultSchema;
-import org.apache.shardingsphere.infra.rule.type.DataNodeContainedRule;
-import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
 import org.apache.shardingsphere.infra.metadata.resource.DataSourcesMetaData;
+import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
+import org.apache.shardingsphere.infra.rule.type.DataNodeContainedRule;
 
 import java.security.SecureRandom;
 import java.sql.Connection;
@@ -42,7 +42,7 @@ import java.util.Random;
 @Getter
 public final class ShardingSphereDatabaseMetaData extends AdaptedDatabaseMetaData {
     
-    private final AbstractConnectionAdapter connection;
+    private final ShardingSphereConnection connection;
     
     private final Collection<ShardingSphereRule> rules;
     
@@ -56,7 +56,7 @@ public final class ShardingSphereDatabaseMetaData extends AdaptedDatabaseMetaDat
     
     private DatabaseMetaData currentDatabaseMetaData;
     
-    public ShardingSphereDatabaseMetaData(final AbstractConnectionAdapter connection) {
+    public ShardingSphereDatabaseMetaData(final ShardingSphereConnection connection) {
         super(connection.getMetaDataContexts().getDefaultMetaData().getResource().getCachedDatabaseMetaData());
         this.connection = connection;
         rules = connection.getMetaDataContexts().getDefaultMetaData().getRuleMetaData().getRules();