You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by wu...@apache.org on 2021/10/16 14:34:30 UTC

[shardingsphere] branch master updated: Add ConnectionManager (#13072)

This is an automated email from the ASF dual-hosted git repository.

wuweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 87381e4  Add ConnectionManager (#13072)
87381e4 is described below

commit 87381e4e59108123694af8e4fb4e43ab8f0bdc92
Author: Liang Zhang <te...@163.com>
AuthorDate: Sat Oct 16 22:33:39 2021 +0800

    Add ConnectionManager (#13072)
    
    * Add ConnectionManager
    
    * Add test cases of ConnectionManager
    
    * Add more test cases of ConnectionManager
    
    * Add more test cases of ConnectionManager
    
    * Add super interface ExecutorJDBCManager with ConnectionManager
    
    * Add super interface ExecutorJDBCManager with ConnectionManager
    
    * Refactor ConnectionManager
    
    * Refactor ConnectionManager
---
 ...phereConnection.java => ConnectionManager.java} | 310 +++++++--------------
 .../core/connection/ShardingSphereConnection.java  | 213 ++------------
 .../metadata/ShardingSphereDatabaseMetaData.java   |   9 +-
 .../statement/ShardingSpherePreparedStatement.java |   4 +-
 .../core/statement/ShardingSphereStatement.java    |   2 +-
 .../core/connection/ConnectionManagerTest.java     | 123 ++++++++
 .../connection/ShardingSphereConnectionTest.java   | 100 ++-----
 .../datasource/ShardingSphereDataSourceTest.java   |  13 +-
 .../ShardingSphereDatabaseMetaDataTest.java        |   5 +-
 9 files changed, 289 insertions(+), 490 deletions(-)

diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ConnectionManager.java
similarity index 55%
copy from shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
copy to shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ConnectionManager.java
index 891a3cf..35ac80e 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ConnectionManager.java
@@ -21,25 +21,18 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.LinkedHashMultimap;
 import com.google.common.collect.Multimap;
 import lombok.Getter;
-import org.apache.shardingsphere.driver.jdbc.adapter.AbstractConnectionAdapter;
 import org.apache.shardingsphere.driver.jdbc.adapter.executor.ForceExecuteTemplate;
-import org.apache.shardingsphere.driver.jdbc.core.datasource.metadata.ShardingSphereDatabaseMetaData;
-import org.apache.shardingsphere.driver.jdbc.core.statement.ShardingSpherePreparedStatement;
-import org.apache.shardingsphere.driver.jdbc.core.statement.ShardingSphereStatement;
+import org.apache.shardingsphere.driver.jdbc.adapter.invocation.MethodInvocationRecorder;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCManager;
 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
 import org.apache.shardingsphere.mode.manager.ContextManager;
-import org.apache.shardingsphere.readwritesplitting.route.impl.PrimaryVisitedManager;
 import org.apache.shardingsphere.transaction.ConnectionTransaction;
-import org.apache.shardingsphere.transaction.TransactionHolder;
 import org.apache.shardingsphere.transaction.rule.TransactionRule;
 
 import javax.sql.DataSource;
 import java.security.SecureRandom;
-import java.sql.Array;
 import java.sql.Connection;
-import java.sql.DatabaseMetaData;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.sql.Statement;
@@ -47,39 +40,30 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Random;
 
 /**
- * ShardingSphere Connection.
+ * Connection manager.
  */
-public final class ShardingSphereConnection extends AbstractConnectionAdapter implements ExecutorJDBCManager {
+public final class ConnectionManager implements ExecutorJDBCManager, AutoCloseable {
     
-    @Getter
-    private final String schema;
+    private final Map<String, DataSource> dataSourceMap;
     
     @Getter
-    private final ContextManager contextManager;
+    private final ConnectionTransaction connectionTransaction;
     
     private final Multimap<String, Connection> cachedConnections = LinkedHashMultimap.create();
     
-    private final ConnectionTransaction connectionTransaction;
+    private final MethodInvocationRecorder methodInvocationRecorder = new MethodInvocationRecorder();
     
     private final ForceExecuteTemplate<Connection> forceExecuteTemplate = new ForceExecuteTemplate<>();
     
     private final Random random = new SecureRandom();
     
-    private boolean autoCommit = true;
-    
-    private boolean readOnly;
-    
-    private int transactionIsolation = TRANSACTION_READ_UNCOMMITTED;
-    
-    private volatile boolean closed;
-    
-    public ShardingSphereConnection(final String schema, final ContextManager contextManager) {
-        this.schema = schema;
-        this.contextManager = contextManager;
+    public ConnectionManager(final String schema, final ContextManager contextManager) {
+        dataSourceMap = contextManager.getDataSourceMap(schema);
         connectionTransaction = createConnectionTransaction(schema, contextManager);
     }
     
@@ -90,29 +74,113 @@ public final class ShardingSphereConnection extends AbstractConnectionAdapter im
     }
     
     /**
+     * Set auto commit.
+     * 
+     * @param autoCommit auto commit
+     * @throws SQLException SQL exception
+     */
+    public void setAutoCommit(final boolean autoCommit) throws SQLException {
+        methodInvocationRecorder.record(Connection.class, "setAutoCommit", new Class[]{boolean.class}, new Object[]{autoCommit});
+        forceExecuteTemplate.execute(cachedConnections.values(), connection -> connection.setAutoCommit(autoCommit));
+    }
+    
+    /**
+     * Commit.
+     * 
+     * @throws SQLException SQL exception
+     */
+    public void commit() throws SQLException {
+        if (connectionTransaction.isLocalTransaction()) {
+            forceExecuteTemplate.execute(cachedConnections.values(), Connection::commit);
+        } else {
+            connectionTransaction.commit();
+        }
+    }
+    
+    /**
+     * Rollback.
+     *
+     * @throws SQLException SQL exception
+     */
+    public void rollback() throws SQLException {
+        if (connectionTransaction.isLocalTransaction()) {
+            forceExecuteTemplate.execute(cachedConnections.values(), Connection::rollback);
+        } else {
+            connectionTransaction.rollback();
+        }
+    }
+    
+    /**
+     * Get transaction isolation.
+     * 
+     * @return transaction isolation level
+     * @throws SQLException SQL exception
+     */
+    public Optional<Integer> getTransactionIsolation() throws SQLException {
+        return cachedConnections.values().isEmpty() ? Optional.empty() : Optional.of(cachedConnections.values().iterator().next().getTransactionIsolation());
+    }
+    
+    /**
+     * Set transaction isolation.
+     *
+     * @param level transaction isolation level
+     * @throws SQLException SQL exception
+     */
+    public void setTransactionIsolation(final int level) throws SQLException {
+        methodInvocationRecorder.record(Connection.class, "setTransactionIsolation", new Class[]{int.class}, new Object[]{level});
+        forceExecuteTemplate.execute(cachedConnections.values(), connection -> connection.setTransactionIsolation(level));
+    }
+    
+    /**
+     * Set read only.
+     *
+     * @param readOnly read only
+     * @throws SQLException SQL exception
+     */
+    public void setReadOnly(final boolean readOnly) throws SQLException {
+        methodInvocationRecorder.record(Connection.class, "setReadOnly", new Class[]{boolean.class}, new Object[]{readOnly});
+        forceExecuteTemplate.execute(cachedConnections.values(), connection -> connection.setReadOnly(readOnly));
+    }
+    
+    /**
+     * Whether connection valid.
+     * 
+     * @param timeout timeout
+     * @return connection valid or not
+     * @throws SQLException SQL exception
+     */
+    public boolean isValid(final int timeout) throws SQLException {
+        for (Connection each : cachedConnections.values()) {
+            if (!each.isValid(timeout)) {
+                return false;
+            }
+        }
+        return true;
+    }
+    
+    /**
      * Get random physical data source name.
      *
      * @return random physical data source name
      */
     public String getRandomPhysicalDataSourceName() {
-        Collection<String> datasourceNames = cachedConnections.isEmpty() ? contextManager.getDataSourceMap(schema).keySet() : cachedConnections.keySet();
+        Collection<String> datasourceNames = cachedConnections.isEmpty() ? dataSourceMap.keySet() : cachedConnections.keySet();
         return new ArrayList<>(datasourceNames).get(random.nextInt(datasourceNames.size()));
     }
     
     /**
-     * Get connection.
+     * Get random connection.
      *
-     * @param dataSourceName data source name
-     * @return connection
+     * @return random connection
      * @throws SQLException SQL exception
      */
-    public Connection getConnection(final String dataSourceName) throws SQLException {
-        return getConnections(dataSourceName, 1, ConnectionMode.MEMORY_STRICTLY).get(0);
+    public Connection getRandomConnection() throws SQLException {
+        return getConnections(getRandomPhysicalDataSourceName(), 1, ConnectionMode.MEMORY_STRICTLY).get(0);
     }
     
     @Override
     public List<Connection> getConnections(final String dataSourceName, final int connectionSize, final ConnectionMode connectionMode) throws SQLException {
-        DataSource dataSource = contextManager.getDataSourceMap(schema).get(dataSourceName);
+        DataSource dataSource = dataSourceMap.get(dataSourceName);
         Preconditions.checkState(null != dataSource, "Missing the data source name: '%s'", dataSourceName);
         Collection<Connection> connections;
         synchronized (cachedConnections) {
@@ -142,7 +210,7 @@ public final class ShardingSphereConnection extends AbstractConnectionAdapter im
     private List<Connection> createConnections(final String dataSourceName, final DataSource dataSource, final int connectionSize, final ConnectionMode connectionMode) throws SQLException {
         if (1 == connectionSize) {
             Connection connection = createConnection(dataSourceName, dataSource);
-            getMethodInvocationRecorder().replay(connection);
+            methodInvocationRecorder.replay(connection);
             return Collections.singletonList(connection);
         }
         if (ConnectionMode.CONNECTION_STRICTLY == connectionMode) {
@@ -158,7 +226,7 @@ public final class ShardingSphereConnection extends AbstractConnectionAdapter im
         for (int i = 0; i < connectionSize; i++) {
             try {
                 Connection connection = createConnection(dataSourceName, dataSource);
-                getMethodInvocationRecorder().replay(connection);
+                methodInvocationRecorder.replay(connection);
                 result.add(connection);
             } catch (final SQLException ex) {
                 for (Connection each : result) {
@@ -175,15 +243,6 @@ public final class ShardingSphereConnection extends AbstractConnectionAdapter im
         return connectionInTransaction.isPresent() ? connectionInTransaction.get() : dataSource.getConnection();
     }
     
-    /**
-     * Whether hold transaction or not.
-     *
-     * @return true or false
-     */
-    public boolean isHoldTransaction() {
-        return connectionTransaction.isHoldTransaction(autoCommit);
-    }
-    
     @SuppressWarnings("MagicConstant")
     @Override
     public Statement createStorageResource(final Connection connection, final ConnectionMode connectionMode, final StatementOption option) throws SQLException {
@@ -199,172 +258,7 @@ public final class ShardingSphereConnection extends AbstractConnectionAdapter im
     }
     
     @Override
-    public DatabaseMetaData getMetaData() {
-        return new ShardingSphereDatabaseMetaData(this);
-    }
-    
-    @Override
-    public PreparedStatement prepareStatement(final String sql) throws SQLException {
-        return new ShardingSpherePreparedStatement(this, sql);
-    }
-    
-    @Override
-    public PreparedStatement prepareStatement(final String sql, final int resultSetType, final int resultSetConcurrency) throws SQLException {
-        return new ShardingSpherePreparedStatement(this, sql, resultSetType, resultSetConcurrency);
-    }
-    
-    @Override
-    public PreparedStatement prepareStatement(final String sql, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) throws SQLException {
-        return new ShardingSpherePreparedStatement(this, sql, resultSetType, resultSetConcurrency, resultSetHoldability);
-    }
-    
-    @Override
-    public PreparedStatement prepareStatement(final String sql, final int autoGeneratedKeys) throws SQLException {
-        return new ShardingSpherePreparedStatement(this, sql, autoGeneratedKeys);
-    }
-    
-    @Override
-    public PreparedStatement prepareStatement(final String sql, final int[] columnIndexes) throws SQLException {
-        return new ShardingSpherePreparedStatement(this, sql, Statement.RETURN_GENERATED_KEYS);
-    }
-    
-    @Override
-    public PreparedStatement prepareStatement(final String sql, final String[] columnNames) throws SQLException {
-        return new ShardingSpherePreparedStatement(this, sql, Statement.RETURN_GENERATED_KEYS);
-    }
-    
-    @Override
-    public Statement createStatement() {
-        return new ShardingSphereStatement(this);
-    }
-    
-    @Override
-    public Statement createStatement(final int resultSetType, final int resultSetConcurrency) {
-        return new ShardingSphereStatement(this, resultSetType, resultSetConcurrency);
-    }
-    
-    @Override
-    public Statement createStatement(final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) {
-        return new ShardingSphereStatement(this, resultSetType, resultSetConcurrency, resultSetHoldability);
-    }
-    
-    @Override
-    public boolean getAutoCommit() {
-        return autoCommit;
-    }
-    
-    @Override
-    public void setAutoCommit(final boolean autoCommit) throws SQLException {
-        this.autoCommit = autoCommit;
-        if (connectionTransaction.isLocalTransaction()) {
-            processLocalTransaction();
-        } else {
-            processDistributeTransaction();
-        }
-    }
-    
-    private void processLocalTransaction() throws SQLException {
-        getMethodInvocationRecorder().record(Connection.class, "setAutoCommit", new Class[]{boolean.class}, new Object[]{autoCommit});
-        forceExecuteTemplate.execute(cachedConnections.values(), connection -> connection.setAutoCommit(autoCommit));
-        if (!autoCommit) {
-            TransactionHolder.setInTransaction();
-        }
-    }
-    
-    private void processDistributeTransaction() throws SQLException {
-        switch (connectionTransaction.getDistributedTransactionOperationType(autoCommit)) {
-            case BEGIN:
-                closeCachedConnections();
-                connectionTransaction.begin();
-                TransactionHolder.setInTransaction();
-                break;
-            case COMMIT:
-                connectionTransaction.commit();
-                break;
-            default:
-                break;
-        }
-    }
-    
-    private void closeCachedConnections() throws SQLException {
-        forceExecuteTemplate.execute(cachedConnections.values(), Connection::close);
-        cachedConnections.clear();
-    }
-    
-    @Override
-    public void commit() throws SQLException {
-        try {
-            if (connectionTransaction.isLocalTransaction()) {
-                forceExecuteTemplate.execute(cachedConnections.values(), Connection::commit);
-            } else {
-                connectionTransaction.commit();
-            }
-        } finally {
-            TransactionHolder.clear();
-        }
-    }
-    
-    @Override
-    public void rollback() throws SQLException {
-        try {
-            if (connectionTransaction.isLocalTransaction()) {
-                forceExecuteTemplate.execute(cachedConnections.values(), Connection::rollback);
-            } else {
-                connectionTransaction.rollback();
-            }
-        } finally {
-            TransactionHolder.clear();
-        }
-    }
-    
-    @Override
-    public boolean isReadOnly() {
-        return readOnly;
-    }
-    
-    @Override
-    public void setReadOnly(final boolean readOnly) throws SQLException {
-        this.readOnly = readOnly;
-        getMethodInvocationRecorder().record(Connection.class, "setReadOnly", new Class[]{boolean.class}, new Object[]{readOnly});
-        forceExecuteTemplate.execute(cachedConnections.values(), connection -> connection.setReadOnly(readOnly));
-    }
-    
-    @Override
-    public int getTransactionIsolation() throws SQLException {
-        return cachedConnections.values().isEmpty() ? transactionIsolation : cachedConnections.values().iterator().next().getTransactionIsolation();
-    }
-    
-    @Override
-    public void setTransactionIsolation(final int level) throws SQLException {
-        transactionIsolation = level;
-        getMethodInvocationRecorder().record(Connection.class, "setTransactionIsolation", new Class[]{int.class}, new Object[]{level});
-        forceExecuteTemplate.execute(cachedConnections.values(), connection -> connection.setTransactionIsolation(level));
-    }
-    
-    @Override
-    public boolean isValid(final int timeout) throws SQLException {
-        for (Connection connection : cachedConnections.values()) {
-            if (!connection.isValid(timeout)) {
-                return false;
-            }
-        }
-        return true;
-    }
-    
-    @Override
-    public Array createArrayOf(final String typeName, final Object[] elements) throws SQLException {
-        return getConnection(getRandomPhysicalDataSourceName()).createArrayOf(typeName, elements);
-    }
-    
-    @Override
-    public boolean isClosed() {
-        return closed;
-    }
-    
-    @Override
     public void close() throws SQLException {
-        closed = true;
-        PrimaryVisitedManager.clear();
         try {
             forceExecuteTemplate.execute(cachedConnections.values(), Connection::close);
         } finally {
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
index 891a3cf..d78f2eb 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
@@ -17,43 +17,25 @@
 
 package org.apache.shardingsphere.driver.jdbc.core.connection;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.LinkedHashMultimap;
-import com.google.common.collect.Multimap;
 import lombok.Getter;
 import org.apache.shardingsphere.driver.jdbc.adapter.AbstractConnectionAdapter;
-import org.apache.shardingsphere.driver.jdbc.adapter.executor.ForceExecuteTemplate;
 import org.apache.shardingsphere.driver.jdbc.core.datasource.metadata.ShardingSphereDatabaseMetaData;
 import org.apache.shardingsphere.driver.jdbc.core.statement.ShardingSpherePreparedStatement;
 import org.apache.shardingsphere.driver.jdbc.core.statement.ShardingSphereStatement;
-import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
-import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCManager;
-import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.readwritesplitting.route.impl.PrimaryVisitedManager;
-import org.apache.shardingsphere.transaction.ConnectionTransaction;
 import org.apache.shardingsphere.transaction.TransactionHolder;
-import org.apache.shardingsphere.transaction.rule.TransactionRule;
 
-import javax.sql.DataSource;
-import java.security.SecureRandom;
 import java.sql.Array;
-import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-import java.util.Random;
 
 /**
- * ShardingSphere Connection.
+ * ShardingSphere connection.
  */
-public final class ShardingSphereConnection extends AbstractConnectionAdapter implements ExecutorJDBCManager {
+public final class ShardingSphereConnection extends AbstractConnectionAdapter {
     
     @Getter
     private final String schema;
@@ -61,118 +43,21 @@ public final class ShardingSphereConnection extends AbstractConnectionAdapter im
     @Getter
     private final ContextManager contextManager;
     
-    private final Multimap<String, Connection> cachedConnections = LinkedHashMultimap.create();
-    
-    private final ConnectionTransaction connectionTransaction;
-    
-    private final ForceExecuteTemplate<Connection> forceExecuteTemplate = new ForceExecuteTemplate<>();
-    
-    private final Random random = new SecureRandom();
+    @Getter
+    private final ConnectionManager connectionManager;
     
     private boolean autoCommit = true;
     
-    private boolean readOnly;
-    
     private int transactionIsolation = TRANSACTION_READ_UNCOMMITTED;
     
+    private boolean readOnly;
+    
     private volatile boolean closed;
     
     public ShardingSphereConnection(final String schema, final ContextManager contextManager) {
         this.schema = schema;
         this.contextManager = contextManager;
-        connectionTransaction = createConnectionTransaction(schema, contextManager);
-    }
-    
-    private ConnectionTransaction createConnectionTransaction(final String schemaName, final ContextManager contextManager) {
-        Optional<TransactionRule> transactionRule = contextManager.getMetaDataContexts().getGlobalRuleMetaData().findSingleRule(TransactionRule.class);
-        return transactionRule.map(optional -> new ConnectionTransaction(schemaName, optional, contextManager.getTransactionContexts()))
-                .orElseGet(() -> new ConnectionTransaction(schemaName, contextManager.getTransactionContexts()));
-    }
-    
-    /**
-     * Get random physical data source name.
-     *
-     * @return random physical data source name
-     */
-    public String getRandomPhysicalDataSourceName() {
-        Collection<String> datasourceNames = cachedConnections.isEmpty() ? contextManager.getDataSourceMap(schema).keySet() : cachedConnections.keySet();
-        return new ArrayList<>(datasourceNames).get(random.nextInt(datasourceNames.size()));
-    }
-    
-    /**
-     * Get connection.
-     *
-     * @param dataSourceName data source name
-     * @return connection
-     * @throws SQLException SQL exception
-     */
-    public Connection getConnection(final String dataSourceName) throws SQLException {
-        return getConnections(dataSourceName, 1, ConnectionMode.MEMORY_STRICTLY).get(0);
-    }
-    
-    @Override
-    public List<Connection> getConnections(final String dataSourceName, final int connectionSize, final ConnectionMode connectionMode) throws SQLException {
-        DataSource dataSource = contextManager.getDataSourceMap(schema).get(dataSourceName);
-        Preconditions.checkState(null != dataSource, "Missing the data source name: '%s'", dataSourceName);
-        Collection<Connection> connections;
-        synchronized (cachedConnections) {
-            connections = cachedConnections.get(dataSourceName);
-        }
-        List<Connection> result;
-        if (connections.size() >= connectionSize) {
-            result = new ArrayList<>(connections).subList(0, connectionSize);
-        } else if (!connections.isEmpty()) {
-            result = new ArrayList<>(connectionSize);
-            result.addAll(connections);
-            List<Connection> newConnections = createConnections(dataSourceName, dataSource, connectionSize - connections.size(), connectionMode);
-            result.addAll(newConnections);
-            synchronized (cachedConnections) {
-                cachedConnections.putAll(dataSourceName, newConnections);
-            }
-        } else {
-            result = new ArrayList<>(createConnections(dataSourceName, dataSource, connectionSize, connectionMode));
-            synchronized (cachedConnections) {
-                cachedConnections.putAll(dataSourceName, result);
-            }
-        }
-        return result;
-    }
-    
-    @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
-    private List<Connection> createConnections(final String dataSourceName, final DataSource dataSource, final int connectionSize, final ConnectionMode connectionMode) throws SQLException {
-        if (1 == connectionSize) {
-            Connection connection = createConnection(dataSourceName, dataSource);
-            getMethodInvocationRecorder().replay(connection);
-            return Collections.singletonList(connection);
-        }
-        if (ConnectionMode.CONNECTION_STRICTLY == connectionMode) {
-            return createConnections(dataSourceName, dataSource, connectionSize);
-        }
-        synchronized (dataSource) {
-            return createConnections(dataSourceName, dataSource, connectionSize);
-        }
-    }
-    
-    private List<Connection> createConnections(final String dataSourceName, final DataSource dataSource, final int connectionSize) throws SQLException {
-        List<Connection> result = new ArrayList<>(connectionSize);
-        for (int i = 0; i < connectionSize; i++) {
-            try {
-                Connection connection = createConnection(dataSourceName, dataSource);
-                getMethodInvocationRecorder().replay(connection);
-                result.add(connection);
-            } catch (final SQLException ex) {
-                for (Connection each : result) {
-                    each.close();
-                }
-                throw new SQLException(String.format("Can not get %d connections one time, partition succeed connection(%d) have released!", connectionSize, result.size()), ex);
-            }
-        }
-        return result;
-    }
-    
-    private Connection createConnection(final String dataSourceName, final DataSource dataSource) throws SQLException {
-        Optional<Connection> connectionInTransaction = connectionTransaction.getConnection(dataSourceName);
-        return connectionInTransaction.isPresent() ? connectionInTransaction.get() : dataSource.getConnection();
+        connectionManager = new ConnectionManager(schema, contextManager);
     }
     
     /**
@@ -181,21 +66,7 @@ public final class ShardingSphereConnection extends AbstractConnectionAdapter im
      * @return true or false
      */
     public boolean isHoldTransaction() {
-        return connectionTransaction.isHoldTransaction(autoCommit);
-    }
-    
-    @SuppressWarnings("MagicConstant")
-    @Override
-    public Statement createStorageResource(final Connection connection, final ConnectionMode connectionMode, final StatementOption option) throws SQLException {
-        return connection.createStatement(option.getResultSetType(), option.getResultSetConcurrency(), option.getResultSetHoldability());
-    }
-    
-    @SuppressWarnings("MagicConstant")
-    @Override
-    public PreparedStatement createStorageResource(final String sql, final List<Object> parameters,
-                                                   final Connection connection, final ConnectionMode connectionMode, final StatementOption option) throws SQLException {
-        return option.isReturnGeneratedKeys() ? connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)
-                : connection.prepareStatement(sql, option.getResultSetType(), option.getResultSetConcurrency(), option.getResultSetHoldability());
+        return connectionManager.getConnectionTransaction().isHoldTransaction(autoCommit);
     }
     
     @Override
@@ -256,7 +127,7 @@ public final class ShardingSphereConnection extends AbstractConnectionAdapter im
     @Override
     public void setAutoCommit(final boolean autoCommit) throws SQLException {
         this.autoCommit = autoCommit;
-        if (connectionTransaction.isLocalTransaction()) {
+        if (connectionManager.getConnectionTransaction().isLocalTransaction()) {
             processLocalTransaction();
         } else {
             processDistributeTransaction();
@@ -264,41 +135,31 @@ public final class ShardingSphereConnection extends AbstractConnectionAdapter im
     }
     
     private void processLocalTransaction() throws SQLException {
-        getMethodInvocationRecorder().record(Connection.class, "setAutoCommit", new Class[]{boolean.class}, new Object[]{autoCommit});
-        forceExecuteTemplate.execute(cachedConnections.values(), connection -> connection.setAutoCommit(autoCommit));
+        connectionManager.setAutoCommit(autoCommit);
         if (!autoCommit) {
             TransactionHolder.setInTransaction();
         }
     }
     
     private void processDistributeTransaction() throws SQLException {
-        switch (connectionTransaction.getDistributedTransactionOperationType(autoCommit)) {
+        switch (connectionManager.getConnectionTransaction().getDistributedTransactionOperationType(autoCommit)) {
             case BEGIN:
-                closeCachedConnections();
-                connectionTransaction.begin();
+                connectionManager.close();
+                connectionManager.getConnectionTransaction().begin();
                 TransactionHolder.setInTransaction();
                 break;
             case COMMIT:
-                connectionTransaction.commit();
+                connectionManager.getConnectionTransaction().commit();
                 break;
             default:
                 break;
         }
     }
     
-    private void closeCachedConnections() throws SQLException {
-        forceExecuteTemplate.execute(cachedConnections.values(), Connection::close);
-        cachedConnections.clear();
-    }
-    
     @Override
     public void commit() throws SQLException {
         try {
-            if (connectionTransaction.isLocalTransaction()) {
-                forceExecuteTemplate.execute(cachedConnections.values(), Connection::commit);
-            } else {
-                connectionTransaction.commit();
-            }
+            connectionManager.commit();
         } finally {
             TransactionHolder.clear();
         }
@@ -307,53 +168,43 @@ public final class ShardingSphereConnection extends AbstractConnectionAdapter im
     @Override
     public void rollback() throws SQLException {
         try {
-            if (connectionTransaction.isLocalTransaction()) {
-                forceExecuteTemplate.execute(cachedConnections.values(), Connection::rollback);
-            } else {
-                connectionTransaction.rollback();
-            }
+            connectionManager.rollback();
         } finally {
             TransactionHolder.clear();
         }
     }
     
+    @SuppressWarnings("MagicConstant")
     @Override
-    public boolean isReadOnly() {
-        return readOnly;
+    public int getTransactionIsolation() throws SQLException {
+        return connectionManager.getTransactionIsolation().orElseGet(() -> transactionIsolation);
     }
     
     @Override
-    public void setReadOnly(final boolean readOnly) throws SQLException {
-        this.readOnly = readOnly;
-        getMethodInvocationRecorder().record(Connection.class, "setReadOnly", new Class[]{boolean.class}, new Object[]{readOnly});
-        forceExecuteTemplate.execute(cachedConnections.values(), connection -> connection.setReadOnly(readOnly));
+    public void setTransactionIsolation(final int level) throws SQLException {
+        transactionIsolation = level;
+        connectionManager.setTransactionIsolation(level);
     }
     
     @Override
-    public int getTransactionIsolation() throws SQLException {
-        return cachedConnections.values().isEmpty() ? transactionIsolation : cachedConnections.values().iterator().next().getTransactionIsolation();
+    public boolean isReadOnly() {
+        return readOnly;
     }
     
     @Override
-    public void setTransactionIsolation(final int level) throws SQLException {
-        transactionIsolation = level;
-        getMethodInvocationRecorder().record(Connection.class, "setTransactionIsolation", new Class[]{int.class}, new Object[]{level});
-        forceExecuteTemplate.execute(cachedConnections.values(), connection -> connection.setTransactionIsolation(level));
+    public void setReadOnly(final boolean readOnly) throws SQLException {
+        this.readOnly = readOnly;
+        connectionManager.setReadOnly(readOnly);
     }
     
     @Override
     public boolean isValid(final int timeout) throws SQLException {
-        for (Connection connection : cachedConnections.values()) {
-            if (!connection.isValid(timeout)) {
-                return false;
-            }
-        }
-        return true;
+        return connectionManager.isValid(timeout);
     }
     
     @Override
     public Array createArrayOf(final String typeName, final Object[] elements) throws SQLException {
-        return getConnection(getRandomPhysicalDataSourceName()).createArrayOf(typeName, elements);
+        return connectionManager.getRandomConnection().createArrayOf(typeName, elements);
     }
     
     @Override
@@ -365,10 +216,6 @@ public final class ShardingSphereConnection extends AbstractConnectionAdapter im
     public void close() throws SQLException {
         closed = true;
         PrimaryVisitedManager.clear();
-        try {
-            forceExecuteTemplate.execute(cachedConnections.values(), Connection::close);
-        } finally {
-            cachedConnections.clear();
-        }
+        connectionManager.close();
     }
 }
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/datasource/metadata/ShardingSphereDatabaseMetaData.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/datasource/metadata/ShardingSphereDatabaseMetaData.java
index 2d4c39c..4c76414 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/datasource/metadata/ShardingSphereDatabaseMetaData.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/datasource/metadata/ShardingSphereDatabaseMetaData.java
@@ -46,6 +46,8 @@ public final class ShardingSphereDatabaseMetaData extends AdaptedDatabaseMetaDat
     
     private String currentPhysicalDataSourceName;
     
+    private Connection currentPhysicalConnection;
+    
     private DatabaseMetaData currentDatabaseMetaData;
     
     public ShardingSphereDatabaseMetaData(final ShardingSphereConnection connection) {
@@ -57,7 +59,10 @@ public final class ShardingSphereDatabaseMetaData extends AdaptedDatabaseMetaDat
     
     @Override
     public Connection getConnection() throws SQLException {
-        return connection.getConnection(getDataSourceName());
+        if (null == currentPhysicalConnection) {
+            currentPhysicalConnection = connection.getConnectionManager().getRandomConnection();
+        } 
+        return currentPhysicalConnection;
     }
     
     @Override
@@ -231,7 +236,7 @@ public final class ShardingSphereDatabaseMetaData extends AdaptedDatabaseMetaDat
     
     private String getDataSourceName() {
         if (null == currentPhysicalDataSourceName) {
-            currentPhysicalDataSourceName = connection.getRandomPhysicalDataSourceName();
+            currentPhysicalDataSourceName = connection.getConnectionManager().getRandomPhysicalDataSourceName();
         }
         return currentPhysicalDataSourceName;
     }
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index 4f7fbb2..0ee626a 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -225,7 +225,7 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
     
     private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> createDriverExecutionPrepareEngine() {
         int maxConnectionsSizePerQuery = metaDataContexts.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
-        return new DriverExecutionPrepareEngine<>(JDBCDriverType.PREPARED_STATEMENT, maxConnectionsSizePerQuery, connection, 
+        return new DriverExecutionPrepareEngine<>(JDBCDriverType.PREPARED_STATEMENT, maxConnectionsSizePerQuery, connection.getConnectionManager(), 
                 statementOption, metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules());
     }
     
@@ -466,7 +466,7 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
     private void initBatchPreparedStatementExecutor() throws SQLException {
         DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = new DriverExecutionPrepareEngine<>(
                 JDBCDriverType.PREPARED_STATEMENT, metaDataContexts.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY),
-                connection, statementOption, metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules());
+                connection.getConnectionManager(), statementOption, metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules());
         batchPreparedStatementExecutor.init(prepareEngine.prepare(executionContext.getRouteContext(),
                 new ArrayList<>(batchPreparedStatementExecutor.getBatchExecutionUnits()).stream().map(BatchExecutionUnit::getExecutionUnit).collect(Collectors.toList())));
         setBatchParametersForStatements();
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index ee5b485..7f681d2 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -174,7 +174,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
     
     private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> createDriverExecutionPrepareEngine() {
         int maxConnectionsSizePerQuery = metaDataContexts.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
-        return new DriverExecutionPrepareEngine<>(JDBCDriverType.STATEMENT, maxConnectionsSizePerQuery, connection, 
+        return new DriverExecutionPrepareEngine<>(JDBCDriverType.STATEMENT, maxConnectionsSizePerQuery, connection.getConnectionManager(), 
                 statementOption, metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules());
     }
     
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/connection/ConnectionManagerTest.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/connection/ConnectionManagerTest.java
new file mode 100644
index 0000000..421f71a
--- /dev/null
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/connection/ConnectionManagerTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.driver.jdbc.core.connection;
+
+import org.apache.shardingsphere.infra.database.DefaultSchema;
+import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.transaction.rule.TransactionRule;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public final class ConnectionManagerTest {
+    
+    private ConnectionManager connectionManager;
+    
+    @Before
+    public void setUp() throws SQLException {
+        connectionManager = new ConnectionManager(DefaultSchema.LOGIC_NAME, mockContextManager());
+    }
+    
+    private ContextManager mockContextManager() throws SQLException {
+        ContextManager result = mock(ContextManager.class, RETURNS_DEEP_STUBS);
+        Map<String, DataSource> dataSourceMap = mockDataSourceMap();
+        when(result.getDataSourceMap(DefaultSchema.LOGIC_NAME)).thenReturn(dataSourceMap);
+        when(result.getMetaDataContexts().getGlobalRuleMetaData().findSingleRule(TransactionRule.class)).thenReturn(Optional.empty());
+        return result;
+    }
+    
+    private Map<String, DataSource> mockDataSourceMap() throws SQLException {
+        Map<String, DataSource> result = new HashMap<>(2, 1);
+        result.put("ds", mock(DataSource.class, RETURNS_DEEP_STUBS));
+        DataSource invalidDataSource = mock(DataSource.class);
+        when(invalidDataSource.getConnection()).thenThrow(new SQLException());
+        result.put("invalid_ds", invalidDataSource);
+        return result;
+    }
+    
+    @Test
+    public void assertGetRandomPhysicalDataSourceNameFromContextManager() throws SQLException {
+        connectionManager.getConnections("ds", 1, ConnectionMode.MEMORY_STRICTLY);
+        String actual = connectionManager.getRandomPhysicalDataSourceName();
+        assertThat(actual, is("ds"));
+    }
+    
+    @Test
+    public void assertGetRandomPhysicalDataSourceNameFromCache() throws SQLException {
+        connectionManager.getConnections("ds", 1, ConnectionMode.MEMORY_STRICTLY);
+        String actual = connectionManager.getRandomPhysicalDataSourceName();
+        assertThat(actual, is("ds"));
+    }
+    
+    @Test
+    public void assertGetConnection() throws SQLException {
+        assertThat(connectionManager.getConnections("ds", 1, ConnectionMode.MEMORY_STRICTLY),
+                is(connectionManager.getConnections("ds", 1, ConnectionMode.MEMORY_STRICTLY)));
+    }
+    
+    @Test
+    public void assertGetConnectionsWhenAllInCache() throws SQLException {
+        Connection expected = connectionManager.getConnections("ds", 1, ConnectionMode.MEMORY_STRICTLY).get(0);
+        List<Connection> actual = connectionManager.getConnections("ds", 1, ConnectionMode.CONNECTION_STRICTLY);
+        assertThat(actual.size(), is(1));
+        assertThat(actual.get(0), is(expected));
+    }
+    
+    @Test
+    public void assertGetConnectionsWhenEmptyCache() throws SQLException {
+        List<Connection> actual = connectionManager.getConnections("ds", 1, ConnectionMode.MEMORY_STRICTLY);
+        assertThat(actual.size(), is(1));
+    }
+    
+    @Test
+    public void assertGetConnectionsWhenPartInCacheWithMemoryStrictlyMode() throws SQLException {
+        connectionManager.getConnections("ds", 1, ConnectionMode.MEMORY_STRICTLY);
+        List<Connection> actual = connectionManager.getConnections("ds", 3, ConnectionMode.MEMORY_STRICTLY);
+        assertThat(actual.size(), is(3));
+    }
+    
+    @Test
+    public void assertGetConnectionsWhenPartInCacheWithConnectionStrictlyMode() throws SQLException {
+        connectionManager.getConnections("ds", 1, ConnectionMode.MEMORY_STRICTLY);
+        List<Connection> actual = connectionManager.getConnections("ds", 3, ConnectionMode.CONNECTION_STRICTLY);
+        assertThat(actual.size(), is(3));
+    }
+    
+    @Test
+    public void assertGetConnectionsWhenConnectionCreateFailed() {
+        try {
+            connectionManager.getConnections("invalid_ds", 3, ConnectionMode.CONNECTION_STRICTLY);
+        } catch (final SQLException ex) {
+            assertThat(ex.getMessage(), is("Can not get 3 connections one time, partition succeed connection(0) have released!"));
+        }
+    }
+}
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnectionTest.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnectionTest.java
index 3087efd..897fd74 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnectionTest.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnectionTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.driver.jdbc.core.connection;
 
-import com.google.common.collect.Multimap;
 import lombok.SneakyThrows;
 import org.apache.shardingsphere.infra.database.DefaultSchema;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
@@ -36,7 +35,6 @@ import java.lang.reflect.Field;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.Collections;
-import java.util.List;
 import java.util.Optional;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -75,62 +73,6 @@ public final class ShardingSphereConnectionTest {
     }
     
     @Test
-    public void assertGetRandomPhysicalDataSourceNameFromContextManager() {
-        String actual = connection.getRandomPhysicalDataSourceName();
-        assertThat(actual, is("ds"));
-    }
-    
-    @Test
-    public void assertGetRandomPhysicalDataSourceNameFromCache() throws SQLException {
-        connection.getConnection("ds");
-        String actual = connection.getRandomPhysicalDataSourceName();
-        assertThat(actual, is("ds"));
-    }
-    
-    @Test
-    public void assertGetConnection() throws SQLException {
-        assertThat(connection.getConnection("ds"), is(connection.getConnection("ds")));
-    }
-    
-    @Test
-    public void assertGetConnectionsWhenAllInCache() throws SQLException {
-        Connection expected = connection.getConnection("ds");
-        List<Connection> actual = connection.getConnections("ds", 1, ConnectionMode.CONNECTION_STRICTLY);
-        assertThat(actual.size(), is(1));
-        assertThat(actual.get(0), is(expected));
-    }
-    
-    @Test
-    public void assertGetConnectionsWhenEmptyCache() throws SQLException {
-        List<Connection> actual = connection.getConnections("ds", 1, ConnectionMode.MEMORY_STRICTLY);
-        assertThat(actual.size(), is(1));
-    }
-    
-    @Test
-    public void assertGetConnectionsWhenPartInCacheWithMemoryStrictlyMode() throws SQLException {
-        connection.getConnection("ds");
-        List<Connection> actual = connection.getConnections("ds", 3, ConnectionMode.MEMORY_STRICTLY);
-        assertThat(actual.size(), is(3));
-    }
-    
-    @Test
-    public void assertGetConnectionsWhenPartInCacheWithConnectionStrictlyMode() throws SQLException {
-        connection.getConnection("ds");
-        List<Connection> actual = connection.getConnections("ds", 3, ConnectionMode.CONNECTION_STRICTLY);
-        assertThat(actual.size(), is(3));
-    }
-    
-    @Test
-    public void assertGetConnectionsWhenConnectionCreateFailed() throws SQLException {
-        when(connection.getContextManager().getDataSourceMap(DefaultSchema.LOGIC_NAME).get("ds").getConnection()).thenThrow(new SQLException());
-        try {
-            connection.getConnections("ds", 3, ConnectionMode.CONNECTION_STRICTLY);
-        } catch (final SQLException ex) {
-            assertThat(ex.getMessage(), is("Can not get 3 connections one time, partition succeed connection(0) have released!"));
-        }
-    }
-    
-    @Test
     public void assertIsHoldTransaction() throws SQLException {
         connection.setAutoCommit(false);
         assertTrue(connection.isHoldTransaction());
@@ -146,7 +88,7 @@ public final class ShardingSphereConnectionTest {
     public void assertSetAutoCommitWithLocalTransaction() throws SQLException {
         Connection physicalConnection = mock(Connection.class);
         when(connection.getContextManager().getDataSourceMap(DefaultSchema.LOGIC_NAME).get("ds").getConnection()).thenReturn(physicalConnection);
-        connection.getConnection("ds");
+        connection.getConnectionManager().getConnections("ds", 1, ConnectionMode.MEMORY_STRICTLY);
         connection.setAutoCommit(true);
         assertTrue(connection.getAutoCommit());
         verify(physicalConnection).setAutoCommit(true);
@@ -156,7 +98,7 @@ public final class ShardingSphereConnectionTest {
     public void assertSetAutoCommitWithDistributedTransaction() throws SQLException {
         ConnectionTransaction connectionTransaction = mock(ConnectionTransaction.class);
         when(connectionTransaction.getDistributedTransactionOperationType(true)).thenReturn(DistributedTransactionOperationType.COMMIT);
-        setConnectionTransaction(connectionTransaction);
+        mockConnectionManager(connectionTransaction);
         connection.setAutoCommit(true);
         assertTrue(connection.getAutoCommit());
         verify(connectionTransaction).commit();
@@ -166,7 +108,7 @@ public final class ShardingSphereConnectionTest {
     public void assertCommitWithLocalTransaction() throws SQLException {
         Connection physicalConnection = mock(Connection.class);
         when(connection.getContextManager().getDataSourceMap(DefaultSchema.LOGIC_NAME).get("ds").getConnection()).thenReturn(physicalConnection);
-        connection.getConnection("ds");
+        connection.getConnectionManager().getConnections("ds", 1, ConnectionMode.MEMORY_STRICTLY);
         connection.setAutoCommit(false);
         assertFalse(connection.getAutoCommit());
         assertTrue(TransactionHolder.isTransaction());
@@ -180,21 +122,21 @@ public final class ShardingSphereConnectionTest {
     public void assertCommitWithDistributedTransaction() throws SQLException {
         ConnectionTransaction connectionTransaction = mock(ConnectionTransaction.class);
         when(connectionTransaction.getDistributedTransactionOperationType(false)).thenReturn(DistributedTransactionOperationType.BEGIN);
-        setConnectionTransaction(connectionTransaction);
+        final ConnectionManager connectionManager = mockConnectionManager(connectionTransaction);
         connection.setAutoCommit(false);
         assertFalse(connection.getAutoCommit());
         assertTrue(TransactionHolder.isTransaction());
         verify(connectionTransaction).begin();
         connection.commit();
         assertFalse(TransactionHolder.isTransaction());
-        verify(connectionTransaction).commit();
+        verify(connectionManager).commit();
     }
     
     @Test
     public void assertRollbackWithLocalTransaction() throws SQLException {
         Connection physicalConnection = mock(Connection.class);
         when(connection.getContextManager().getDataSourceMap(DefaultSchema.LOGIC_NAME).get("ds").getConnection()).thenReturn(physicalConnection);
-        connection.getConnection("ds");
+        connection.getConnectionManager().getConnections("ds", 1, ConnectionMode.MEMORY_STRICTLY);
         connection.setAutoCommit(false);
         assertFalse(connection.getAutoCommit());
         connection.rollback();
@@ -205,21 +147,24 @@ public final class ShardingSphereConnectionTest {
     public void assertRollbackWithDistributedTransaction() throws SQLException {
         ConnectionTransaction connectionTransaction = mock(ConnectionTransaction.class);
         when(connectionTransaction.getDistributedTransactionOperationType(false)).thenReturn(DistributedTransactionOperationType.BEGIN);
-        setConnectionTransaction(connectionTransaction);
+        final ConnectionManager connectionManager = mockConnectionManager(connectionTransaction);
         connection.setAutoCommit(false);
         assertFalse(connection.getAutoCommit());
         assertTrue(TransactionHolder.isTransaction());
         verify(connectionTransaction).begin();
         connection.rollback();
         assertFalse(TransactionHolder.isTransaction());
-        verify(connectionTransaction).rollback();
+        verify(connectionManager).rollback();
     }
     
     @SneakyThrows(ReflectiveOperationException.class)
-    private void setConnectionTransaction(final ConnectionTransaction connectionTransaction) {
-        Field field = connection.getClass().getDeclaredField("connectionTransaction");
+    private ConnectionManager mockConnectionManager(final ConnectionTransaction connectionTransaction) {
+        Field field = connection.getClass().getDeclaredField("connectionManager");
         field.setAccessible(true);
-        field.set(connection, connectionTransaction);
+        ConnectionManager result = mock(ConnectionManager.class);
+        when(result.getConnectionTransaction()).thenReturn(connectionTransaction);
+        field.set(connection, result);
+        return result;
     }
     
     @Test
@@ -229,14 +174,14 @@ public final class ShardingSphereConnectionTest {
     
     @Test
     public void assertIsInvalid() throws SQLException {
-        connection.getConnection("ds");
+        connection.getConnectionManager().getConnections("ds", 1, ConnectionMode.MEMORY_STRICTLY);
         assertFalse(connection.isValid(0));
     }
     
     @Test
     public void assertSetReadOnly() throws SQLException {
         assertFalse(connection.isReadOnly());
-        Connection physicalConnection = connection.getConnection("ds");
+        Connection physicalConnection = connection.getConnectionManager().getConnections("ds", 1, ConnectionMode.MEMORY_STRICTLY).get(0);
         connection.setReadOnly(true);
         assertTrue(connection.isReadOnly());
         verify(physicalConnection).setReadOnly(true);
@@ -249,7 +194,7 @@ public final class ShardingSphereConnectionTest {
     
     @Test
     public void assertSetTransactionIsolation() throws SQLException {
-        Connection physicalConnection = connection.getConnection("ds");
+        Connection physicalConnection = connection.getConnectionManager().getConnections("ds", 1, ConnectionMode.MEMORY_STRICTLY).get(0);
         connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
         verify(physicalConnection).setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
     }
@@ -258,7 +203,7 @@ public final class ShardingSphereConnectionTest {
     public void assertCreateArrayOf() throws SQLException {
         Connection physicalConnection = mock(Connection.class);
         when(connection.getContextManager().getDataSourceMap(DefaultSchema.LOGIC_NAME).get("ds").getConnection()).thenReturn(physicalConnection);
-        connection.getConnection("ds");
+        connection.getConnectionManager().getConnections("ds", 1, ConnectionMode.MEMORY_STRICTLY);
         assertNull(connection.createArrayOf("int", null));
         verify(physicalConnection).createArrayOf("int", null);
     }
@@ -267,14 +212,5 @@ public final class ShardingSphereConnectionTest {
     public void assertClose() throws SQLException {
         connection.close();
         assertTrue(connection.isClosed());
-        assertTrue(getCachedConnections().isEmpty());
-    }
-    
-    @SuppressWarnings("unchecked")
-    @SneakyThrows(ReflectiveOperationException.class)
-    private Multimap<String, Connection> getCachedConnections() {
-        Field field = ShardingSphereConnection.class.getDeclaredField("cachedConnections");
-        field.setAccessible(true);
-        return (Multimap<String, Connection>) field.get(connection);
     }
 }
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/datasource/ShardingSphereDataSourceTest.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/datasource/ShardingSphereDataSourceTest.java
index 41c7913..dd03929 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/datasource/ShardingSphereDataSourceTest.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/datasource/ShardingSphereDataSourceTest.java
@@ -21,6 +21,7 @@ import com.zaxxer.hikari.HikariDataSource;
 import org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
 import org.apache.shardingsphere.infra.config.RuleConfiguration;
 import org.apache.shardingsphere.infra.database.DefaultSchema;
+import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
 import org.apache.shardingsphere.infra.state.StateType;
 import org.apache.shardingsphere.transaction.core.TransactionTypeHolder;
 import org.junit.After;
@@ -33,7 +34,6 @@ import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Properties;
@@ -79,15 +79,10 @@ public final class ShardingSphereDataSourceTest {
     }
     
     @Test
-    public void assertGetConnection() throws SQLException {
-        DataSource dataSource = mockDataSource();
-        assertThat(((ShardingSphereConnection) createShardingSphereDataSource(dataSource).getConnection()).getConnection("ds"), is(dataSource.getConnection()));
-    }
-    
-    @Test
     public void assertGetConnectionWithUsernameAndPassword() throws SQLException {
         DataSource dataSource = mockDataSource();
-        assertThat(((ShardingSphereConnection) createShardingSphereDataSource(dataSource).getConnection("", "")).getConnection("ds"), is(dataSource.getConnection()));
+        assertThat(((ShardingSphereConnection) createShardingSphereDataSource(dataSource).getConnection("", "")).getConnectionManager().getConnections("ds", 1, ConnectionMode.MEMORY_STRICTLY).get(0),
+                is(dataSource.getConnection()));
     }
     
     private DataSource mockDataSource() throws SQLException {
@@ -152,7 +147,7 @@ public final class ShardingSphereDataSourceTest {
     @Test
     public void assertCloseWithDataSourceNames() throws Exception {
         ShardingSphereDataSource actual = createShardingSphereDataSource(createHikariDataSource());
-        actual.close(Arrays.asList("ds"));
+        actual.close(Collections.singleton("ds"));
         Map<String, DataSource> dataSourceMap = actual.getContextManager().getDataSourceMap(DefaultSchema.LOGIC_NAME);
         assertThat(((HikariDataSource) dataSourceMap.get("ds")).isClosed(), is(true));
     }
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/datasource/metadata/ShardingSphereDatabaseMetaDataTest.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/datasource/metadata/ShardingSphereDatabaseMetaDataTest.java
index 1c3363e..00d4899 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/datasource/metadata/ShardingSphereDatabaseMetaDataTest.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/datasource/metadata/ShardingSphereDatabaseMetaDataTest.java
@@ -48,7 +48,6 @@ import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -89,8 +88,8 @@ public final class ShardingSphereDatabaseMetaDataTest {
         when(connection.getMetaData()).thenReturn(databaseMetaData);
         when(resultSet.getMetaData()).thenReturn(mock(ResultSetMetaData.class));
         CachedDatabaseMetaData cachedDatabaseMetaData = new CachedDatabaseMetaData(databaseMetaData);
-        when(shardingSphereConnection.getRandomPhysicalDataSourceName()).thenReturn(DATA_SOURCE_NAME);
-        when(shardingSphereConnection.getConnection(anyString())).thenReturn(connection);
+        when(shardingSphereConnection.getConnectionManager().getRandomPhysicalDataSourceName()).thenReturn(DATA_SOURCE_NAME);
+        when(shardingSphereConnection.getConnectionManager().getRandomConnection()).thenReturn(connection);
         when(shardingSphereConnection.getContextManager().getMetaDataContexts()).thenReturn(metaDataContexts);
         when(shardingSphereConnection.getContextManager().getDataSourceMap(DefaultSchema.LOGIC_NAME)).thenReturn(dataSourceMap);
         when(shardingSphereConnection.getSchema()).thenReturn(DefaultSchema.LOGIC_NAME);