You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2021/06/23 10:27:30 UTC
[shardingsphere] branch master updated: Move cached statements and
result sets into DatabaseCommunicationEngine (#10932)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new ba64144 Move cached statements and result sets into DatabaseCommunicationEngine (#10932)
ba64144 is described below
commit ba6414447a80ac5a98a2ac30464b1964b6d7c5b4
Author: 吴伟杰 <wu...@apache.org>
AuthorDate: Wed Jun 23 18:27:05 2021 +0800
Move cached statements and result sets into DatabaseCommunicationEngine (#10932)
* Move cached statements and result sets into DatabaseCommunicationEngine
* Complete tests for DatabaseCommunicationEngine
* Close TextProtocolBackendHandler correctly
* Add DatabaseCommunicationEngine to BackendConnection
---
.../communication/DatabaseCommunicationEngine.java | 75 +++++++++++++++-
.../DatabaseCommunicationEngineFactory.java | 8 +-
.../backend/communication/ProxySQLExecutor.java | 9 +-
.../jdbc/connection/BackendConnection.java | 50 +++--------
.../jdbc/executor/ProxyJDBCExecutor.java | 7 +-
.../callback/ProxyJDBCExecutorCallback.java | 12 +--
.../callback/ProxyJDBCExecutorCallbackFactory.java | 15 ++--
.../ProxyPreparedStatementExecutorCallback.java | 6 +-
.../impl/ProxyStatementExecutorCallback.java | 6 +-
.../backend/text/TextProtocolBackendHandler.java | 8 ++
.../impl/SchemaAssignedDatabaseBackendHandler.java | 5 ++
.../data/impl/UnicastDatabaseBackendHandler.java | 5 ++
.../DatabaseCommunicationEngineTest.java | 99 ++++++++++++++++++++--
.../jdbc/connection/BackendConnectionTest.java | 73 ----------------
.../frontend/command/CommandExecutorTask.java | 20 +++--
.../netty/FrontendChannelInboundHandler.java | 3 +-
.../frontend/command/CommandExecutorTaskTest.java | 11 +--
.../execute/MySQLComStmtExecuteExecutor.java | 7 +-
.../fieldlist/MySQLComFieldListPacketExecutor.java | 7 +-
.../text/query/MySQLComQueryPacketExecutor.java | 5 ++
.../query/text/PostgreSQLComQueryExecutor.java | 5 ++
.../frontend/command/executor/CommandExecutor.java | 8 ++
22 files changed, 274 insertions(+), 170 deletions(-)
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
index 2f1a058..b770f31 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
@@ -44,11 +44,15 @@ import org.apache.shardingsphere.proxy.backend.response.header.query.impl.QueryH
import org.apache.shardingsphere.proxy.backend.response.header.query.impl.QueryHeaderBuilder;
import org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
+import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
/**
@@ -73,18 +77,40 @@ public final class DatabaseCommunicationEngine {
private ProxyLockEngine proxyLockEngine;
+ private final Collection<Statement> cachedStatements = new CopyOnWriteArrayList<>();
+
+ private final Collection<ResultSet> cachedResultSets = new CopyOnWriteArrayList<>();
+
public DatabaseCommunicationEngine(final String driverType, final ShardingSphereMetaData metaData, final LogicSQL logicSQL, final BackendConnection backendConnection) {
this.driverType = driverType;
this.metaData = metaData;
this.logicSQL = logicSQL;
- proxySQLExecutor = new ProxySQLExecutor(driverType, backendConnection);
+ proxySQLExecutor = new ProxySQLExecutor(driverType, backendConnection, this);
kernelProcessor = new KernelProcessor();
- proxyLockEngine = new ProxyLockEngine(proxySQLExecutor, new MetadataRefreshEngine(metaData,
- ProxyContext.getInstance().getMetaDataContexts().getOptimizeContextFactory().getSchemaMetadatas().getSchemas().get(backendConnection.getSchemaName()),
+ proxyLockEngine = new ProxyLockEngine(proxySQLExecutor, new MetadataRefreshEngine(metaData,
+ ProxyContext.getInstance().getMetaDataContexts().getOptimizeContextFactory().getSchemaMetadatas().getSchemas().get(backendConnection.getSchemaName()),
ProxyContext.getInstance().getMetaDataContexts().getProps(), ProxyContext.getInstance().getLock().orElse(null)), backendConnection.getSchemaName());
}
/**
+ * Add statement.
+ *
+ * @param statement statement to be added
+ */
+ public void add(final Statement statement) {
+ cachedStatements.add(statement);
+ }
+
+ /**
+ * Add result set.
+ *
+ * @param resultSet result set to be added
+ */
+ public void add(final ResultSet resultSet) {
+ cachedResultSets.add(resultSet);
+ }
+
+ /**
* Execute to database.
*
* @return backend response
@@ -192,4 +218,47 @@ public final class DatabaseCommunicationEngine {
private boolean isBinary() {
return JDBCDriverType.PREPARED_STATEMENT.equals(driverType);
}
+
+ /**
+ * Close database communication engine.
+ *
+ * @throws SQLException SQL exception
+ */
+ public void close() throws SQLException {
+ Collection<SQLException> result = new LinkedList<>();
+ result.addAll(closeResultSets());
+ result.addAll(closeStatements());
+ if (result.isEmpty()) {
+ return;
+ }
+ SQLException ex = new SQLException();
+ result.forEach(ex::setNextException);
+ throw ex;
+ }
+
+ private Collection<SQLException> closeResultSets() {
+ Collection<SQLException> result = new LinkedList<>();
+ for (ResultSet each : cachedResultSets) {
+ try {
+ each.close();
+ } catch (final SQLException ex) {
+ result.add(ex);
+ }
+ }
+ cachedResultSets.clear();
+ return result;
+ }
+
+ private Collection<SQLException> closeStatements() {
+ Collection<SQLException> result = new LinkedList<>();
+ for (Statement each : cachedStatements) {
+ try {
+ each.close();
+ } catch (final SQLException ex) {
+ result.add(ex);
+ }
+ }
+ cachedStatements.clear();
+ return result;
+ }
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineFactory.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineFactory.java
index e010424..459cc06 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineFactory.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineFactory.java
@@ -60,7 +60,9 @@ public final class DatabaseCommunicationEngineFactory {
public DatabaseCommunicationEngine newTextProtocolInstance(final SQLStatement sqlStatement, final String sql, final BackendConnection backendConnection) {
ShardingSphereMetaData metaData = ProxyContext.getInstance().getMetaData(backendConnection.getSchemaName());
LogicSQL logicSQL = createLogicSQL(sqlStatement, sql, Collections.emptyList(), metaData);
- return new DatabaseCommunicationEngine(JDBCDriverType.STATEMENT, metaData, logicSQL, backendConnection);
+ DatabaseCommunicationEngine result = new DatabaseCommunicationEngine(JDBCDriverType.STATEMENT, metaData, logicSQL, backendConnection);
+ backendConnection.add(result);
+ return result;
}
/**
@@ -75,7 +77,9 @@ public final class DatabaseCommunicationEngineFactory {
public DatabaseCommunicationEngine newBinaryProtocolInstance(final SQLStatement sqlStatement, final String sql, final List<Object> parameters, final BackendConnection backendConnection) {
ShardingSphereMetaData metaData = ProxyContext.getInstance().getMetaData(backendConnection.getSchemaName());
LogicSQL logicSQL = createLogicSQL(sqlStatement, sql, new ArrayList<>(parameters), metaData);
- return new DatabaseCommunicationEngine(JDBCDriverType.PREPARED_STATEMENT, metaData, logicSQL, backendConnection);
+ DatabaseCommunicationEngine result = new DatabaseCommunicationEngine(JDBCDriverType.PREPARED_STATEMENT, metaData, logicSQL, backendConnection);
+ backendConnection.add(result);
+ return result;
}
private LogicSQL createLogicSQL(final SQLStatement sqlStatement, final String sql, final List<Object> parameters, final ShardingSphereMetaData metaData) {
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
index fba5049..0df7432 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
@@ -67,18 +67,21 @@ public final class ProxySQLExecutor {
private final BackendConnection backendConnection;
+ private final DatabaseCommunicationEngine databaseCommunicationEngine;
+
private final ProxyJDBCExecutor jdbcExecutor;
private final RawExecutor rawExecutor;
private final FederateExecutor federateExecutor;
- public ProxySQLExecutor(final String type, final BackendConnection backendConnection) {
+ public ProxySQLExecutor(final String type, final BackendConnection backendConnection, final DatabaseCommunicationEngine databaseCommunicationEngine) {
this.type = type;
this.backendConnection = backendConnection;
+ this.databaseCommunicationEngine = databaseCommunicationEngine;
ExecutorEngine executorEngine = BackendExecutorContext.getInstance().getExecutorEngine();
boolean isSerialExecute = backendConnection.isSerialExecute();
- jdbcExecutor = new ProxyJDBCExecutor(type, backendConnection, new JDBCExecutor(executorEngine, isSerialExecute));
+ jdbcExecutor = new ProxyJDBCExecutor(type, backendConnection, databaseCommunicationEngine, new JDBCExecutor(executorEngine, isSerialExecute));
MetaDataContexts metaDataContexts = ProxyContext.getInstance().getMetaDataContexts();
rawExecutor = new RawExecutor(executorEngine, isSerialExecute, metaDataContexts.getProps());
// TODO Consider FederateRawExecutor
@@ -145,7 +148,7 @@ public final class ProxySQLExecutor {
}
MetaDataContexts metaData = ProxyContext.getInstance().getMetaDataContexts();
ProxyJDBCExecutorCallback callback = ProxyJDBCExecutorCallbackFactory.newInstance(type, metaData.getMetaData(backendConnection.getSchemaName()).getResource().getDatabaseType(),
- executionContext.getSqlStatementContext().getSqlStatement(), backendConnection, isReturnGeneratedKeys, isExceptionThrown, true);
+ executionContext.getSqlStatementContext().getSqlStatement(), databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown, true);
backendConnection.setFederateExecutor(federateExecutor);
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = createDriverExecutionPrepareEngine(isReturnGeneratedKeys, metaData);
return federateExecutor.executeQuery(executionContext, callback, prepareEngine).stream().map(each -> (ExecuteResult) each).collect(Collectors.toList());
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
index 1bcffb8..0beea56 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
@@ -32,6 +32,7 @@ import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.Statemen
import org.apache.shardingsphere.infra.metadata.user.Grantee;
import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.infra.spi.typed.TypedSPIRegistry;
+import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.statement.StatementMemoryStrictlyFetchSizeSetter;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.transaction.TransactionStatus;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
@@ -39,7 +40,6 @@ import org.apache.shardingsphere.transaction.core.TransactionType;
import java.sql.Connection;
import java.sql.PreparedStatement;
-import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
@@ -74,9 +74,7 @@ public final class BackendConnection implements ExecutorJDBCManager {
private final Multimap<String, Connection> cachedConnections = LinkedHashMultimap.create();
- private final Collection<Statement> cachedStatements = new CopyOnWriteArrayList<>();
-
- private final Collection<ResultSet> cachedResultSets = new CopyOnWriteArrayList<>();
+ private final Collection<DatabaseCommunicationEngine> cachedDatabaseCommunicationEngines = new CopyOnWriteArrayList<>();
private final Collection<ConnectionPostProcessor> connectionPostProcessors = new LinkedList<>();
@@ -213,56 +211,28 @@ public final class BackendConnection implements ExecutorJDBCManager {
}
/**
- * Add statement.
- *
- * @param statement statement to be added
- */
- public void add(final Statement statement) {
- cachedStatements.add(statement);
- }
-
- /**
- * Add result set.
+ * Add database communication engine.
*
- * @param resultSet result set to be added
+ * @param databaseCommunicationEngine database communication engine to be added
*/
- public void add(final ResultSet resultSet) {
- cachedResultSets.add(resultSet);
- }
-
- /**
- * Close result sets.
- *
- * @return SQL exception when result sets close
- */
- public synchronized Collection<SQLException> closeResultSets() {
- Collection<SQLException> result = new LinkedList<>();
- for (ResultSet each : cachedResultSets) {
- try {
- each.close();
- } catch (final SQLException ex) {
- result.add(ex);
- }
- }
- cachedResultSets.clear();
- return result;
+ public void add(final DatabaseCommunicationEngine databaseCommunicationEngine) {
+ cachedDatabaseCommunicationEngines.add(databaseCommunicationEngine);
}
/**
- * Close statements.
+ * Close database communication engines.
*
- * @return SQL exception when statements close
+ * @return SQL exception when engine close
*/
- public synchronized Collection<SQLException> closeStatements() {
+ public synchronized Collection<SQLException> closeDatabaseCommunicationEngines() {
Collection<SQLException> result = new LinkedList<>();
- for (Statement each : cachedStatements) {
+ for (DatabaseCommunicationEngine each : cachedDatabaseCommunicationEngines) {
try {
each.close();
} catch (final SQLException ex) {
result.add(ex);
}
}
- cachedStatements.clear();
return result;
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java
index 4087d10..d6e12c6 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java
@@ -27,6 +27,7 @@ import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.J
import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
import org.apache.shardingsphere.infra.executor.sql.process.ExecuteProcessEngine;
+import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.ProxyJDBCExecutorCallbackFactory;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
@@ -44,6 +45,8 @@ public final class ProxyJDBCExecutor {
private final BackendConnection backendConnection;
+ private final DatabaseCommunicationEngine databaseCommunicationEngine;
+
@Getter
private final JDBCExecutor jdbcExecutor;
@@ -64,8 +67,8 @@ public final class ProxyJDBCExecutor {
DatabaseType databaseType = metaDataContexts.getMetaData(backendConnection.getSchemaName()).getResource().getDatabaseType();
ExecuteProcessEngine.initialize(context, executionGroupContext, metaDataContexts.getProps());
Collection<ExecuteResult> result = jdbcExecutor.execute(executionGroupContext,
- ProxyJDBCExecutorCallbackFactory.newInstance(type, databaseType, context.getSqlStatement(), backendConnection, isReturnGeneratedKeys, isExceptionThrown, true),
- ProxyJDBCExecutorCallbackFactory.newInstance(type, databaseType, context.getSqlStatement(), backendConnection, isReturnGeneratedKeys, isExceptionThrown, false));
+ ProxyJDBCExecutorCallbackFactory.newInstance(type, databaseType, context.getSqlStatement(), databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown, true),
+ ProxyJDBCExecutorCallbackFactory.newInstance(type, databaseType, context.getSqlStatement(), databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown, false));
ExecuteProcessEngine.finish(executionGroupContext.getExecutionID());
return result;
} finally {
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallback.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallback.java
index 2d05d4f..3285833 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallback.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallback.java
@@ -28,7 +28,7 @@ import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryRe
import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.memory.JDBCMemoryQueryResult;
import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
import org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
-import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
+import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
@@ -43,7 +43,7 @@ import java.util.Optional;
*/
public abstract class ProxyJDBCExecutorCallback extends JDBCExecutorCallback<ExecuteResult> {
- private final BackendConnection backendConnection;
+ private final DatabaseCommunicationEngine databaseCommunicationEngine;
private final boolean isReturnGeneratedKeys;
@@ -51,10 +51,10 @@ public abstract class ProxyJDBCExecutorCallback extends JDBCExecutorCallback<Exe
private boolean hasMetaData;
- public ProxyJDBCExecutorCallback(final DatabaseType databaseType, final SQLStatement sqlStatement, final BackendConnection backendConnection,
+ public ProxyJDBCExecutorCallback(final DatabaseType databaseType, final SQLStatement sqlStatement, final DatabaseCommunicationEngine databaseCommunicationEngine,
final boolean isReturnGeneratedKeys, final boolean isExceptionThrown, final boolean fetchMetaData) {
super(databaseType, sqlStatement, isExceptionThrown);
- this.backendConnection = backendConnection;
+ this.databaseCommunicationEngine = databaseCommunicationEngine;
this.isReturnGeneratedKeys = isReturnGeneratedKeys;
this.fetchMetaData = fetchMetaData;
}
@@ -69,10 +69,10 @@ public abstract class ProxyJDBCExecutorCallback extends JDBCExecutorCallback<Exe
}
private ExecuteResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final boolean withMetadata) throws SQLException {
- backendConnection.add(statement);
+ databaseCommunicationEngine.add(statement);
if (execute(sql, statement, isReturnGeneratedKeys)) {
ResultSet resultSet = statement.getResultSet();
- backendConnection.add(resultSet);
+ databaseCommunicationEngine.add(resultSet);
return createQueryResult(resultSet, connectionMode);
}
return new UpdateResult(statement.getUpdateCount(), isReturnGeneratedKeys ? getGeneratedKey(statement) : 0L);
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallbackFactory.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallbackFactory.java
index 3967393..f7edf40 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallbackFactory.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallbackFactory.java
@@ -21,7 +21,7 @@ import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
-import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
+import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.impl.ProxyPreparedStatementExecutorCallback;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.impl.ProxyStatementExecutorCallback;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
@@ -34,23 +34,24 @@ public final class ProxyJDBCExecutorCallbackFactory {
/**
* Create new instance of Proxy JDBC executor callback.
- *
+ *
* @param type driver type
* @param databaseType database type
* @param sqlStatement SQL statement
- * @param backendConnection backend connection
+ * @param databaseCommunicationEngine database communication engine
* @param isReturnGeneratedKeys is return generated keys or not
* @param isExceptionThrown is exception thrown or not
* @param isFetchMetaData is fetch meta data or not
* @return instance of Proxy JDBC executor callback
*/
- public static ProxyJDBCExecutorCallback newInstance(final String type, final DatabaseType databaseType, final SQLStatement sqlStatement, final BackendConnection backendConnection,
- final boolean isReturnGeneratedKeys, final boolean isExceptionThrown, final boolean isFetchMetaData) {
+ public static ProxyJDBCExecutorCallback newInstance(final String type, final DatabaseType databaseType, final SQLStatement sqlStatement,
+ final DatabaseCommunicationEngine databaseCommunicationEngine, final boolean isReturnGeneratedKeys, final boolean isExceptionThrown,
+ final boolean isFetchMetaData) {
if (JDBCDriverType.STATEMENT.equals(type)) {
- return new ProxyStatementExecutorCallback(databaseType, sqlStatement, backendConnection, isReturnGeneratedKeys, isExceptionThrown, isFetchMetaData);
+ return new ProxyStatementExecutorCallback(databaseType, sqlStatement, databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown, isFetchMetaData);
}
if (JDBCDriverType.PREPARED_STATEMENT.equals(type)) {
- return new ProxyPreparedStatementExecutorCallback(databaseType, sqlStatement, backendConnection, isReturnGeneratedKeys, isExceptionThrown, isFetchMetaData);
+ return new ProxyPreparedStatementExecutorCallback(databaseType, sqlStatement, databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown, isFetchMetaData);
}
throw new UnsupportedOperationException(String.format("Unsupported driver type: `%s`", type));
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyPreparedStatementExecutorCallback.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyPreparedStatementExecutorCallback.java
index 67bcf1c..f8b4a60 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyPreparedStatementExecutorCallback.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyPreparedStatementExecutorCallback.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.impl;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
+import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.ProxyJDBCExecutorCallback;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
@@ -31,9 +31,9 @@ import java.sql.Statement;
*/
public final class ProxyPreparedStatementExecutorCallback extends ProxyJDBCExecutorCallback {
- public ProxyPreparedStatementExecutorCallback(final DatabaseType databaseType, final SQLStatement sqlStatement, final BackendConnection backendConnection,
+ public ProxyPreparedStatementExecutorCallback(final DatabaseType databaseType, final SQLStatement sqlStatement, final DatabaseCommunicationEngine databaseCommunicationEngine,
final boolean isReturnGeneratedKeys, final boolean isExceptionThrown, final boolean fetchMetaData) {
- super(databaseType, sqlStatement, backendConnection, isReturnGeneratedKeys, isExceptionThrown, fetchMetaData);
+ super(databaseType, sqlStatement, databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown, fetchMetaData);
}
@Override
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyStatementExecutorCallback.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyStatementExecutorCallback.java
index 88647fc..17c2044 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyStatementExecutorCallback.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyStatementExecutorCallback.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.impl;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
+import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.ProxyJDBCExecutorCallback;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
@@ -30,9 +30,9 @@ import java.sql.Statement;
*/
public final class ProxyStatementExecutorCallback extends ProxyJDBCExecutorCallback {
- public ProxyStatementExecutorCallback(final DatabaseType databaseType, final SQLStatement sqlStatement, final BackendConnection backendConnection,
+ public ProxyStatementExecutorCallback(final DatabaseType databaseType, final SQLStatement sqlStatement, final DatabaseCommunicationEngine databaseCommunicationEngine,
final boolean isReturnGeneratedKeys, final boolean isExceptionThrown, final boolean fetchMetaData) {
- super(databaseType, sqlStatement, backendConnection, isReturnGeneratedKeys, isExceptionThrown, fetchMetaData);
+ super(databaseType, sqlStatement, databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown, fetchMetaData);
}
@Override
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/TextProtocolBackendHandler.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/TextProtocolBackendHandler.java
index f87cb1d..5c4e12f 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/TextProtocolBackendHandler.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/TextProtocolBackendHandler.java
@@ -55,4 +55,12 @@ public interface TextProtocolBackendHandler {
default Collection<Object> getRowData() throws SQLException {
return Collections.emptyList();
}
+
+ /**
+ * Close handler.
+ *
+ * @throws SQLException SQL exception
+ */
+ default void close() throws SQLException {
+ }
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandler.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandler.java
index f2633a8..8d8ad45 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandler.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandler.java
@@ -64,4 +64,9 @@ public final class SchemaAssignedDatabaseBackendHandler implements DatabaseBacke
public Collection<Object> getRowData() throws SQLException {
return databaseCommunicationEngine.getQueryResponseRow().getData();
}
+
+ @Override
+ public void close() throws SQLException {
+ databaseCommunicationEngine.close();
+ }
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/UnicastDatabaseBackendHandler.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/UnicastDatabaseBackendHandler.java
index 57ec70f..3871ad1 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/UnicastDatabaseBackendHandler.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/UnicastDatabaseBackendHandler.java
@@ -74,4 +74,9 @@ public final class UnicastDatabaseBackendHandler implements DatabaseBackendHandl
public Collection<Object> getRowData() throws SQLException {
return databaseCommunicationEngine.getQueryResponseRow().getData();
}
+
+ @Override
+ public void close() throws SQLException {
+ databaseCommunicationEngine.close();
+ }
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineTest.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineTest.java
index 5677758..e2768a4 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineTest.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineTest.java
@@ -17,11 +17,11 @@
package org.apache.shardingsphere.proxy.backend.communication;
+import lombok.SneakyThrows;
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
import org.apache.shardingsphere.infra.config.properties.ConfigurationProperties;
import org.apache.shardingsphere.infra.context.metadata.MetaDataContexts;
import org.apache.shardingsphere.infra.context.metadata.impl.StandardMetaDataContexts;
-import org.apache.shardingsphere.infra.database.metadata.DataSourceMetaData;
import org.apache.shardingsphere.infra.database.type.dialect.H2DatabaseType;
import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
@@ -44,11 +44,17 @@ import org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.MySQLSta
import org.apache.shardingsphere.transaction.context.impl.StandardTransactionContexts;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
import org.mockito.internal.util.reflection.FieldSetter;
+import org.mockito.junit.MockitoJUnitRunner;
import java.lang.reflect.Field;
+import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Statement;
import java.sql.Types;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -56,18 +62,33 @@ import java.util.Optional;
import java.util.Properties;
import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+@RunWith(MockitoJUnitRunner.class)
public final class DatabaseCommunicationEngineTest {
+ @Mock
+ private BackendConnection backendConnection;
+
+ @Mock
+ private Statement statement;
+
+ @Mock
+ private ResultSet resultSet;
+
@Before
public void setUp() {
- MetaDataContexts metaDataContexts = new StandardMetaDataContexts(mockMetaDataMap(), mock(ShardingSphereRuleMetaData.class), mock(ExecutorEngine.class),
+ when(backendConnection.getSchemaName()).thenReturn("schema");
+ MetaDataContexts metaDataContexts = new StandardMetaDataContexts(mockMetaDataMap(), mock(ShardingSphereRuleMetaData.class), mock(ExecutorEngine.class),
new ConfigurationProperties(new Properties()));
ProxyContext.getInstance().init(metaDataContexts, new StandardTransactionContexts());
}
@@ -81,8 +102,6 @@ public final class DatabaseCommunicationEngineTest {
@Test
public void assertBinaryProtocolQueryHeader() throws SQLException, NoSuchFieldException {
- BackendConnection backendConnection = mock(BackendConnection.class);
- when(backendConnection.getSchemaName()).thenReturn("schema");
DatabaseCommunicationEngine engine =
DatabaseCommunicationEngineFactory.getInstance().newBinaryProtocolInstance(mock(MySQLStatement.class), "schemaName", Collections.emptyList(), backendConnection);
assertNotNull(engine);
@@ -95,7 +114,7 @@ public final class DatabaseCommunicationEngineTest {
private MemoryQueryResultRow memoryQueryResultRow;
@Override
- protected List<MemoryQueryResultRow> init(final ShardingSphereRule rule, final ShardingSphereSchema schema,
+ protected List<MemoryQueryResultRow> init(final ShardingSphereRule rule, final ShardingSphereSchema schema,
final SQLStatementContext sqlStatementContext, final List<QueryResult> queryResults) {
memoryQueryResultRow = mock(MemoryQueryResultRow.class);
return Collections.singletonList(memoryQueryResultRow);
@@ -117,7 +136,6 @@ public final class DatabaseCommunicationEngineTest {
ShardingSphereSchema schema = mock(ShardingSphereSchema.class);
when(schema.get("t_logic_order")).thenReturn(new TableMetaData(Collections.singletonList(columnMetaData), Collections.singletonList(new IndexMetaData("order_id"))));
DataSourcesMetaData dataSourcesMetaData = mock(DataSourcesMetaData.class);
- when(dataSourcesMetaData.getDataSourceMetaData("ds_0")).thenReturn(mock(DataSourceMetaData.class));
when(result.getResource().getDataSourcesMetaData()).thenReturn(dataSourcesMetaData);
when(result.getSchema()).thenReturn(schema);
ShardingRule shardingRule = mock(ShardingRule.class);
@@ -132,7 +150,6 @@ public final class DatabaseCommunicationEngineTest {
when(result.getTableName(1)).thenReturn("t_order");
when(result.getColumnLabel(1)).thenReturn("order_id");
when(result.getColumnName(1)).thenReturn("order_id");
- when(result.getColumnName(2)).thenReturn("expr");
when(result.getColumnType(1)).thenReturn(Types.INTEGER);
when(result.isSigned(1)).thenReturn(true);
when(result.isAutoIncrement(1)).thenReturn(true);
@@ -141,4 +158,72 @@ public final class DatabaseCommunicationEngineTest {
when(result.isNotNull(1)).thenReturn(true);
return result;
}
+
+ @Test
+ public void assertAddStatementCorrectly() {
+ DatabaseCommunicationEngine engine =
+ DatabaseCommunicationEngineFactory.getInstance().newBinaryProtocolInstance(mock(MySQLStatement.class), "schemaName", Collections.emptyList(), backendConnection);
+ engine.add(statement);
+ Collection<?> actual = getField(engine, "cachedStatements");
+ assertThat(actual.size(), is(1));
+ assertThat(actual.iterator().next(), is(statement));
+ }
+
+ @Test
+ public void assertAddResultSetCorrectly() {
+ DatabaseCommunicationEngine engine =
+ DatabaseCommunicationEngineFactory.getInstance().newBinaryProtocolInstance(mock(MySQLStatement.class), "schemaName", Collections.emptyList(), backendConnection);
+ engine.add(resultSet);
+ Collection<?> actual = getField(engine, "cachedResultSets");
+ assertThat(actual.size(), is(1));
+ assertThat(actual.iterator().next(), is(resultSet));
+ }
+
+ @Test
+ public void assertCloseCorrectly() throws SQLException {
+ DatabaseCommunicationEngine engine =
+ DatabaseCommunicationEngineFactory.getInstance().newBinaryProtocolInstance(mock(MySQLStatement.class), "schemaName", Collections.emptyList(), backendConnection);
+ Collection<ResultSet> cachedResultSets = getField(engine, "cachedResultSets");
+ cachedResultSets.add(resultSet);
+ Collection<Statement> cachedStatements = getField(engine, "cachedStatements");
+ cachedStatements.add(statement);
+ engine.close();
+ verify(resultSet).close();
+ verify(statement).close();
+ assertTrue(cachedResultSets.isEmpty());
+ assertTrue(cachedStatements.isEmpty());
+ }
+
+ @Test
+ public void assertCloseResultSetsWithExceptionThrown() throws SQLException {
+ DatabaseCommunicationEngine engine =
+ DatabaseCommunicationEngineFactory.getInstance().newBinaryProtocolInstance(mock(MySQLStatement.class), "schemaName", Collections.emptyList(), backendConnection);
+ Collection<ResultSet> cachedResultSets = getField(engine, "cachedResultSets");
+ SQLException sqlExceptionByResultSet = new SQLException("ResultSet");
+ doThrow(sqlExceptionByResultSet).when(resultSet).close();
+ cachedResultSets.add(resultSet);
+ Collection<Statement> cachedStatements = getField(engine, "cachedStatements");
+ SQLException sqlExceptionByStatement = new SQLException("Statement");
+ doThrow(sqlExceptionByStatement).when(statement).close();
+ cachedStatements.add(statement);
+ SQLException actual = null;
+ try {
+ engine.close();
+ } catch (final SQLException ex) {
+ actual = ex;
+ }
+ verify(resultSet).close();
+ verify(statement).close();
+ assertTrue(cachedResultSets.isEmpty());
+ assertTrue(cachedStatements.isEmpty());
+ assertThat(actual.getNextException(), is(sqlExceptionByResultSet));
+ assertThat(actual.getNextException().getNextException(), is(sqlExceptionByStatement));
+ }
+
+ @SneakyThrows
+ private <T> T getField(final DatabaseCommunicationEngine target, final String fieldName) {
+ Field field = DatabaseCommunicationEngine.class.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ return (T) field.get(target);
+ }
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnectionTest.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnectionTest.java
index b7498eb..b9e3ea8 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnectionTest.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnectionTest.java
@@ -44,7 +44,6 @@ import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.sql.Connection;
-import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collection;
@@ -260,78 +259,6 @@ public final class BackendConnectionTest {
}
@Test
- public void assertAddStatementCorrectly() throws NoSuchFieldException, IllegalAccessException {
- Statement statement = mock(Statement.class);
- backendConnection.add(statement);
- Field field = backendConnection.getClass().getDeclaredField("cachedStatements");
- field.setAccessible(true);
- assertTrue(((Collection<?>) field.get(backendConnection)).contains(statement));
- }
-
- @Test
- public void assertAddResultSetCorrectly() throws NoSuchFieldException, IllegalAccessException {
- ResultSet resultSet = mock(ResultSet.class);
- backendConnection.add(resultSet);
- Field field = backendConnection.getClass().getDeclaredField("cachedResultSets");
- field.setAccessible(true);
- assertTrue(((Collection<?>) field.get(backendConnection)).contains(resultSet));
- }
-
- @Test
- public void assertCloseResultSetsCorrectly() throws NoSuchFieldException, SQLException, IllegalAccessException {
- Field field = backendConnection.getClass().getDeclaredField("cachedResultSets");
- field.setAccessible(true);
- Collection<ResultSet> cachedResultSets = (Collection<ResultSet>) field.get(backendConnection);
- ResultSet resultSet = mock(ResultSet.class);
- cachedResultSets.add(resultSet);
- backendConnection.closeResultSets();
- verify(resultSet, times(1)).close();
- assertTrue(cachedResultSets.isEmpty());
- }
-
- @Test
- public void assertCloseResultSetsWithExceptionThrown() throws NoSuchFieldException, SQLException, IllegalAccessException {
- Field field = backendConnection.getClass().getDeclaredField("cachedResultSets");
- field.setAccessible(true);
- Collection<ResultSet> cachedResultSets = (Collection<ResultSet>) field.get(backendConnection);
- ResultSet resultSet = mock(ResultSet.class);
- SQLException sqlException = new SQLException("");
- doThrow(sqlException).when(resultSet).close();
- cachedResultSets.add(resultSet);
- Collection<SQLException> result = backendConnection.closeResultSets();
- verify(resultSet, times(1)).close();
- assertTrue(cachedResultSets.isEmpty());
- assertTrue(result.contains(sqlException));
- }
-
- @Test
- public void assertCloseStatementsCorrectly() throws NoSuchFieldException, SQLException, IllegalAccessException {
- Field field = backendConnection.getClass().getDeclaredField("cachedStatements");
- field.setAccessible(true);
- Collection<Statement> cachedStatement = (Collection<Statement>) field.get(backendConnection);
- Statement statement = mock(Statement.class);
- cachedStatement.add(statement);
- backendConnection.closeStatements();
- verify(statement, times(1)).close();
- assertTrue(cachedStatement.isEmpty());
- }
-
- @Test
- public void assertCloseStatementsWithExceptionThrown() throws SQLException, NoSuchFieldException, IllegalAccessException {
- Field field = backendConnection.getClass().getDeclaredField("cachedStatements");
- field.setAccessible(true);
- Collection<Statement> cachedStatement = (Collection<Statement>) field.get(backendConnection);
- Statement statement = mock(Statement.class);
- cachedStatement.add(statement);
- SQLException sqlException = new SQLException("");
- doThrow(sqlException).when(statement).close();
- Collection<SQLException> result = backendConnection.closeStatements();
- verify(statement, times(1)).close();
- assertTrue(cachedStatement.isEmpty());
- assertTrue(result.contains(sqlException));
- }
-
- @Test
public void assertCloseConnectionsCorrectlyWhenNotForceRollback() throws NoSuchFieldException, IllegalAccessException, SQLException {
Field field = backendConnection.getClass().getDeclaredField("cachedConnections");
field.setAccessible(true);
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
index e92d22f..d55dabd 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
@@ -90,13 +90,17 @@ public final class CommandExecutorTask implements Runnable {
CommandPacketType type = commandExecuteEngine.getCommandPacketType(payload);
CommandPacket commandPacket = commandExecuteEngine.getCommandPacket(payload, type, backendConnection);
CommandExecutor commandExecutor = commandExecuteEngine.getCommandExecutor(type, commandPacket, backendConnection);
- Collection<DatabasePacket<?>> responsePackets = commandExecutor.execute();
- if (responsePackets.isEmpty()) {
- return false;
- }
- responsePackets.forEach(context::write);
- if (commandExecutor instanceof QueryCommandExecutor) {
- return commandExecuteEngine.writeQueryData(context, backendConnection, (QueryCommandExecutor) commandExecutor, responsePackets.size());
+ try {
+ Collection<DatabasePacket<?>> responsePackets = commandExecutor.execute();
+ if (responsePackets.isEmpty()) {
+ return false;
+ }
+ responsePackets.forEach(context::write);
+ if (commandExecutor instanceof QueryCommandExecutor) {
+ return commandExecuteEngine.writeQueryData(context, backendConnection, (QueryCommandExecutor) commandExecutor, responsePackets.size());
+ }
+ } finally {
+ commandExecutor.close();
}
return databaseProtocolFrontendEngine.getFrontendContext().isFlushForPerCommandPacket();
}
@@ -113,8 +117,6 @@ public final class CommandExecutorTask implements Runnable {
private Collection<SQLException> closeExecutionResources() {
Collection<SQLException> result = new LinkedList<>();
PrimaryVisitedManager.clear();
- result.addAll(backendConnection.closeResultSets());
- result.addAll(backendConnection.closeStatements());
result.addAll(backendConnection.closeFederateExecutor());
return result;
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
index d4d2984..cd6c297 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
@@ -94,8 +94,7 @@ public final class FrontendChannelInboundHandler extends ChannelInboundHandlerAd
private void closeAllResources() {
ConnectionThreadExecutorGroup.getInstance().unregisterAndAwaitTermination(backendConnection.getConnectionId());
PrimaryVisitedManager.clear();
- backendConnection.closeResultSets();
- backendConnection.closeStatements();
+ backendConnection.closeDatabaseCommunicationEngines();
backendConnection.closeConnections(true);
backendConnection.closeFederateExecutor();
databaseProtocolFrontendEngine.release(backendConnection);
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java
index 822d715..b9422df 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java
@@ -102,14 +102,13 @@ public final class CommandExecutorTaskTest {
when(backendConnection.getConnectionStatus()).thenReturn(connectionStatus);
when(codecEngine.createPacketPayload(eq(message))).thenReturn(payload);
when(engine.getCodecEngine()).thenReturn(codecEngine);
- when(backendConnection.closeResultSets()).thenReturn(Collections.emptyList());
- when(backendConnection.closeStatements()).thenReturn(Collections.emptyList());
when(backendConnection.closeConnections(false)).thenReturn(Collections.emptyList());
when(backendConnection.closeFederateExecutor()).thenReturn(Collections.emptyList());
CommandExecutorTask actual = new CommandExecutorTask(engine, backendConnection, handlerContext, message);
actual.run();
verify(connectionStatus).waitUntilConnectionRelease();
verify(connectionStatus).switchToUsing();
+ verify(queryCommandExecutor).close();
}
@Test
@@ -123,8 +122,6 @@ public final class CommandExecutorTaskTest {
when(backendConnection.getConnectionStatus()).thenReturn(connectionStatus);
when(codecEngine.createPacketPayload(eq(message))).thenReturn(payload);
when(engine.getCodecEngine()).thenReturn(codecEngine);
- when(backendConnection.closeResultSets()).thenReturn(Collections.emptyList());
- when(backendConnection.closeStatements()).thenReturn(Collections.emptyList());
when(backendConnection.closeConnections(false)).thenReturn(Collections.emptyList());
when(backendConnection.closeFederateExecutor()).thenReturn(Collections.emptyList());
CommandExecutorTask actual = new CommandExecutorTask(engine, backendConnection, handlerContext, message);
@@ -134,6 +131,7 @@ public final class CommandExecutorTaskTest {
verify(handlerContext).write(databasePacket);
verify(handlerContext).flush();
verify(executeEngine).writeQueryData(handlerContext, backendConnection, queryCommandExecutor, 1);
+ verify(queryCommandExecutor).close();
}
@Test
@@ -148,8 +146,6 @@ public final class CommandExecutorTaskTest {
when(backendConnection.getConnectionStatus()).thenReturn(connectionStatus);
when(codecEngine.createPacketPayload(eq(message))).thenReturn(payload);
when(engine.getCodecEngine()).thenReturn(codecEngine);
- when(backendConnection.closeResultSets()).thenReturn(Collections.emptyList());
- when(backendConnection.closeStatements()).thenReturn(Collections.emptyList());
when(backendConnection.closeConnections(false)).thenReturn(Collections.emptyList());
when(backendConnection.closeFederateExecutor()).thenReturn(Collections.emptyList());
CommandExecutorTask actual = new CommandExecutorTask(engine, backendConnection, handlerContext, message);
@@ -158,6 +154,7 @@ public final class CommandExecutorTaskTest {
verify(connectionStatus).switchToUsing();
verify(handlerContext).write(databasePacket);
verify(handlerContext).flush();
+ verify(commandExecutor).close();
}
@Test
@@ -169,8 +166,6 @@ public final class CommandExecutorTaskTest {
when(executeEngine.getErrorPacket(mockException, backendConnection)).thenReturn(databasePacket);
when(executeEngine.getOtherPacket(backendConnection)).thenReturn(Optional.of(databasePacket));
when(engine.getCommandExecuteEngine()).thenReturn(executeEngine);
- when(backendConnection.closeResultSets()).thenReturn(Collections.emptyList());
- when(backendConnection.closeStatements()).thenReturn(Collections.emptyList());
when(backendConnection.closeConnections(false)).thenReturn(Collections.emptyList());
when(backendConnection.closeFederateExecutor()).thenReturn(Collections.emptyList());
CommandExecutorTask actual = new CommandExecutorTask(engine, backendConnection, handlerContext, message);
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
index a6569cd..afcde18 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
@@ -25,8 +25,8 @@ import org.apache.shardingsphere.db.protocol.mysql.packet.MySQLPacket;
import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLBinaryResultSetRowPacket;
import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLComStmtExecutePacket;
import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
-import org.apache.shardingsphere.infra.executor.check.SQLCheckEngine;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
+import org.apache.shardingsphere.infra.executor.check.SQLCheckEngine;
import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
@@ -111,4 +111,9 @@ public final class MySQLComStmtExecuteExecutor implements QueryCommandExecutor {
return new BinaryRow(queryResponseRow.getCells().stream().map(
each -> new BinaryCell(MySQLBinaryColumnType.valueOfJDBCType(((BinaryQueryResponseCell) each).getJdbcType()), each.getData())).collect(Collectors.toList()));
}
+
+ @Override
+ public void close() throws SQLException {
+ databaseCommunicationEngine.close();
+ }
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/fieldlist/MySQLComFieldListPacketExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/fieldlist/MySQLComFieldListPacketExecutor.java
index 3e2b1bb..9f7e811 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/fieldlist/MySQLComFieldListPacketExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/fieldlist/MySQLComFieldListPacketExecutor.java
@@ -23,12 +23,12 @@ import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.text.fie
import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLEofPacket;
import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
+import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngineFactory;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor;
-import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import java.sql.SQLException;
@@ -76,4 +76,9 @@ public final class MySQLComFieldListPacketExecutor implements CommandExecutor {
result.add(new MySQLEofPacket(++currentSequenceId));
return result;
}
+
+ @Override
+ public void close() throws SQLException {
+ databaseCommunicationEngine.close();
+ }
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLComQueryPacketExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLComQueryPacketExecutor.java
index 9984604..9d06568 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLComQueryPacketExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLComQueryPacketExecutor.java
@@ -79,4 +79,9 @@ public final class MySQLComQueryPacketExecutor implements QueryCommandExecutor {
public MySQLPacket getQueryRowPacket() throws SQLException {
return new MySQLTextResultSetRowPacket(++currentSequenceId, textProtocolBackendHandler.getRowData());
}
+
+ @Override
+ public void close() throws SQLException {
+ textProtocolBackendHandler.close();
+ }
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/text/PostgreSQLComQueryExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/text/PostgreSQLComQueryExecutor.java
index 5e11134..9bfb337 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/text/PostgreSQLComQueryExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/text/PostgreSQLComQueryExecutor.java
@@ -97,4 +97,9 @@ public final class PostgreSQLComQueryExecutor implements QueryCommandExecutor {
public PostgreSQLPacket getQueryRowPacket() throws SQLException {
return new PostgreSQLDataRowPacket(textProtocolBackendHandler.getRowData());
}
+
+ @Override
+ public void close() throws SQLException {
+ textProtocolBackendHandler.close();
+ }
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-spi/src/main/java/org/apache/shardingsphere/proxy/frontend/command/executor/CommandExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-spi/src/main/java/org/apache/shardingsphere/proxy/frontend/command/executor/CommandExecutor.java
index c1aa8a4..cc94e31 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-spi/src/main/java/org/apache/shardingsphere/proxy/frontend/command/executor/CommandExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-spi/src/main/java/org/apache/shardingsphere/proxy/frontend/command/executor/CommandExecutor.java
@@ -34,4 +34,12 @@ public interface CommandExecutor {
* @throws SQLException SQL exception
*/
Collection<DatabasePacket<?>> execute() throws SQLException;
+
+ /**
+ * Close command executor.
+ *
+ * @throws SQLException SQL exception
+ */
+ default void close() throws SQLException {
+ }
}