You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by xi...@apache.org on 2020/11/21 17:32:17 UTC
[shardingsphere] branch master updated: Move ExecutorJDBCManager
impl to ShardingSphereConnection (#8277)
This is an automated email from the ASF dual-hosted git repository.
xiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new a07d5c4 Move ExecutorJDBCManager impl to ShardingSphereConnection (#8277)
a07d5c4 is described below
commit a07d5c4e96f4ee1eea91a6a58b4adf3541ac0750
Author: Liang Zhang <te...@163.com>
AuthorDate: Sun Nov 22 01:31:46 2020 +0800
Move ExecutorJDBCManager impl to ShardingSphereConnection (#8277)
* Rename ExecutorJDBCManager
* Refactor ShardingSphereDatabaseMetaData
* Move ExecutorJDBCManager impl to ShardingSphereConnection
---
.../PreparedStatementExecutionGroupEngine.java | 8 +-
.../driver/jdbc/StatementExecutionGroupEngine.java | 8 +-
.../PreparedStatementExecutionGroupEngineTest.java | 6 +-
.../jdbc/StatementExecutionGroupEngineTest.java | 6 +-
.../jdbc/adapter/AbstractConnectionAdapter.java | 141 +--------------------
.../core/connection/ShardingSphereConnection.java | 128 +++++++++++++++++--
.../metadata/ShardingSphereDatabaseMetaData.java | 10 +-
7 files changed, 140 insertions(+), 167 deletions(-)
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/group/driver/jdbc/PreparedStatementExecutionGroupEngine.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/group/driver/jdbc/PreparedStatementExecutionGroupEngine.java
index 751a741..6c82df0 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/group/driver/jdbc/PreparedStatementExecutionGroupEngine.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/group/driver/jdbc/PreparedStatementExecutionGroupEngine.java
@@ -35,8 +35,8 @@ import java.util.List;
public final class PreparedStatementExecutionGroupEngine extends DriverExecutionGroupEngine<JDBCExecutionUnit, ExecutorJDBCManager, Connection, StatementOption> {
public PreparedStatementExecutionGroupEngine(final int maxConnectionsSizePerQuery,
- final ExecutorJDBCManager executionConnection, final StatementOption option, final Collection<ShardingSphereRule> rules) {
- super(maxConnectionsSizePerQuery, executionConnection, option, rules);
+ final ExecutorJDBCManager executorJDBCManager, final StatementOption option, final Collection<ShardingSphereRule> rules) {
+ super(maxConnectionsSizePerQuery, executorJDBCManager, option, rules);
}
@Override
@@ -47,8 +47,8 @@ public final class PreparedStatementExecutionGroupEngine extends DriverExecution
return new JDBCExecutionUnit(executionUnit, connectionMode, preparedStatement);
}
- private PreparedStatement createPreparedStatement(final String sql, final List<Object> parameters, final ExecutorJDBCManager executionConnection, final Connection connection,
+ private PreparedStatement createPreparedStatement(final String sql, final List<Object> parameters, final ExecutorJDBCManager executorJDBCManager, final Connection connection,
final ConnectionMode connectionMode, final StatementOption statementOption) throws SQLException {
- return (PreparedStatement) executionConnection.createStorageResource(sql, parameters, connection, connectionMode, statementOption);
+ return (PreparedStatement) executorJDBCManager.createStorageResource(sql, parameters, connection, connectionMode, statementOption);
}
}
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/group/driver/jdbc/StatementExecutionGroupEngine.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/group/driver/jdbc/StatementExecutionGroupEngine.java
index f2a20a7..02353c6 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/group/driver/jdbc/StatementExecutionGroupEngine.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/group/driver/jdbc/StatementExecutionGroupEngine.java
@@ -34,8 +34,8 @@ import java.util.Collection;
public final class StatementExecutionGroupEngine extends DriverExecutionGroupEngine<JDBCExecutionUnit, ExecutorJDBCManager, Connection, StatementOption> {
public StatementExecutionGroupEngine(final int maxConnectionsSizePerQuery,
- final ExecutorJDBCManager executionConnection, final StatementOption option, final Collection<ShardingSphereRule> rules) {
- super(maxConnectionsSizePerQuery, executionConnection, option, rules);
+ final ExecutorJDBCManager executorJDBCManager, final StatementOption option, final Collection<ShardingSphereRule> rules) {
+ super(maxConnectionsSizePerQuery, executorJDBCManager, option, rules);
}
@Override
@@ -44,8 +44,8 @@ public final class StatementExecutionGroupEngine extends DriverExecutionGroupEng
return new JDBCExecutionUnit(executionUnit, connectionMode, createStatement(executorManager, connection, connectionMode, option));
}
- private Statement createStatement(final ExecutorJDBCManager executionConnection, final Connection connection,
+ private Statement createStatement(final ExecutorJDBCManager executorJDBCManager, final Connection connection,
final ConnectionMode connectionMode, final StatementOption option) throws SQLException {
- return executionConnection.createStorageResource(connection, connectionMode, option);
+ return executorJDBCManager.createStorageResource(connection, connectionMode, option);
}
}
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/group/driver/jdbc/PreparedStatementExecutionGroupEngineTest.java b/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/group/driver/jdbc/PreparedStatementExecutionGroupEngineTest.java
index 1219479..909bd66 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/group/driver/jdbc/PreparedStatementExecutionGroupEngineTest.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/group/driver/jdbc/PreparedStatementExecutionGroupEngineTest.java
@@ -50,7 +50,7 @@ public final class PreparedStatementExecutionGroupEngineTest {
@Test
public void assertGetExecutionUnitGroupForOneShardMemoryStrictly() throws SQLException {
groupEngine = new PreparedStatementExecutionGroupEngine(
- 2, mockExecutionConnection(1, ConnectionMode.MEMORY_STRICTLY), new StatementOption(true), Collections.singletonList(mock(ShardingSphereRule.class)));
+ 2, mockExecutorJDBCManager(1, ConnectionMode.MEMORY_STRICTLY), new StatementOption(true), Collections.singletonList(mock(ShardingSphereRule.class)));
Collection<ExecutionGroup<JDBCExecutionUnit>> actual = groupEngine.group(mock(RouteContext.class), mockShardRouteUnit(1, 1));
assertThat(actual.size(), is(1));
for (ExecutionGroup<JDBCExecutionUnit> each : actual) {
@@ -61,7 +61,7 @@ public final class PreparedStatementExecutionGroupEngineTest {
@Test
public void assertGetExecutionUnitGroupForMultiShardConnectionStrictly() throws SQLException {
groupEngine = new PreparedStatementExecutionGroupEngine(
- 1, mockExecutionConnection(1, ConnectionMode.CONNECTION_STRICTLY), new StatementOption(true), Collections.singletonList(mock(ShardingSphereRule.class)));
+ 1, mockExecutorJDBCManager(1, ConnectionMode.CONNECTION_STRICTLY), new StatementOption(true), Collections.singletonList(mock(ShardingSphereRule.class)));
Collection<ExecutionGroup<JDBCExecutionUnit>> actual = groupEngine.group(mock(RouteContext.class), mockShardRouteUnit(10, 2));
assertThat(actual.size(), is(10));
for (ExecutionGroup<JDBCExecutionUnit> each : actual) {
@@ -69,7 +69,7 @@ public final class PreparedStatementExecutionGroupEngineTest {
}
}
- private ExecutorJDBCManager mockExecutionConnection(final int size, final ConnectionMode connectionMode) throws SQLException {
+ private ExecutorJDBCManager mockExecutorJDBCManager(final int size, final ConnectionMode connectionMode) throws SQLException {
List<Connection> connections = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
connections.add(mock(Connection.class));
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/group/driver/jdbc/StatementExecutionGroupEngineTest.java b/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/group/driver/jdbc/StatementExecutionGroupEngineTest.java
index 68df290..4b65df6 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/group/driver/jdbc/StatementExecutionGroupEngineTest.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/group/driver/jdbc/StatementExecutionGroupEngineTest.java
@@ -50,7 +50,7 @@ public final class StatementExecutionGroupEngineTest {
@Test
public void assertGetExecutionUnitGroupForOneShardMemoryStrictly() throws SQLException {
groupEngine = new StatementExecutionGroupEngine(
- 2, mockExecutionConnection(1, ConnectionMode.MEMORY_STRICTLY), new StatementOption(true), Collections.singletonList(mock(ShardingSphereRule.class)));
+ 2, mockExecutorJDBCManager(1, ConnectionMode.MEMORY_STRICTLY), new StatementOption(true), Collections.singletonList(mock(ShardingSphereRule.class)));
Collection<ExecutionGroup<JDBCExecutionUnit>> actual = groupEngine.group(mock(RouteContext.class), mockShardRouteUnit(1, 1));
assertThat(actual.size(), is(1));
for (ExecutionGroup<JDBCExecutionUnit> each : actual) {
@@ -61,7 +61,7 @@ public final class StatementExecutionGroupEngineTest {
@Test
public void assertGetExecutionUnitGroupForMultiShardConnectionStrictly() throws SQLException {
groupEngine = new StatementExecutionGroupEngine(
- 1, mockExecutionConnection(1, ConnectionMode.CONNECTION_STRICTLY), new StatementOption(true), Collections.singletonList(mock(ShardingSphereRule.class)));
+ 1, mockExecutorJDBCManager(1, ConnectionMode.CONNECTION_STRICTLY), new StatementOption(true), Collections.singletonList(mock(ShardingSphereRule.class)));
Collection<ExecutionGroup<JDBCExecutionUnit>> actual = groupEngine.group(mock(RouteContext.class), mockShardRouteUnit(10, 2));
assertThat(actual.size(), is(10));
for (ExecutionGroup<JDBCExecutionUnit> each : actual) {
@@ -69,7 +69,7 @@ public final class StatementExecutionGroupEngineTest {
}
}
- private ExecutorJDBCManager mockExecutionConnection(final int size, final ConnectionMode connectionMode) throws SQLException {
+ private ExecutorJDBCManager mockExecutorJDBCManager(final int size, final ConnectionMode connectionMode) throws SQLException {
List<Connection> connections = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
connections.add(mock(Connection.class));
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractConnectionAdapter.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractConnectionAdapter.java
index 50c1215..066ce3a 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractConnectionAdapter.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractConnectionAdapter.java
@@ -17,44 +17,25 @@
package org.apache.shardingsphere.driver.jdbc.adapter;
-import com.google.common.base.Preconditions;
import com.google.common.collect.LinkedHashMultimap;
import com.google.common.collect.Multimap;
import lombok.Getter;
import org.apache.shardingsphere.driver.jdbc.adapter.executor.ForceExecuteTemplate;
import org.apache.shardingsphere.driver.jdbc.unsupported.AbstractUnsupportedOperationConnection;
-import org.apache.shardingsphere.infra.context.metadata.MetaDataContexts;
-import org.apache.shardingsphere.infra.executor.sql.ConnectionMode;
-import org.apache.shardingsphere.infra.executor.sql.group.driver.jdbc.ExecutorJDBCManager;
-import org.apache.shardingsphere.infra.executor.sql.group.driver.jdbc.StatementOption;
import org.apache.shardingsphere.infra.hook.RootInvokeHook;
import org.apache.shardingsphere.infra.hook.SPIRootInvokeHook;
import org.apache.shardingsphere.replicaquery.route.engine.impl.PrimaryVisitedManager;
-import javax.sql.DataSource;
import java.sql.Connection;
-import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLWarning;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
import java.util.Map.Entry;
/**
* Adapter for {@code Connection}.
*/
-public abstract class AbstractConnectionAdapter extends AbstractUnsupportedOperationConnection implements ExecutorJDBCManager {
-
- @Getter
- private final Map<String, DataSource> dataSourceMap;
-
- @Getter
- private final MetaDataContexts metaDataContexts;
+public abstract class AbstractConnectionAdapter extends AbstractUnsupportedOperationConnection {
@Getter
private final Multimap<String, Connection> cachedConnections = LinkedHashMultimap.create();
@@ -62,137 +43,21 @@ public abstract class AbstractConnectionAdapter extends AbstractUnsupportedOpera
@Getter
private final ForceExecuteTemplate<Connection> forceExecuteTemplate = new ForceExecuteTemplate<>();
+ @Getter
private final ForceExecuteTemplate<Entry<String, Connection>> forceExecuteTemplateForClose = new ForceExecuteTemplate<>();
private final RootInvokeHook rootInvokeHook = new SPIRootInvokeHook();
- private boolean autoCommit = true;
-
private boolean readOnly;
private volatile boolean closed;
private int transactionIsolation = TRANSACTION_READ_UNCOMMITTED;
- protected AbstractConnectionAdapter(final Map<String, DataSource> dataSourceMap, final MetaDataContexts metaDataContexts) {
- this.dataSourceMap = dataSourceMap;
- this.metaDataContexts = metaDataContexts;
+ protected AbstractConnectionAdapter() {
rootInvokeHook.start();
}
- /**
- * Get database connection.
- *
- * @param dataSourceName data source name
- * @return database connection
- * @throws SQLException SQL exception
- */
- public final Connection getConnection(final String dataSourceName) throws SQLException {
- return getConnections(dataSourceName, 1, ConnectionMode.MEMORY_STRICTLY).get(0);
- }
-
- @Override
- public final List<Connection> getConnections(final String dataSourceName, final int connectionSize, final ConnectionMode connectionMode) throws SQLException {
- DataSource dataSource = dataSourceMap.get(dataSourceName);
- Preconditions.checkState(null != dataSource, "Missing the data source name: '%s'", dataSourceName);
- Collection<Connection> connections;
- synchronized (cachedConnections) {
- connections = cachedConnections.get(dataSourceName);
- }
- List<Connection> result;
- if (connections.size() >= connectionSize) {
- result = new ArrayList<>(connections).subList(0, connectionSize);
- } else if (!connections.isEmpty()) {
- result = new ArrayList<>(connectionSize);
- result.addAll(connections);
- List<Connection> newConnections = createConnections(dataSourceName, dataSource, connectionSize - connections.size(), connectionMode);
- result.addAll(newConnections);
- synchronized (cachedConnections) {
- cachedConnections.putAll(dataSourceName, newConnections);
- }
- } else {
- result = new ArrayList<>(createConnections(dataSourceName, dataSource, connectionSize, connectionMode));
- synchronized (cachedConnections) {
- cachedConnections.putAll(dataSourceName, result);
- }
- }
- return result;
- }
-
- @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
- private List<Connection> createConnections(final String dataSourceName, final DataSource dataSource, final int connectionSize, final ConnectionMode connectionMode) throws SQLException {
- if (1 == connectionSize) {
- Connection connection = createConnection(dataSourceName, dataSource);
- replayMethodsInvocation(connection);
- return Collections.singletonList(connection);
- }
- if (ConnectionMode.CONNECTION_STRICTLY == connectionMode) {
- return createConnections(dataSourceName, dataSource, connectionSize);
- }
- synchronized (dataSource) {
- return createConnections(dataSourceName, dataSource, connectionSize);
- }
- }
-
- private List<Connection> createConnections(final String dataSourceName, final DataSource dataSource, final int connectionSize) throws SQLException {
- List<Connection> result = new ArrayList<>(connectionSize);
- for (int i = 0; i < connectionSize; i++) {
- try {
- Connection connection = createConnection(dataSourceName, dataSource);
- replayMethodsInvocation(connection);
- result.add(connection);
- } catch (final SQLException ex) {
- for (Connection each : result) {
- each.close();
- }
- throw new SQLException(String.format("Can not get %d connections one time, partition succeed connection(%d) have released!", connectionSize, result.size()), ex);
- }
- }
- return result;
- }
-
- protected abstract Connection createConnection(String dataSourceName, DataSource dataSource) throws SQLException;
-
- @SuppressWarnings("MagicConstant")
- @Override
- public final Statement createStorageResource(final Connection connection, final ConnectionMode connectionMode, final StatementOption option) throws SQLException {
- return connection.createStatement(option.getResultSetType(), option.getResultSetConcurrency(), option.getResultSetHoldability());
- }
-
- @SuppressWarnings("MagicConstant")
- @Override
- public final PreparedStatement createStorageResource(final String sql, final List<Object> parameters,
- final Connection connection, final ConnectionMode connectionMode, final StatementOption option) throws SQLException {
- return option.isReturnGeneratedKeys() ? connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)
- : connection.prepareStatement(sql, option.getResultSetType(), option.getResultSetConcurrency(), option.getResultSetHoldability());
- }
-
- @Override
- public final boolean getAutoCommit() {
- return autoCommit;
- }
-
- @Override
- public void setAutoCommit(final boolean autoCommit) throws SQLException {
- this.autoCommit = autoCommit;
- setAutoCommitForLocalTransaction(autoCommit);
- }
-
- private void setAutoCommitForLocalTransaction(final boolean autoCommit) throws SQLException {
- recordMethodInvocation(Connection.class, "setAutoCommit", new Class[]{boolean.class}, new Object[]{autoCommit});
- forceExecuteTemplate.execute(cachedConnections.values(), connection -> connection.setAutoCommit(autoCommit));
- }
-
- @Override
- public void commit() throws SQLException {
- forceExecuteTemplate.execute(cachedConnections.values(), Connection::commit);
- }
-
- @Override
- public void rollback() throws SQLException {
- forceExecuteTemplate.execute(cachedConnections.values(), Connection::rollback);
- }
-
@Override
public final void close() throws SQLException {
closed = true;
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
index 5918192..54cc34f 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
@@ -17,12 +17,17 @@
package org.apache.shardingsphere.driver.jdbc.core.connection;
+import com.google.common.base.Preconditions;
+import lombok.AccessLevel;
import lombok.Getter;
import org.apache.shardingsphere.driver.jdbc.adapter.AbstractConnectionAdapter;
import org.apache.shardingsphere.driver.jdbc.core.datasource.metadata.ShardingSphereDatabaseMetaData;
import org.apache.shardingsphere.driver.jdbc.core.statement.ShardingSpherePreparedStatement;
import org.apache.shardingsphere.driver.jdbc.core.statement.ShardingSphereStatement;
import org.apache.shardingsphere.infra.context.metadata.MetaDataContexts;
+import org.apache.shardingsphere.infra.executor.sql.ConnectionMode;
+import org.apache.shardingsphere.infra.executor.sql.group.driver.jdbc.ExecutorJDBCManager;
+import org.apache.shardingsphere.infra.executor.sql.group.driver.jdbc.StatementOption;
import org.apache.shardingsphere.transaction.context.TransactionContexts;
import org.apache.shardingsphere.transaction.core.TransactionType;
import org.apache.shardingsphere.transaction.spi.ShardingTransactionManager;
@@ -33,36 +38,109 @@ import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
/**
* ShardingSphere Connection.
*/
@Getter
-public final class ShardingSphereConnection extends AbstractConnectionAdapter {
+public final class ShardingSphereConnection extends AbstractConnectionAdapter implements ExecutorJDBCManager {
+
+ private final Map<String, DataSource> dataSourceMap;
+
+ private final MetaDataContexts metaDataContexts;
private final TransactionType transactionType;
private final ShardingTransactionManager shardingTransactionManager;
+ @Getter(AccessLevel.NONE)
+ private boolean autoCommit = true;
+
public ShardingSphereConnection(final Map<String, DataSource> dataSourceMap,
final MetaDataContexts metaDataContexts, final TransactionContexts transactionContexts, final TransactionType transactionType) {
- super(dataSourceMap, metaDataContexts);
+ this.dataSourceMap = dataSourceMap;
+ this.metaDataContexts = metaDataContexts;
this.transactionType = transactionType;
shardingTransactionManager = transactionContexts.getDefaultTransactionManagerEngine().getTransactionManager(transactionType);
}
/**
- * Whether hold transaction or not.
+ * Get database connection.
*
- * @return true or false
+ * @param dataSourceName data source name
+ * @return database connection
+ * @throws SQLException SQL exception
*/
- public boolean isHoldTransaction() {
- return (TransactionType.LOCAL == transactionType && !getAutoCommit()) || (TransactionType.XA == transactionType && isInShardingTransaction());
+ public Connection getConnection(final String dataSourceName) throws SQLException {
+ return getConnections(dataSourceName, 1, ConnectionMode.MEMORY_STRICTLY).get(0);
}
@Override
- protected Connection createConnection(final String dataSourceName, final DataSource dataSource) throws SQLException {
+ public List<Connection> getConnections(final String dataSourceName, final int connectionSize, final ConnectionMode connectionMode) throws SQLException {
+ DataSource dataSource = dataSourceMap.get(dataSourceName);
+ Preconditions.checkState(null != dataSource, "Missing the data source name: '%s'", dataSourceName);
+ Collection<Connection> connections;
+ synchronized (getCachedConnections()) {
+ connections = getCachedConnections().get(dataSourceName);
+ }
+ List<Connection> result;
+ if (connections.size() >= connectionSize) {
+ result = new ArrayList<>(connections).subList(0, connectionSize);
+ } else if (!connections.isEmpty()) {
+ result = new ArrayList<>(connectionSize);
+ result.addAll(connections);
+ List<Connection> newConnections = createConnections(dataSourceName, dataSource, connectionSize - connections.size(), connectionMode);
+ result.addAll(newConnections);
+ synchronized (getCachedConnections()) {
+ getCachedConnections().putAll(dataSourceName, newConnections);
+ }
+ } else {
+ result = new ArrayList<>(createConnections(dataSourceName, dataSource, connectionSize, connectionMode));
+ synchronized (getCachedConnections()) {
+ getCachedConnections().putAll(dataSourceName, result);
+ }
+ }
+ return result;
+ }
+
+ @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+ private List<Connection> createConnections(final String dataSourceName, final DataSource dataSource, final int connectionSize, final ConnectionMode connectionMode) throws SQLException {
+ if (1 == connectionSize) {
+ Connection connection = createConnection(dataSourceName, dataSource);
+ replayMethodsInvocation(connection);
+ return Collections.singletonList(connection);
+ }
+ if (ConnectionMode.CONNECTION_STRICTLY == connectionMode) {
+ return createConnections(dataSourceName, dataSource, connectionSize);
+ }
+ synchronized (dataSource) {
+ return createConnections(dataSourceName, dataSource, connectionSize);
+ }
+ }
+
+ private List<Connection> createConnections(final String dataSourceName, final DataSource dataSource, final int connectionSize) throws SQLException {
+ List<Connection> result = new ArrayList<>(connectionSize);
+ for (int i = 0; i < connectionSize; i++) {
+ try {
+ Connection connection = createConnection(dataSourceName, dataSource);
+ replayMethodsInvocation(connection);
+ result.add(connection);
+ } catch (final SQLException ex) {
+ for (Connection each : result) {
+ each.close();
+ }
+ throw new SQLException(String.format("Can not get %d connections one time, partition succeed connection(%d) have released!", connectionSize, result.size()), ex);
+ }
+ }
+ return result;
+ }
+
+ private Connection createConnection(final String dataSourceName, final DataSource dataSource) throws SQLException {
return isInShardingTransaction() ? shardingTransactionManager.getConnection(dataSourceName) : dataSource.getConnection();
}
@@ -70,6 +148,29 @@ public final class ShardingSphereConnection extends AbstractConnectionAdapter {
return null != shardingTransactionManager && shardingTransactionManager.isInTransaction();
}
+ /**
+ * Whether hold transaction or not.
+ *
+ * @return true or false
+ */
+ public boolean isHoldTransaction() {
+ return (TransactionType.LOCAL == transactionType && !autoCommit) || (TransactionType.XA == transactionType && isInShardingTransaction());
+ }
+
+ @SuppressWarnings("MagicConstant")
+ @Override
+ public Statement createStorageResource(final Connection connection, final ConnectionMode connectionMode, final StatementOption option) throws SQLException {
+ return connection.createStatement(option.getResultSetType(), option.getResultSetConcurrency(), option.getResultSetHoldability());
+ }
+
+ @SuppressWarnings("MagicConstant")
+ @Override
+ public PreparedStatement createStorageResource(final String sql, final List<Object> parameters,
+ final Connection connection, final ConnectionMode connectionMode, final StatementOption option) throws SQLException {
+ return option.isReturnGeneratedKeys() ? connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)
+ : connection.prepareStatement(sql, option.getResultSetType(), option.getResultSetConcurrency(), option.getResultSetHoldability());
+ }
+
@Override
public DatabaseMetaData getMetaData() {
return new ShardingSphereDatabaseMetaData(this);
@@ -121,9 +222,16 @@ public final class ShardingSphereConnection extends AbstractConnectionAdapter {
}
@Override
+ public boolean getAutoCommit() {
+ return autoCommit;
+ }
+
+ @Override
public void setAutoCommit(final boolean autoCommit) throws SQLException {
if (TransactionType.LOCAL == transactionType) {
- super.setAutoCommit(autoCommit);
+ this.autoCommit = autoCommit;
+ recordMethodInvocation(Connection.class, "setAutoCommit", new Class[]{boolean.class}, new Object[]{autoCommit});
+ getForceExecuteTemplate().execute(getCachedConnections().values(), connection -> connection.setAutoCommit(autoCommit));
return;
}
if (autoCommit != shardingTransactionManager.isInTransaction()) {
@@ -147,7 +255,7 @@ public final class ShardingSphereConnection extends AbstractConnectionAdapter {
@Override
public void commit() throws SQLException {
if (TransactionType.LOCAL == transactionType) {
- super.commit();
+ getForceExecuteTemplate().execute(getCachedConnections().values(), Connection::commit);
} else {
shardingTransactionManager.commit();
}
@@ -156,7 +264,7 @@ public final class ShardingSphereConnection extends AbstractConnectionAdapter {
@Override
public void rollback() throws SQLException {
if (TransactionType.LOCAL == transactionType) {
- super.rollback();
+ getForceExecuteTemplate().execute(getCachedConnections().values(), Connection::rollback);
} else {
shardingTransactionManager.rollback();
}
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/datasource/metadata/ShardingSphereDatabaseMetaData.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/datasource/metadata/ShardingSphereDatabaseMetaData.java
index 20bb6a1..cf16598 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/datasource/metadata/ShardingSphereDatabaseMetaData.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/datasource/metadata/ShardingSphereDatabaseMetaData.java
@@ -18,13 +18,13 @@
package org.apache.shardingsphere.driver.jdbc.core.datasource.metadata;
import lombok.Getter;
-import org.apache.shardingsphere.driver.jdbc.adapter.AbstractConnectionAdapter;
import org.apache.shardingsphere.driver.jdbc.adapter.AdaptedDatabaseMetaData;
+import org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
import org.apache.shardingsphere.driver.jdbc.core.resultset.DatabaseMetaDataResultSet;
import org.apache.shardingsphere.infra.database.DefaultSchema;
-import org.apache.shardingsphere.infra.rule.type.DataNodeContainedRule;
-import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import org.apache.shardingsphere.infra.metadata.resource.DataSourcesMetaData;
+import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
+import org.apache.shardingsphere.infra.rule.type.DataNodeContainedRule;
import java.security.SecureRandom;
import java.sql.Connection;
@@ -42,7 +42,7 @@ import java.util.Random;
@Getter
public final class ShardingSphereDatabaseMetaData extends AdaptedDatabaseMetaData {
- private final AbstractConnectionAdapter connection;
+ private final ShardingSphereConnection connection;
private final Collection<ShardingSphereRule> rules;
@@ -56,7 +56,7 @@ public final class ShardingSphereDatabaseMetaData extends AdaptedDatabaseMetaDat
private DatabaseMetaData currentDatabaseMetaData;
- public ShardingSphereDatabaseMetaData(final AbstractConnectionAdapter connection) {
+ public ShardingSphereDatabaseMetaData(final ShardingSphereConnection connection) {
super(connection.getMetaDataContexts().getDefaultMetaData().getResource().getCachedDatabaseMetaData());
this.connection = connection;
rules = connection.getMetaDataContexts().getDefaultMetaData().getRuleMetaData().getRules();