You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by wu...@apache.org on 2021/10/16 14:34:30 UTC
[shardingsphere] branch master updated: Add ConnectionManager
(#13072)
This is an automated email from the ASF dual-hosted git repository.
wuweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 87381e4 Add ConnectionManager (#13072)
87381e4 is described below
commit 87381e4e59108123694af8e4fb4e43ab8f0bdc92
Author: Liang Zhang <te...@163.com>
AuthorDate: Sat Oct 16 22:33:39 2021 +0800
Add ConnectionManager (#13072)
* Add ConnectionManager
* Add test cases of ConnectionManager
* Add more test cases of ConnectionManager
* Add more test cases of ConnectionManager
* Add super interface ExecutorJDBCManager with ConnectionManager
* Add super interface ExecutorJDBCManager with ConnectionManager
* Refactor ConnectionManager
* Refactor ConnectionManager
---
...phereConnection.java => ConnectionManager.java} | 310 +++++++--------------
.../core/connection/ShardingSphereConnection.java | 213 ++------------
.../metadata/ShardingSphereDatabaseMetaData.java | 9 +-
.../statement/ShardingSpherePreparedStatement.java | 4 +-
.../core/statement/ShardingSphereStatement.java | 2 +-
.../core/connection/ConnectionManagerTest.java | 123 ++++++++
.../connection/ShardingSphereConnectionTest.java | 100 ++-----
.../datasource/ShardingSphereDataSourceTest.java | 13 +-
.../ShardingSphereDatabaseMetaDataTest.java | 5 +-
9 files changed, 289 insertions(+), 490 deletions(-)
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ConnectionManager.java
similarity index 55%
copy from shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
copy to shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ConnectionManager.java
index 891a3cf..35ac80e 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ConnectionManager.java
@@ -21,25 +21,18 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.LinkedHashMultimap;
import com.google.common.collect.Multimap;
import lombok.Getter;
-import org.apache.shardingsphere.driver.jdbc.adapter.AbstractConnectionAdapter;
import org.apache.shardingsphere.driver.jdbc.adapter.executor.ForceExecuteTemplate;
-import org.apache.shardingsphere.driver.jdbc.core.datasource.metadata.ShardingSphereDatabaseMetaData;
-import org.apache.shardingsphere.driver.jdbc.core.statement.ShardingSpherePreparedStatement;
-import org.apache.shardingsphere.driver.jdbc.core.statement.ShardingSphereStatement;
+import org.apache.shardingsphere.driver.jdbc.adapter.invocation.MethodInvocationRecorder;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCManager;
import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
import org.apache.shardingsphere.mode.manager.ContextManager;
-import org.apache.shardingsphere.readwritesplitting.route.impl.PrimaryVisitedManager;
import org.apache.shardingsphere.transaction.ConnectionTransaction;
-import org.apache.shardingsphere.transaction.TransactionHolder;
import org.apache.shardingsphere.transaction.rule.TransactionRule;
import javax.sql.DataSource;
import java.security.SecureRandom;
-import java.sql.Array;
import java.sql.Connection;
-import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
@@ -47,39 +40,30 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Random;
/**
- * ShardingSphere Connection.
+ * Connection manager.
*/
-public final class ShardingSphereConnection extends AbstractConnectionAdapter implements ExecutorJDBCManager {
+public final class ConnectionManager implements ExecutorJDBCManager, AutoCloseable {
- @Getter
- private final String schema;
+ private final Map<String, DataSource> dataSourceMap;
@Getter
- private final ContextManager contextManager;
+ private final ConnectionTransaction connectionTransaction;
private final Multimap<String, Connection> cachedConnections = LinkedHashMultimap.create();
- private final ConnectionTransaction connectionTransaction;
+ private final MethodInvocationRecorder methodInvocationRecorder = new MethodInvocationRecorder();
private final ForceExecuteTemplate<Connection> forceExecuteTemplate = new ForceExecuteTemplate<>();
private final Random random = new SecureRandom();
- private boolean autoCommit = true;
-
- private boolean readOnly;
-
- private int transactionIsolation = TRANSACTION_READ_UNCOMMITTED;
-
- private volatile boolean closed;
-
- public ShardingSphereConnection(final String schema, final ContextManager contextManager) {
- this.schema = schema;
- this.contextManager = contextManager;
+ public ConnectionManager(final String schema, final ContextManager contextManager) {
+ dataSourceMap = contextManager.getDataSourceMap(schema);
connectionTransaction = createConnectionTransaction(schema, contextManager);
}
@@ -90,29 +74,113 @@ public final class ShardingSphereConnection extends AbstractConnectionAdapter im
}
/**
+ * Set auto commit.
+ *
+ * @param autoCommit auto commit
+ * @throws SQLException SQL exception
+ */
+ public void setAutoCommit(final boolean autoCommit) throws SQLException {
+ methodInvocationRecorder.record(Connection.class, "setAutoCommit", new Class[]{boolean.class}, new Object[]{autoCommit});
+ forceExecuteTemplate.execute(cachedConnections.values(), connection -> connection.setAutoCommit(autoCommit));
+ }
+
+ /**
+ * Commit.
+ *
+ * @throws SQLException SQL exception
+ */
+ public void commit() throws SQLException {
+ if (connectionTransaction.isLocalTransaction()) {
+ forceExecuteTemplate.execute(cachedConnections.values(), Connection::commit);
+ } else {
+ connectionTransaction.commit();
+ }
+ }
+
+ /**
+ * Rollback.
+ *
+ * @throws SQLException SQL exception
+ */
+ public void rollback() throws SQLException {
+ if (connectionTransaction.isLocalTransaction()) {
+ forceExecuteTemplate.execute(cachedConnections.values(), Connection::rollback);
+ } else {
+ connectionTransaction.rollback();
+ }
+ }
+
+ /**
+ * Get transaction isolation.
+ *
+ * @return transaction isolation level
+ * @throws SQLException SQL exception
+ */
+ public Optional<Integer> getTransactionIsolation() throws SQLException {
+ return cachedConnections.values().isEmpty() ? Optional.empty() : Optional.of(cachedConnections.values().iterator().next().getTransactionIsolation());
+ }
+
+ /**
+ * Set transaction isolation.
+ *
+ * @param level transaction isolation level
+ * @throws SQLException SQL exception
+ */
+ public void setTransactionIsolation(final int level) throws SQLException {
+ methodInvocationRecorder.record(Connection.class, "setTransactionIsolation", new Class[]{int.class}, new Object[]{level});
+ forceExecuteTemplate.execute(cachedConnections.values(), connection -> connection.setTransactionIsolation(level));
+ }
+
+ /**
+ * Set read only.
+ *
+ * @param readOnly read only
+ * @throws SQLException SQL exception
+ */
+ public void setReadOnly(final boolean readOnly) throws SQLException {
+ methodInvocationRecorder.record(Connection.class, "setReadOnly", new Class[]{boolean.class}, new Object[]{readOnly});
+ forceExecuteTemplate.execute(cachedConnections.values(), connection -> connection.setReadOnly(readOnly));
+ }
+
+ /**
+ * Whether connection valid.
+ *
+ * @param timeout timeout
+ * @return connection valid or not
+ * @throws SQLException SQL exception
+ */
+ public boolean isValid(final int timeout) throws SQLException {
+ for (Connection each : cachedConnections.values()) {
+ if (!each.isValid(timeout)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
* Get random physical data source name.
*
* @return random physical data source name
*/
public String getRandomPhysicalDataSourceName() {
- Collection<String> datasourceNames = cachedConnections.isEmpty() ? contextManager.getDataSourceMap(schema).keySet() : cachedConnections.keySet();
+ Collection<String> datasourceNames = cachedConnections.isEmpty() ? dataSourceMap.keySet() : cachedConnections.keySet();
return new ArrayList<>(datasourceNames).get(random.nextInt(datasourceNames.size()));
}
/**
- * Get connection.
+ * Get random connection.
*
- * @param dataSourceName data source name
- * @return connection
+ * @return random connection
* @throws SQLException SQL exception
*/
- public Connection getConnection(final String dataSourceName) throws SQLException {
- return getConnections(dataSourceName, 1, ConnectionMode.MEMORY_STRICTLY).get(0);
+ public Connection getRandomConnection() throws SQLException {
+ return getConnections(getRandomPhysicalDataSourceName(), 1, ConnectionMode.MEMORY_STRICTLY).get(0);
}
@Override
public List<Connection> getConnections(final String dataSourceName, final int connectionSize, final ConnectionMode connectionMode) throws SQLException {
- DataSource dataSource = contextManager.getDataSourceMap(schema).get(dataSourceName);
+ DataSource dataSource = dataSourceMap.get(dataSourceName);
Preconditions.checkState(null != dataSource, "Missing the data source name: '%s'", dataSourceName);
Collection<Connection> connections;
synchronized (cachedConnections) {
@@ -142,7 +210,7 @@ public final class ShardingSphereConnection extends AbstractConnectionAdapter im
private List<Connection> createConnections(final String dataSourceName, final DataSource dataSource, final int connectionSize, final ConnectionMode connectionMode) throws SQLException {
if (1 == connectionSize) {
Connection connection = createConnection(dataSourceName, dataSource);
- getMethodInvocationRecorder().replay(connection);
+ methodInvocationRecorder.replay(connection);
return Collections.singletonList(connection);
}
if (ConnectionMode.CONNECTION_STRICTLY == connectionMode) {
@@ -158,7 +226,7 @@ public final class ShardingSphereConnection extends AbstractConnectionAdapter im
for (int i = 0; i < connectionSize; i++) {
try {
Connection connection = createConnection(dataSourceName, dataSource);
- getMethodInvocationRecorder().replay(connection);
+ methodInvocationRecorder.replay(connection);
result.add(connection);
} catch (final SQLException ex) {
for (Connection each : result) {
@@ -175,15 +243,6 @@ public final class ShardingSphereConnection extends AbstractConnectionAdapter im
return connectionInTransaction.isPresent() ? connectionInTransaction.get() : dataSource.getConnection();
}
- /**
- * Whether hold transaction or not.
- *
- * @return true or false
- */
- public boolean isHoldTransaction() {
- return connectionTransaction.isHoldTransaction(autoCommit);
- }
-
@SuppressWarnings("MagicConstant")
@Override
public Statement createStorageResource(final Connection connection, final ConnectionMode connectionMode, final StatementOption option) throws SQLException {
@@ -199,172 +258,7 @@ public final class ShardingSphereConnection extends AbstractConnectionAdapter im
}
@Override
- public DatabaseMetaData getMetaData() {
- return new ShardingSphereDatabaseMetaData(this);
- }
-
- @Override
- public PreparedStatement prepareStatement(final String sql) throws SQLException {
- return new ShardingSpherePreparedStatement(this, sql);
- }
-
- @Override
- public PreparedStatement prepareStatement(final String sql, final int resultSetType, final int resultSetConcurrency) throws SQLException {
- return new ShardingSpherePreparedStatement(this, sql, resultSetType, resultSetConcurrency);
- }
-
- @Override
- public PreparedStatement prepareStatement(final String sql, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) throws SQLException {
- return new ShardingSpherePreparedStatement(this, sql, resultSetType, resultSetConcurrency, resultSetHoldability);
- }
-
- @Override
- public PreparedStatement prepareStatement(final String sql, final int autoGeneratedKeys) throws SQLException {
- return new ShardingSpherePreparedStatement(this, sql, autoGeneratedKeys);
- }
-
- @Override
- public PreparedStatement prepareStatement(final String sql, final int[] columnIndexes) throws SQLException {
- return new ShardingSpherePreparedStatement(this, sql, Statement.RETURN_GENERATED_KEYS);
- }
-
- @Override
- public PreparedStatement prepareStatement(final String sql, final String[] columnNames) throws SQLException {
- return new ShardingSpherePreparedStatement(this, sql, Statement.RETURN_GENERATED_KEYS);
- }
-
- @Override
- public Statement createStatement() {
- return new ShardingSphereStatement(this);
- }
-
- @Override
- public Statement createStatement(final int resultSetType, final int resultSetConcurrency) {
- return new ShardingSphereStatement(this, resultSetType, resultSetConcurrency);
- }
-
- @Override
- public Statement createStatement(final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) {
- return new ShardingSphereStatement(this, resultSetType, resultSetConcurrency, resultSetHoldability);
- }
-
- @Override
- public boolean getAutoCommit() {
- return autoCommit;
- }
-
- @Override
- public void setAutoCommit(final boolean autoCommit) throws SQLException {
- this.autoCommit = autoCommit;
- if (connectionTransaction.isLocalTransaction()) {
- processLocalTransaction();
- } else {
- processDistributeTransaction();
- }
- }
-
- private void processLocalTransaction() throws SQLException {
- getMethodInvocationRecorder().record(Connection.class, "setAutoCommit", new Class[]{boolean.class}, new Object[]{autoCommit});
- forceExecuteTemplate.execute(cachedConnections.values(), connection -> connection.setAutoCommit(autoCommit));
- if (!autoCommit) {
- TransactionHolder.setInTransaction();
- }
- }
-
- private void processDistributeTransaction() throws SQLException {
- switch (connectionTransaction.getDistributedTransactionOperationType(autoCommit)) {
- case BEGIN:
- closeCachedConnections();
- connectionTransaction.begin();
- TransactionHolder.setInTransaction();
- break;
- case COMMIT:
- connectionTransaction.commit();
- break;
- default:
- break;
- }
- }
-
- private void closeCachedConnections() throws SQLException {
- forceExecuteTemplate.execute(cachedConnections.values(), Connection::close);
- cachedConnections.clear();
- }
-
- @Override
- public void commit() throws SQLException {
- try {
- if (connectionTransaction.isLocalTransaction()) {
- forceExecuteTemplate.execute(cachedConnections.values(), Connection::commit);
- } else {
- connectionTransaction.commit();
- }
- } finally {
- TransactionHolder.clear();
- }
- }
-
- @Override
- public void rollback() throws SQLException {
- try {
- if (connectionTransaction.isLocalTransaction()) {
- forceExecuteTemplate.execute(cachedConnections.values(), Connection::rollback);
- } else {
- connectionTransaction.rollback();
- }
- } finally {
- TransactionHolder.clear();
- }
- }
-
- @Override
- public boolean isReadOnly() {
- return readOnly;
- }
-
- @Override
- public void setReadOnly(final boolean readOnly) throws SQLException {
- this.readOnly = readOnly;
- getMethodInvocationRecorder().record(Connection.class, "setReadOnly", new Class[]{boolean.class}, new Object[]{readOnly});
- forceExecuteTemplate.execute(cachedConnections.values(), connection -> connection.setReadOnly(readOnly));
- }
-
- @Override
- public int getTransactionIsolation() throws SQLException {
- return cachedConnections.values().isEmpty() ? transactionIsolation : cachedConnections.values().iterator().next().getTransactionIsolation();
- }
-
- @Override
- public void setTransactionIsolation(final int level) throws SQLException {
- transactionIsolation = level;
- getMethodInvocationRecorder().record(Connection.class, "setTransactionIsolation", new Class[]{int.class}, new Object[]{level});
- forceExecuteTemplate.execute(cachedConnections.values(), connection -> connection.setTransactionIsolation(level));
- }
-
- @Override
- public boolean isValid(final int timeout) throws SQLException {
- for (Connection connection : cachedConnections.values()) {
- if (!connection.isValid(timeout)) {
- return false;
- }
- }
- return true;
- }
-
- @Override
- public Array createArrayOf(final String typeName, final Object[] elements) throws SQLException {
- return getConnection(getRandomPhysicalDataSourceName()).createArrayOf(typeName, elements);
- }
-
- @Override
- public boolean isClosed() {
- return closed;
- }
-
- @Override
public void close() throws SQLException {
- closed = true;
- PrimaryVisitedManager.clear();
try {
forceExecuteTemplate.execute(cachedConnections.values(), Connection::close);
} finally {
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
index 891a3cf..d78f2eb 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
@@ -17,43 +17,25 @@
package org.apache.shardingsphere.driver.jdbc.core.connection;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.LinkedHashMultimap;
-import com.google.common.collect.Multimap;
import lombok.Getter;
import org.apache.shardingsphere.driver.jdbc.adapter.AbstractConnectionAdapter;
-import org.apache.shardingsphere.driver.jdbc.adapter.executor.ForceExecuteTemplate;
import org.apache.shardingsphere.driver.jdbc.core.datasource.metadata.ShardingSphereDatabaseMetaData;
import org.apache.shardingsphere.driver.jdbc.core.statement.ShardingSpherePreparedStatement;
import org.apache.shardingsphere.driver.jdbc.core.statement.ShardingSphereStatement;
-import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
-import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCManager;
-import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.readwritesplitting.route.impl.PrimaryVisitedManager;
-import org.apache.shardingsphere.transaction.ConnectionTransaction;
import org.apache.shardingsphere.transaction.TransactionHolder;
-import org.apache.shardingsphere.transaction.rule.TransactionRule;
-import javax.sql.DataSource;
-import java.security.SecureRandom;
import java.sql.Array;
-import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-import java.util.Random;
/**
- * ShardingSphere Connection.
+ * ShardingSphere connection.
*/
-public final class ShardingSphereConnection extends AbstractConnectionAdapter implements ExecutorJDBCManager {
+public final class ShardingSphereConnection extends AbstractConnectionAdapter {
@Getter
private final String schema;
@@ -61,118 +43,21 @@ public final class ShardingSphereConnection extends AbstractConnectionAdapter im
@Getter
private final ContextManager contextManager;
- private final Multimap<String, Connection> cachedConnections = LinkedHashMultimap.create();
-
- private final ConnectionTransaction connectionTransaction;
-
- private final ForceExecuteTemplate<Connection> forceExecuteTemplate = new ForceExecuteTemplate<>();
-
- private final Random random = new SecureRandom();
+ @Getter
+ private final ConnectionManager connectionManager;
private boolean autoCommit = true;
- private boolean readOnly;
-
private int transactionIsolation = TRANSACTION_READ_UNCOMMITTED;
+ private boolean readOnly;
+
private volatile boolean closed;
public ShardingSphereConnection(final String schema, final ContextManager contextManager) {
this.schema = schema;
this.contextManager = contextManager;
- connectionTransaction = createConnectionTransaction(schema, contextManager);
- }
-
- private ConnectionTransaction createConnectionTransaction(final String schemaName, final ContextManager contextManager) {
- Optional<TransactionRule> transactionRule = contextManager.getMetaDataContexts().getGlobalRuleMetaData().findSingleRule(TransactionRule.class);
- return transactionRule.map(optional -> new ConnectionTransaction(schemaName, optional, contextManager.getTransactionContexts()))
- .orElseGet(() -> new ConnectionTransaction(schemaName, contextManager.getTransactionContexts()));
- }
-
- /**
- * Get random physical data source name.
- *
- * @return random physical data source name
- */
- public String getRandomPhysicalDataSourceName() {
- Collection<String> datasourceNames = cachedConnections.isEmpty() ? contextManager.getDataSourceMap(schema).keySet() : cachedConnections.keySet();
- return new ArrayList<>(datasourceNames).get(random.nextInt(datasourceNames.size()));
- }
-
- /**
- * Get connection.
- *
- * @param dataSourceName data source name
- * @return connection
- * @throws SQLException SQL exception
- */
- public Connection getConnection(final String dataSourceName) throws SQLException {
- return getConnections(dataSourceName, 1, ConnectionMode.MEMORY_STRICTLY).get(0);
- }
-
- @Override
- public List<Connection> getConnections(final String dataSourceName, final int connectionSize, final ConnectionMode connectionMode) throws SQLException {
- DataSource dataSource = contextManager.getDataSourceMap(schema).get(dataSourceName);
- Preconditions.checkState(null != dataSource, "Missing the data source name: '%s'", dataSourceName);
- Collection<Connection> connections;
- synchronized (cachedConnections) {
- connections = cachedConnections.get(dataSourceName);
- }
- List<Connection> result;
- if (connections.size() >= connectionSize) {
- result = new ArrayList<>(connections).subList(0, connectionSize);
- } else if (!connections.isEmpty()) {
- result = new ArrayList<>(connectionSize);
- result.addAll(connections);
- List<Connection> newConnections = createConnections(dataSourceName, dataSource, connectionSize - connections.size(), connectionMode);
- result.addAll(newConnections);
- synchronized (cachedConnections) {
- cachedConnections.putAll(dataSourceName, newConnections);
- }
- } else {
- result = new ArrayList<>(createConnections(dataSourceName, dataSource, connectionSize, connectionMode));
- synchronized (cachedConnections) {
- cachedConnections.putAll(dataSourceName, result);
- }
- }
- return result;
- }
-
- @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
- private List<Connection> createConnections(final String dataSourceName, final DataSource dataSource, final int connectionSize, final ConnectionMode connectionMode) throws SQLException {
- if (1 == connectionSize) {
- Connection connection = createConnection(dataSourceName, dataSource);
- getMethodInvocationRecorder().replay(connection);
- return Collections.singletonList(connection);
- }
- if (ConnectionMode.CONNECTION_STRICTLY == connectionMode) {
- return createConnections(dataSourceName, dataSource, connectionSize);
- }
- synchronized (dataSource) {
- return createConnections(dataSourceName, dataSource, connectionSize);
- }
- }
-
- private List<Connection> createConnections(final String dataSourceName, final DataSource dataSource, final int connectionSize) throws SQLException {
- List<Connection> result = new ArrayList<>(connectionSize);
- for (int i = 0; i < connectionSize; i++) {
- try {
- Connection connection = createConnection(dataSourceName, dataSource);
- getMethodInvocationRecorder().replay(connection);
- result.add(connection);
- } catch (final SQLException ex) {
- for (Connection each : result) {
- each.close();
- }
- throw new SQLException(String.format("Can not get %d connections one time, partition succeed connection(%d) have released!", connectionSize, result.size()), ex);
- }
- }
- return result;
- }
-
- private Connection createConnection(final String dataSourceName, final DataSource dataSource) throws SQLException {
- Optional<Connection> connectionInTransaction = connectionTransaction.getConnection(dataSourceName);
- return connectionInTransaction.isPresent() ? connectionInTransaction.get() : dataSource.getConnection();
+ connectionManager = new ConnectionManager(schema, contextManager);
}
/**
@@ -181,21 +66,7 @@ public final class ShardingSphereConnection extends AbstractConnectionAdapter im
* @return true or false
*/
public boolean isHoldTransaction() {
- return connectionTransaction.isHoldTransaction(autoCommit);
- }
-
- @SuppressWarnings("MagicConstant")
- @Override
- public Statement createStorageResource(final Connection connection, final ConnectionMode connectionMode, final StatementOption option) throws SQLException {
- return connection.createStatement(option.getResultSetType(), option.getResultSetConcurrency(), option.getResultSetHoldability());
- }
-
- @SuppressWarnings("MagicConstant")
- @Override
- public PreparedStatement createStorageResource(final String sql, final List<Object> parameters,
- final Connection connection, final ConnectionMode connectionMode, final StatementOption option) throws SQLException {
- return option.isReturnGeneratedKeys() ? connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)
- : connection.prepareStatement(sql, option.getResultSetType(), option.getResultSetConcurrency(), option.getResultSetHoldability());
+ return connectionManager.getConnectionTransaction().isHoldTransaction(autoCommit);
}
@Override
@@ -256,7 +127,7 @@ public final class ShardingSphereConnection extends AbstractConnectionAdapter im
@Override
public void setAutoCommit(final boolean autoCommit) throws SQLException {
this.autoCommit = autoCommit;
- if (connectionTransaction.isLocalTransaction()) {
+ if (connectionManager.getConnectionTransaction().isLocalTransaction()) {
processLocalTransaction();
} else {
processDistributeTransaction();
@@ -264,41 +135,31 @@ public final class ShardingSphereConnection extends AbstractConnectionAdapter im
}
private void processLocalTransaction() throws SQLException {
- getMethodInvocationRecorder().record(Connection.class, "setAutoCommit", new Class[]{boolean.class}, new Object[]{autoCommit});
- forceExecuteTemplate.execute(cachedConnections.values(), connection -> connection.setAutoCommit(autoCommit));
+ connectionManager.setAutoCommit(autoCommit);
if (!autoCommit) {
TransactionHolder.setInTransaction();
}
}
private void processDistributeTransaction() throws SQLException {
- switch (connectionTransaction.getDistributedTransactionOperationType(autoCommit)) {
+ switch (connectionManager.getConnectionTransaction().getDistributedTransactionOperationType(autoCommit)) {
case BEGIN:
- closeCachedConnections();
- connectionTransaction.begin();
+ connectionManager.close();
+ connectionManager.getConnectionTransaction().begin();
TransactionHolder.setInTransaction();
break;
case COMMIT:
- connectionTransaction.commit();
+ connectionManager.getConnectionTransaction().commit();
break;
default:
break;
}
}
- private void closeCachedConnections() throws SQLException {
- forceExecuteTemplate.execute(cachedConnections.values(), Connection::close);
- cachedConnections.clear();
- }
-
@Override
public void commit() throws SQLException {
try {
- if (connectionTransaction.isLocalTransaction()) {
- forceExecuteTemplate.execute(cachedConnections.values(), Connection::commit);
- } else {
- connectionTransaction.commit();
- }
+ connectionManager.commit();
} finally {
TransactionHolder.clear();
}
@@ -307,53 +168,43 @@ public final class ShardingSphereConnection extends AbstractConnectionAdapter im
@Override
public void rollback() throws SQLException {
try {
- if (connectionTransaction.isLocalTransaction()) {
- forceExecuteTemplate.execute(cachedConnections.values(), Connection::rollback);
- } else {
- connectionTransaction.rollback();
- }
+ connectionManager.rollback();
} finally {
TransactionHolder.clear();
}
}
+ @SuppressWarnings("MagicConstant")
@Override
- public boolean isReadOnly() {
- return readOnly;
+ public int getTransactionIsolation() throws SQLException {
+ return connectionManager.getTransactionIsolation().orElseGet(() -> transactionIsolation);
}
@Override
- public void setReadOnly(final boolean readOnly) throws SQLException {
- this.readOnly = readOnly;
- getMethodInvocationRecorder().record(Connection.class, "setReadOnly", new Class[]{boolean.class}, new Object[]{readOnly});
- forceExecuteTemplate.execute(cachedConnections.values(), connection -> connection.setReadOnly(readOnly));
+ public void setTransactionIsolation(final int level) throws SQLException {
+ transactionIsolation = level;
+ connectionManager.setTransactionIsolation(level);
}
@Override
- public int getTransactionIsolation() throws SQLException {
- return cachedConnections.values().isEmpty() ? transactionIsolation : cachedConnections.values().iterator().next().getTransactionIsolation();
+ public boolean isReadOnly() {
+ return readOnly;
}
@Override
- public void setTransactionIsolation(final int level) throws SQLException {
- transactionIsolation = level;
- getMethodInvocationRecorder().record(Connection.class, "setTransactionIsolation", new Class[]{int.class}, new Object[]{level});
- forceExecuteTemplate.execute(cachedConnections.values(), connection -> connection.setTransactionIsolation(level));
+ public void setReadOnly(final boolean readOnly) throws SQLException {
+ this.readOnly = readOnly;
+ connectionManager.setReadOnly(readOnly);
}
@Override
public boolean isValid(final int timeout) throws SQLException {
- for (Connection connection : cachedConnections.values()) {
- if (!connection.isValid(timeout)) {
- return false;
- }
- }
- return true;
+ return connectionManager.isValid(timeout);
}
@Override
public Array createArrayOf(final String typeName, final Object[] elements) throws SQLException {
- return getConnection(getRandomPhysicalDataSourceName()).createArrayOf(typeName, elements);
+ return connectionManager.getRandomConnection().createArrayOf(typeName, elements);
}
@Override
@@ -365,10 +216,6 @@ public final class ShardingSphereConnection extends AbstractConnectionAdapter im
public void close() throws SQLException {
closed = true;
PrimaryVisitedManager.clear();
- try {
- forceExecuteTemplate.execute(cachedConnections.values(), Connection::close);
- } finally {
- cachedConnections.clear();
- }
+ connectionManager.close();
}
}
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/datasource/metadata/ShardingSphereDatabaseMetaData.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/datasource/metadata/ShardingSphereDatabaseMetaData.java
index 2d4c39c..4c76414 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/datasource/metadata/ShardingSphereDatabaseMetaData.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/datasource/metadata/ShardingSphereDatabaseMetaData.java
@@ -46,6 +46,8 @@ public final class ShardingSphereDatabaseMetaData extends AdaptedDatabaseMetaDat
private String currentPhysicalDataSourceName;
+ private Connection currentPhysicalConnection;
+
private DatabaseMetaData currentDatabaseMetaData;
public ShardingSphereDatabaseMetaData(final ShardingSphereConnection connection) {
@@ -57,7 +59,10 @@ public final class ShardingSphereDatabaseMetaData extends AdaptedDatabaseMetaDat
@Override
public Connection getConnection() throws SQLException {
- return connection.getConnection(getDataSourceName());
+ if (null == currentPhysicalConnection) {
+ currentPhysicalConnection = connection.getConnectionManager().getRandomConnection();
+ }
+ return currentPhysicalConnection;
}
@Override
@@ -231,7 +236,7 @@ public final class ShardingSphereDatabaseMetaData extends AdaptedDatabaseMetaDat
private String getDataSourceName() {
if (null == currentPhysicalDataSourceName) {
- currentPhysicalDataSourceName = connection.getRandomPhysicalDataSourceName();
+ currentPhysicalDataSourceName = connection.getConnectionManager().getRandomPhysicalDataSourceName();
}
return currentPhysicalDataSourceName;
}
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index 4f7fbb2..0ee626a 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -225,7 +225,7 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> createDriverExecutionPrepareEngine() {
int maxConnectionsSizePerQuery = metaDataContexts.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
- return new DriverExecutionPrepareEngine<>(JDBCDriverType.PREPARED_STATEMENT, maxConnectionsSizePerQuery, connection,
+ return new DriverExecutionPrepareEngine<>(JDBCDriverType.PREPARED_STATEMENT, maxConnectionsSizePerQuery, connection.getConnectionManager(),
statementOption, metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules());
}
@@ -466,7 +466,7 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
private void initBatchPreparedStatementExecutor() throws SQLException {
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = new DriverExecutionPrepareEngine<>(
JDBCDriverType.PREPARED_STATEMENT, metaDataContexts.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY),
- connection, statementOption, metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules());
+ connection.getConnectionManager(), statementOption, metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules());
batchPreparedStatementExecutor.init(prepareEngine.prepare(executionContext.getRouteContext(),
new ArrayList<>(batchPreparedStatementExecutor.getBatchExecutionUnits()).stream().map(BatchExecutionUnit::getExecutionUnit).collect(Collectors.toList())));
setBatchParametersForStatements();
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index ee5b485..7f681d2 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -174,7 +174,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> createDriverExecutionPrepareEngine() {
int maxConnectionsSizePerQuery = metaDataContexts.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
- return new DriverExecutionPrepareEngine<>(JDBCDriverType.STATEMENT, maxConnectionsSizePerQuery, connection,
+ return new DriverExecutionPrepareEngine<>(JDBCDriverType.STATEMENT, maxConnectionsSizePerQuery, connection.getConnectionManager(),
statementOption, metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules());
}
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/connection/ConnectionManagerTest.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/connection/ConnectionManagerTest.java
new file mode 100644
index 0000000..421f71a
--- /dev/null
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/connection/ConnectionManagerTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.driver.jdbc.core.connection;
+
+import org.apache.shardingsphere.infra.database.DefaultSchema;
+import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.transaction.rule.TransactionRule;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public final class ConnectionManagerTest {
+
+ private ConnectionManager connectionManager;
+
+ @Before
+ public void setUp() throws SQLException {
+ connectionManager = new ConnectionManager(DefaultSchema.LOGIC_NAME, mockContextManager());
+ }
+
+ private ContextManager mockContextManager() throws SQLException {
+ ContextManager result = mock(ContextManager.class, RETURNS_DEEP_STUBS);
+ Map<String, DataSource> dataSourceMap = mockDataSourceMap();
+ when(result.getDataSourceMap(DefaultSchema.LOGIC_NAME)).thenReturn(dataSourceMap);
+ when(result.getMetaDataContexts().getGlobalRuleMetaData().findSingleRule(TransactionRule.class)).thenReturn(Optional.empty());
+ return result;
+ }
+
+ private Map<String, DataSource> mockDataSourceMap() throws SQLException {
+ Map<String, DataSource> result = new HashMap<>(2, 1);
+ result.put("ds", mock(DataSource.class, RETURNS_DEEP_STUBS));
+ DataSource invalidDataSource = mock(DataSource.class);
+ when(invalidDataSource.getConnection()).thenThrow(new SQLException());
+ result.put("invalid_ds", invalidDataSource);
+ return result;
+ }
+
+ @Test
+ public void assertGetRandomPhysicalDataSourceNameFromContextManager() throws SQLException {
+ connectionManager.getConnections("ds", 1, ConnectionMode.MEMORY_STRICTLY);
+ String actual = connectionManager.getRandomPhysicalDataSourceName();
+ assertThat(actual, is("ds"));
+ }
+
+ @Test
+ public void assertGetRandomPhysicalDataSourceNameFromCache() throws SQLException {
+ connectionManager.getConnections("ds", 1, ConnectionMode.MEMORY_STRICTLY);
+ String actual = connectionManager.getRandomPhysicalDataSourceName();
+ assertThat(actual, is("ds"));
+ }
+
+ @Test
+ public void assertGetConnection() throws SQLException {
+ assertThat(connectionManager.getConnections("ds", 1, ConnectionMode.MEMORY_STRICTLY),
+ is(connectionManager.getConnections("ds", 1, ConnectionMode.MEMORY_STRICTLY)));
+ }
+
+ @Test
+ public void assertGetConnectionsWhenAllInCache() throws SQLException {
+ Connection expected = connectionManager.getConnections("ds", 1, ConnectionMode.MEMORY_STRICTLY).get(0);
+ List<Connection> actual = connectionManager.getConnections("ds", 1, ConnectionMode.CONNECTION_STRICTLY);
+ assertThat(actual.size(), is(1));
+ assertThat(actual.get(0), is(expected));
+ }
+
+ @Test
+ public void assertGetConnectionsWhenEmptyCache() throws SQLException {
+ List<Connection> actual = connectionManager.getConnections("ds", 1, ConnectionMode.MEMORY_STRICTLY);
+ assertThat(actual.size(), is(1));
+ }
+
+ @Test
+ public void assertGetConnectionsWhenPartInCacheWithMemoryStrictlyMode() throws SQLException {
+ connectionManager.getConnections("ds", 1, ConnectionMode.MEMORY_STRICTLY);
+ List<Connection> actual = connectionManager.getConnections("ds", 3, ConnectionMode.MEMORY_STRICTLY);
+ assertThat(actual.size(), is(3));
+ }
+
+ @Test
+ public void assertGetConnectionsWhenPartInCacheWithConnectionStrictlyMode() throws SQLException {
+ connectionManager.getConnections("ds", 1, ConnectionMode.MEMORY_STRICTLY);
+ List<Connection> actual = connectionManager.getConnections("ds", 3, ConnectionMode.CONNECTION_STRICTLY);
+ assertThat(actual.size(), is(3));
+ }
+
+ @Test
+ public void assertGetConnectionsWhenConnectionCreateFailed() {
+ try {
+ connectionManager.getConnections("invalid_ds", 3, ConnectionMode.CONNECTION_STRICTLY);
+ } catch (final SQLException ex) {
+ assertThat(ex.getMessage(), is("Can not get 3 connections one time, partition succeed connection(0) have released!"));
+ }
+ }
+}
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnectionTest.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnectionTest.java
index 3087efd..897fd74 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnectionTest.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnectionTest.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.driver.jdbc.core.connection;
-import com.google.common.collect.Multimap;
import lombok.SneakyThrows;
import org.apache.shardingsphere.infra.database.DefaultSchema;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
@@ -36,7 +35,6 @@ import java.lang.reflect.Field;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collections;
-import java.util.List;
import java.util.Optional;
import static org.hamcrest.CoreMatchers.is;
@@ -75,62 +73,6 @@ public final class ShardingSphereConnectionTest {
}
@Test
- public void assertGetRandomPhysicalDataSourceNameFromContextManager() {
- String actual = connection.getRandomPhysicalDataSourceName();
- assertThat(actual, is("ds"));
- }
-
- @Test
- public void assertGetRandomPhysicalDataSourceNameFromCache() throws SQLException {
- connection.getConnection("ds");
- String actual = connection.getRandomPhysicalDataSourceName();
- assertThat(actual, is("ds"));
- }
-
- @Test
- public void assertGetConnection() throws SQLException {
- assertThat(connection.getConnection("ds"), is(connection.getConnection("ds")));
- }
-
- @Test
- public void assertGetConnectionsWhenAllInCache() throws SQLException {
- Connection expected = connection.getConnection("ds");
- List<Connection> actual = connection.getConnections("ds", 1, ConnectionMode.CONNECTION_STRICTLY);
- assertThat(actual.size(), is(1));
- assertThat(actual.get(0), is(expected));
- }
-
- @Test
- public void assertGetConnectionsWhenEmptyCache() throws SQLException {
- List<Connection> actual = connection.getConnections("ds", 1, ConnectionMode.MEMORY_STRICTLY);
- assertThat(actual.size(), is(1));
- }
-
- @Test
- public void assertGetConnectionsWhenPartInCacheWithMemoryStrictlyMode() throws SQLException {
- connection.getConnection("ds");
- List<Connection> actual = connection.getConnections("ds", 3, ConnectionMode.MEMORY_STRICTLY);
- assertThat(actual.size(), is(3));
- }
-
- @Test
- public void assertGetConnectionsWhenPartInCacheWithConnectionStrictlyMode() throws SQLException {
- connection.getConnection("ds");
- List<Connection> actual = connection.getConnections("ds", 3, ConnectionMode.CONNECTION_STRICTLY);
- assertThat(actual.size(), is(3));
- }
-
- @Test
- public void assertGetConnectionsWhenConnectionCreateFailed() throws SQLException {
- when(connection.getContextManager().getDataSourceMap(DefaultSchema.LOGIC_NAME).get("ds").getConnection()).thenThrow(new SQLException());
- try {
- connection.getConnections("ds", 3, ConnectionMode.CONNECTION_STRICTLY);
- } catch (final SQLException ex) {
- assertThat(ex.getMessage(), is("Can not get 3 connections one time, partition succeed connection(0) have released!"));
- }
- }
-
- @Test
public void assertIsHoldTransaction() throws SQLException {
connection.setAutoCommit(false);
assertTrue(connection.isHoldTransaction());
@@ -146,7 +88,7 @@ public final class ShardingSphereConnectionTest {
public void assertSetAutoCommitWithLocalTransaction() throws SQLException {
Connection physicalConnection = mock(Connection.class);
when(connection.getContextManager().getDataSourceMap(DefaultSchema.LOGIC_NAME).get("ds").getConnection()).thenReturn(physicalConnection);
- connection.getConnection("ds");
+ connection.getConnectionManager().getConnections("ds", 1, ConnectionMode.MEMORY_STRICTLY);
connection.setAutoCommit(true);
assertTrue(connection.getAutoCommit());
verify(physicalConnection).setAutoCommit(true);
@@ -156,7 +98,7 @@ public final class ShardingSphereConnectionTest {
public void assertSetAutoCommitWithDistributedTransaction() throws SQLException {
ConnectionTransaction connectionTransaction = mock(ConnectionTransaction.class);
when(connectionTransaction.getDistributedTransactionOperationType(true)).thenReturn(DistributedTransactionOperationType.COMMIT);
- setConnectionTransaction(connectionTransaction);
+ mockConnectionManager(connectionTransaction);
connection.setAutoCommit(true);
assertTrue(connection.getAutoCommit());
verify(connectionTransaction).commit();
@@ -166,7 +108,7 @@ public final class ShardingSphereConnectionTest {
public void assertCommitWithLocalTransaction() throws SQLException {
Connection physicalConnection = mock(Connection.class);
when(connection.getContextManager().getDataSourceMap(DefaultSchema.LOGIC_NAME).get("ds").getConnection()).thenReturn(physicalConnection);
- connection.getConnection("ds");
+ connection.getConnectionManager().getConnections("ds", 1, ConnectionMode.MEMORY_STRICTLY);
connection.setAutoCommit(false);
assertFalse(connection.getAutoCommit());
assertTrue(TransactionHolder.isTransaction());
@@ -180,21 +122,21 @@ public final class ShardingSphereConnectionTest {
public void assertCommitWithDistributedTransaction() throws SQLException {
ConnectionTransaction connectionTransaction = mock(ConnectionTransaction.class);
when(connectionTransaction.getDistributedTransactionOperationType(false)).thenReturn(DistributedTransactionOperationType.BEGIN);
- setConnectionTransaction(connectionTransaction);
+ final ConnectionManager connectionManager = mockConnectionManager(connectionTransaction);
connection.setAutoCommit(false);
assertFalse(connection.getAutoCommit());
assertTrue(TransactionHolder.isTransaction());
verify(connectionTransaction).begin();
connection.commit();
assertFalse(TransactionHolder.isTransaction());
- verify(connectionTransaction).commit();
+ verify(connectionManager).commit();
}
@Test
public void assertRollbackWithLocalTransaction() throws SQLException {
Connection physicalConnection = mock(Connection.class);
when(connection.getContextManager().getDataSourceMap(DefaultSchema.LOGIC_NAME).get("ds").getConnection()).thenReturn(physicalConnection);
- connection.getConnection("ds");
+ connection.getConnectionManager().getConnections("ds", 1, ConnectionMode.MEMORY_STRICTLY);
connection.setAutoCommit(false);
assertFalse(connection.getAutoCommit());
connection.rollback();
@@ -205,21 +147,24 @@ public final class ShardingSphereConnectionTest {
public void assertRollbackWithDistributedTransaction() throws SQLException {
ConnectionTransaction connectionTransaction = mock(ConnectionTransaction.class);
when(connectionTransaction.getDistributedTransactionOperationType(false)).thenReturn(DistributedTransactionOperationType.BEGIN);
- setConnectionTransaction(connectionTransaction);
+ final ConnectionManager connectionManager = mockConnectionManager(connectionTransaction);
connection.setAutoCommit(false);
assertFalse(connection.getAutoCommit());
assertTrue(TransactionHolder.isTransaction());
verify(connectionTransaction).begin();
connection.rollback();
assertFalse(TransactionHolder.isTransaction());
- verify(connectionTransaction).rollback();
+ verify(connectionManager).rollback();
}
@SneakyThrows(ReflectiveOperationException.class)
- private void setConnectionTransaction(final ConnectionTransaction connectionTransaction) {
- Field field = connection.getClass().getDeclaredField("connectionTransaction");
+ private ConnectionManager mockConnectionManager(final ConnectionTransaction connectionTransaction) {
+ Field field = connection.getClass().getDeclaredField("connectionManager");
field.setAccessible(true);
- field.set(connection, connectionTransaction);
+ ConnectionManager result = mock(ConnectionManager.class);
+ when(result.getConnectionTransaction()).thenReturn(connectionTransaction);
+ field.set(connection, result);
+ return result;
}
@Test
@@ -229,14 +174,14 @@ public final class ShardingSphereConnectionTest {
@Test
public void assertIsInvalid() throws SQLException {
- connection.getConnection("ds");
+ connection.getConnectionManager().getConnections("ds", 1, ConnectionMode.MEMORY_STRICTLY);
assertFalse(connection.isValid(0));
}
@Test
public void assertSetReadOnly() throws SQLException {
assertFalse(connection.isReadOnly());
- Connection physicalConnection = connection.getConnection("ds");
+ Connection physicalConnection = connection.getConnectionManager().getConnections("ds", 1, ConnectionMode.MEMORY_STRICTLY).get(0);
connection.setReadOnly(true);
assertTrue(connection.isReadOnly());
verify(physicalConnection).setReadOnly(true);
@@ -249,7 +194,7 @@ public final class ShardingSphereConnectionTest {
@Test
public void assertSetTransactionIsolation() throws SQLException {
- Connection physicalConnection = connection.getConnection("ds");
+ Connection physicalConnection = connection.getConnectionManager().getConnections("ds", 1, ConnectionMode.MEMORY_STRICTLY).get(0);
connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
verify(physicalConnection).setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
}
@@ -258,7 +203,7 @@ public final class ShardingSphereConnectionTest {
public void assertCreateArrayOf() throws SQLException {
Connection physicalConnection = mock(Connection.class);
when(connection.getContextManager().getDataSourceMap(DefaultSchema.LOGIC_NAME).get("ds").getConnection()).thenReturn(physicalConnection);
- connection.getConnection("ds");
+ connection.getConnectionManager().getConnections("ds", 1, ConnectionMode.MEMORY_STRICTLY);
assertNull(connection.createArrayOf("int", null));
verify(physicalConnection).createArrayOf("int", null);
}
@@ -267,14 +212,5 @@ public final class ShardingSphereConnectionTest {
public void assertClose() throws SQLException {
connection.close();
assertTrue(connection.isClosed());
- assertTrue(getCachedConnections().isEmpty());
- }
-
- @SuppressWarnings("unchecked")
- @SneakyThrows(ReflectiveOperationException.class)
- private Multimap<String, Connection> getCachedConnections() {
- Field field = ShardingSphereConnection.class.getDeclaredField("cachedConnections");
- field.setAccessible(true);
- return (Multimap<String, Connection>) field.get(connection);
}
}
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/datasource/ShardingSphereDataSourceTest.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/datasource/ShardingSphereDataSourceTest.java
index 41c7913..dd03929 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/datasource/ShardingSphereDataSourceTest.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/datasource/ShardingSphereDataSourceTest.java
@@ -21,6 +21,7 @@ import com.zaxxer.hikari.HikariDataSource;
import org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
import org.apache.shardingsphere.infra.config.RuleConfiguration;
import org.apache.shardingsphere.infra.database.DefaultSchema;
+import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
import org.apache.shardingsphere.infra.state.StateType;
import org.apache.shardingsphere.transaction.core.TransactionTypeHolder;
import org.junit.After;
@@ -33,7 +34,6 @@ import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
-import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
@@ -79,15 +79,10 @@ public final class ShardingSphereDataSourceTest {
}
@Test
- public void assertGetConnection() throws SQLException {
- DataSource dataSource = mockDataSource();
- assertThat(((ShardingSphereConnection) createShardingSphereDataSource(dataSource).getConnection()).getConnection("ds"), is(dataSource.getConnection()));
- }
-
- @Test
public void assertGetConnectionWithUsernameAndPassword() throws SQLException {
DataSource dataSource = mockDataSource();
- assertThat(((ShardingSphereConnection) createShardingSphereDataSource(dataSource).getConnection("", "")).getConnection("ds"), is(dataSource.getConnection()));
+ assertThat(((ShardingSphereConnection) createShardingSphereDataSource(dataSource).getConnection("", "")).getConnectionManager().getConnections("ds", 1, ConnectionMode.MEMORY_STRICTLY).get(0),
+ is(dataSource.getConnection()));
}
private DataSource mockDataSource() throws SQLException {
@@ -152,7 +147,7 @@ public final class ShardingSphereDataSourceTest {
@Test
public void assertCloseWithDataSourceNames() throws Exception {
ShardingSphereDataSource actual = createShardingSphereDataSource(createHikariDataSource());
- actual.close(Arrays.asList("ds"));
+ actual.close(Collections.singleton("ds"));
Map<String, DataSource> dataSourceMap = actual.getContextManager().getDataSourceMap(DefaultSchema.LOGIC_NAME);
assertThat(((HikariDataSource) dataSourceMap.get("ds")).isClosed(), is(true));
}
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/datasource/metadata/ShardingSphereDatabaseMetaDataTest.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/datasource/metadata/ShardingSphereDatabaseMetaDataTest.java
index 1c3363e..00d4899 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/datasource/metadata/ShardingSphereDatabaseMetaDataTest.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/datasource/metadata/ShardingSphereDatabaseMetaDataTest.java
@@ -48,7 +48,6 @@ import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -89,8 +88,8 @@ public final class ShardingSphereDatabaseMetaDataTest {
when(connection.getMetaData()).thenReturn(databaseMetaData);
when(resultSet.getMetaData()).thenReturn(mock(ResultSetMetaData.class));
CachedDatabaseMetaData cachedDatabaseMetaData = new CachedDatabaseMetaData(databaseMetaData);
- when(shardingSphereConnection.getRandomPhysicalDataSourceName()).thenReturn(DATA_SOURCE_NAME);
- when(shardingSphereConnection.getConnection(anyString())).thenReturn(connection);
+ when(shardingSphereConnection.getConnectionManager().getRandomPhysicalDataSourceName()).thenReturn(DATA_SOURCE_NAME);
+ when(shardingSphereConnection.getConnectionManager().getRandomConnection()).thenReturn(connection);
when(shardingSphereConnection.getContextManager().getMetaDataContexts()).thenReturn(metaDataContexts);
when(shardingSphereConnection.getContextManager().getDataSourceMap(DefaultSchema.LOGIC_NAME)).thenReturn(dataSourceMap);
when(shardingSphereConnection.getSchema()).thenReturn(DefaultSchema.LOGIC_NAME);