You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2022/07/30 04:35:29 UTC
[shardingsphere] branch master updated: Move closeFederationExecutor into DatabaseCommunicationEngine (#19706)
This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new e6afb9b7bbf Move closeFederationExecutor into DatabaseCommunicationEngine (#19706)
e6afb9b7bbf is described below
commit e6afb9b7bbf9e236974e05aa57256ccc885c23a6
Author: 吴伟杰 <wu...@apache.org>
AuthorDate: Sat Jul 30 12:35:19 2022 +0800
Move closeFederationExecutor into DatabaseCommunicationEngine (#19706)
* Close federation executor in JDBCDatabaseCommunicationEngine
* Complete JDBCDatabaseCommunicationEngineTest
* Remove unused Setter in JDBCBackendConnection
---
.../jdbc/JDBCDatabaseCommunicationEngine.java | 36 ++++++++++----
.../jdbc/connection/JDBCBackendConnection.java | 26 +---------
.../jdbc/JDBCDatabaseCommunicationEngineTest.java | 56 +++++++++++++++++++++-
.../jdbc/connection/JDBCBackendConnectionTest.java | 2 -
4 files changed, 81 insertions(+), 39 deletions(-)
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
index a5ccb845589..2d34eb19b18 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
@@ -62,6 +62,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
/**
@@ -75,21 +76,14 @@ public final class JDBCDatabaseCommunicationEngine extends DatabaseCommunication
private final Collection<ResultSet> cachedResultSets = new CopyOnWriteArrayList<>();
- private final FederationExecutor federationExecutor;
-
private final JDBCBackendConnection backendConnection;
+ private volatile FederationExecutor federationExecutor;
+
public JDBCDatabaseCommunicationEngine(final String driverType, final ShardingSphereDatabase database, final LogicSQL logicSQL, final JDBCBackendConnection backendConnection) {
super(driverType, database, logicSQL, backendConnection);
proxySQLExecutor = new ProxySQLExecutor(driverType, backendConnection, this);
this.backendConnection = backendConnection;
- MetaDataContexts metaDataContexts = ProxyContext.getInstance().getContextManager().getMetaDataContexts();
- String databaseName = backendConnection.getConnectionSession().getDatabaseName();
- DatabaseType databaseType = logicSQL.getSqlStatementContext().getDatabaseType();
- String schemaName = logicSQL.getSqlStatementContext().getTablesContext().getSchemaName().orElseGet(() -> DatabaseTypeEngine.getDefaultSchemaName(databaseType, databaseName));
- federationExecutor = FederationExecutorFactory.newInstance(databaseName, schemaName, metaDataContexts.getOptimizerContext(), metaDataContexts.getMetaData().getGlobalRuleMetaData(),
- metaDataContexts.getMetaData().getProps(), new JDBCExecutor(BackendExecutorContext.getInstance().getExecutorEngine(), backendConnection.isSerialExecute()),
- ProxyContext.getInstance().getContextManager().getInstanceContext().getEventBusContext());
}
/**
@@ -127,6 +121,7 @@ public final class JDBCDatabaseCommunicationEngine extends DatabaseCommunication
ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(backendConnection.getConnectionSession().getDatabaseName());
if (executionContext.getRouteContext().isFederated() || (sqlStatementContext instanceof SelectStatementContext
&& SystemSchemaUtil.containsSystemSchema(sqlStatementContext.getDatabaseType(), sqlStatementContext.getTablesContext().getSchemaNames(), database))) {
+ prepareFederationExecutor();
ResultSet resultSet = doExecuteFederation(logicSQL, metaDataContexts);
return processExecuteFederation(resultSet, metaDataContexts);
}
@@ -143,6 +138,16 @@ public final class JDBCDatabaseCommunicationEngine extends DatabaseCommunication
: processExecuteUpdate(executionContext, result);
}
+ private void prepareFederationExecutor() {
+ MetaDataContexts metaDataContexts = ProxyContext.getInstance().getContextManager().getMetaDataContexts();
+ String databaseName = backendConnection.getConnectionSession().getDatabaseName();
+ DatabaseType databaseType = getLogicSQL().getSqlStatementContext().getDatabaseType();
+ String schemaName = getLogicSQL().getSqlStatementContext().getTablesContext().getSchemaName().orElseGet(() -> DatabaseTypeEngine.getDefaultSchemaName(databaseType, databaseName));
+ federationExecutor = FederationExecutorFactory.newInstance(databaseName, schemaName, metaDataContexts.getOptimizerContext(), metaDataContexts.getMetaData().getGlobalRuleMetaData(),
+ metaDataContexts.getMetaData().getProps(), new JDBCExecutor(BackendExecutorContext.getInstance().getExecutorEngine(), backendConnection.isSerialExecute()),
+ ProxyContext.getInstance().getContextManager().getInstanceContext().getEventBusContext());
+ }
+
private ResultSet doExecuteFederation(final LogicSQL logicSQL, final MetaDataContexts metaDataContexts) throws SQLException {
boolean isReturnGeneratedKeys = logicSQL.getSqlStatementContext().getSqlStatement() instanceof MySQLInsertStatement;
ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(backendConnection.getConnectionSession().getDatabaseName());
@@ -150,7 +155,6 @@ public final class JDBCDatabaseCommunicationEngine extends DatabaseCommunication
DatabaseType databaseType = database.getResource().getDatabaseType();
ProxyJDBCExecutorCallback callback = ProxyJDBCExecutorCallbackFactory.newInstance(getDriverType(), protocolType, databaseType,
logicSQL.getSqlStatementContext().getSqlStatement(), this, isReturnGeneratedKeys, SQLExecutorExceptionHandler.isExceptionThrown(), true);
- backendConnection.setFederationExecutor(federationExecutor);
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = createDriverExecutionPrepareEngine(isReturnGeneratedKeys, metaDataContexts);
FederationContext context = new FederationContext(false, logicSQL, metaDataContexts.getMetaData().getDatabases());
return federationExecutor.executeQuery(prepareEngine, callback, context);
@@ -185,6 +189,7 @@ public final class JDBCDatabaseCommunicationEngine extends DatabaseCommunication
Collection<SQLException> result = new LinkedList<>();
result.addAll(closeResultSets());
result.addAll(closeStatements());
+ closeFederationExecutor().ifPresent(result::add);
if (result.isEmpty()) {
return;
}
@@ -219,4 +224,15 @@ public final class JDBCDatabaseCommunicationEngine extends DatabaseCommunication
cachedStatements.clear();
return result;
}
+
+ private Optional<SQLException> closeFederationExecutor() {
+ if (null != federationExecutor) {
+ try {
+ federationExecutor.close();
+ } catch (final SQLException ex) {
+ return Optional.of(ex);
+ }
+ }
+ return Optional.empty();
+ }
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnection.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnection.java
index ebb73dc4ee1..1d24788bd59 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnection.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnection.java
@@ -22,16 +22,14 @@ import com.google.common.collect.LinkedHashMultimap;
import com.google.common.collect.Multimap;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import lombok.Setter;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCConnectionManager;
-import org.apache.shardingsphere.infra.federation.executor.FederationExecutor;
import org.apache.shardingsphere.proxy.backend.communication.BackendConnection;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.transaction.JDBCBackendTransactionManager;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.exception.BackendConnectionException;
-import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
import org.apache.shardingsphere.proxy.backend.handler.ProxyBackendHandler;
+import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
import org.apache.shardingsphere.proxy.backend.util.TransactionUtil;
import org.apache.shardingsphere.transaction.core.TransactionType;
@@ -50,13 +48,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
@RequiredArgsConstructor
@Getter
-@Setter
public final class JDBCBackendConnection implements BackendConnection<Void>, ExecutorJDBCConnectionManager {
private final ConnectionSession connectionSession;
- private volatile FederationExecutor federationExecutor;
-
private final Multimap<String, Connection> cachedConnections = LinkedHashMultimap.create();
private final Collection<ProxyBackendHandler> backendHandlers = Collections.newSetFromMap(new ConcurrentHashMap<>(64));
@@ -192,7 +187,6 @@ public final class JDBCBackendConnection implements BackendConnection<Void>, Exe
synchronized (this) {
Collection<Exception> result = new LinkedList<>();
result.addAll(closeHandlers(false));
- result.addAll(closeFederationExecutor());
if (!connectionSession.getTransactionStatus().isInConnectionHeldTransaction()) {
result.addAll(closeHandlers(true));
result.addAll(closeConnections(false));
@@ -213,7 +207,6 @@ public final class JDBCBackendConnection implements BackendConnection<Void>, Exe
closed.set(true);
closeHandlers(true);
closeConnections(true);
- closeFederationExecutor();
return null;
}
}
@@ -269,21 +262,4 @@ public final class JDBCBackendConnection implements BackendConnection<Void>, Exe
}
return result;
}
-
- /**
- * Close federation executor.
- *
- * @return SQL exception when federation executor close
- */
- public Collection<SQLException> closeFederationExecutor() {
- Collection<SQLException> result = new LinkedList<>();
- if (null != federationExecutor) {
- try {
- federationExecutor.close();
- } catch (final SQLException ex) {
- result.add(ex);
- }
- }
- return result;
- }
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngineTest.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngineTest.java
index eb32415cfa2..d8ad97bceb7 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngineTest.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngineTest.java
@@ -21,11 +21,19 @@ import lombok.SneakyThrows;
import org.apache.shardingsphere.infra.binder.LogicSQL;
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
+import org.apache.shardingsphere.infra.context.kernel.KernelProcessor;
import org.apache.shardingsphere.infra.database.DefaultDatabase;
import org.apache.shardingsphere.infra.database.type.dialect.H2DatabaseType;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.eventbus.EventBusContext;
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
+import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResultMetaData;
+import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
+import org.apache.shardingsphere.infra.federation.executor.FederationContext;
+import org.apache.shardingsphere.infra.federation.executor.FederationExecutor;
+import org.apache.shardingsphere.infra.federation.executor.FederationExecutorFactory;
import org.apache.shardingsphere.infra.federation.optimizer.context.OptimizerContext;
import org.apache.shardingsphere.infra.merge.result.impl.memory.MemoryMergedResult;
import org.apache.shardingsphere.infra.merge.result.impl.memory.MemoryQueryResultRow;
@@ -36,6 +44,7 @@ import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.
import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereIndex;
import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
+import org.apache.shardingsphere.infra.route.context.RouteContext;
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
@@ -43,7 +52,10 @@ import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngineFactory;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCBackendConnection;
+import org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.ProxyJDBCExecutorCallback;
+import org.apache.shardingsphere.proxy.backend.communication.jdbc.statement.JDBCBackendStatement;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
+import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseRow;
import org.apache.shardingsphere.proxy.backend.response.header.query.QueryHeaderBuilderEngine;
import org.apache.shardingsphere.proxy.backend.util.ProxyContextRestorer;
import org.apache.shardingsphere.sharding.rule.ShardingRule;
@@ -52,6 +64,7 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Answers;
import org.mockito.Mock;
+import org.mockito.MockedStatic;
import org.mockito.internal.configuration.plugins.Plugins;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.plugins.MemberAccessor;
@@ -63,10 +76,10 @@ import java.sql.Statement;
import java.sql.Types;
import java.util.Collection;
import java.util.Collections;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.LinkedHashMap;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
@@ -74,9 +87,13 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -89,7 +106,7 @@ public final class JDBCDatabaseCommunicationEngineTest extends ProxyContextResto
@Mock
private Statement statement;
- @Mock
+ @Mock(answer = Answers.RETURNS_DEEP_STUBS)
private ResultSet resultSet;
@Before
@@ -114,6 +131,41 @@ public final class JDBCDatabaseCommunicationEngineTest extends ProxyContextResto
return result;
}
+ @Test
+ public void assertExecuteFederationAndClose() throws SQLException, NoSuchFieldException, IllegalAccessException {
+ SQLStatementContext<?> sqlStatementContext = mock(SQLStatementContext.class, RETURNS_DEEP_STUBS);
+ JDBCDatabaseCommunicationEngine engine =
+ DatabaseCommunicationEngineFactory.getInstance().newDatabaseCommunicationEngine(new LogicSQL(sqlStatementContext, "schemaName", Collections.emptyList()), backendConnection, true);
+ Field kernelProcessorField = DatabaseCommunicationEngine.class.getDeclaredField("kernelProcessor");
+ kernelProcessorField.setAccessible(true);
+ KernelProcessor kernelProcessor = mock(KernelProcessor.class);
+ kernelProcessorField.set(engine, kernelProcessor);
+ RouteContext routeContext = new RouteContext();
+ routeContext.setFederated(true);
+ ExecutionContext executionContext = new ExecutionContext(mock(LogicSQL.class), Collections.emptyList(), routeContext);
+ when(kernelProcessor.generateExecutionContext(any(LogicSQL.class), any(ShardingSphereDatabase.class), any(ShardingSphereRuleMetaData.class), any(ConfigurationProperties.class)))
+ .thenReturn(executionContext);
+ when(backendConnection.getConnectionSession().getStatementManager()).thenReturn(new JDBCBackendStatement());
+ FederationExecutor federationExecutor = mock(FederationExecutor.class);
+ try (MockedStatic<FederationExecutorFactory> mockedStatic = mockStatic(FederationExecutorFactory.class)) {
+ when(federationExecutor.executeQuery(any(DriverExecutionPrepareEngine.class), any(ProxyJDBCExecutorCallback.class), any(FederationContext.class))).thenReturn(resultSet);
+ when(resultSet.getMetaData().getColumnCount()).thenReturn(1);
+ when(resultSet.getMetaData().getColumnType(1)).thenReturn(Types.INTEGER);
+ when(resultSet.next()).thenReturn(true, false);
+ when(resultSet.getObject(1)).thenReturn(Integer.MAX_VALUE);
+ mockedStatic.when(() -> FederationExecutorFactory.newInstance(anyString(), nullable(String.class), any(OptimizerContext.class), any(ShardingSphereRuleMetaData.class),
+ any(ConfigurationProperties.class), any(JDBCExecutor.class), any(EventBusContext.class))).thenReturn(federationExecutor);
+ engine.execute();
+ }
+ assertTrue(engine.next());
+ QueryResponseRow actualRow = engine.getRowData();
+ assertThat(actualRow.getCells().get(0).getJdbcType(), is(Types.INTEGER));
+ assertThat(actualRow.getCells().get(0).getData(), is(Integer.MAX_VALUE));
+ assertFalse(engine.next());
+ engine.close();
+ verify(federationExecutor).close();
+ }
+
@Test
public void assertBinaryProtocolQueryHeader() throws SQLException, NoSuchFieldException, IllegalAccessException {
SQLStatementContext<?> sqlStatementContext = mock(SQLStatementContext.class, RETURNS_DEEP_STUBS);
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnectionTest.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnectionTest.java
index 50b8e35b7e4..b66ca0cbacf 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnectionTest.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnectionTest.java
@@ -389,7 +389,6 @@ public final class JDBCBackendConnectionTest extends ProxyContextRestorer {
public void assertCloseExecutionResources() throws BackendConnectionException {
backendConnection.closeExecutionResources();
verify(backendConnection).closeHandlers(false);
- verify(backendConnection).closeFederationExecutor();
verify(backendConnection).closeHandlers(true);
verify(backendConnection).closeConnections(false);
}
@@ -399,6 +398,5 @@ public final class JDBCBackendConnectionTest extends ProxyContextRestorer {
backendConnection.closeAllResources();
verify(backendConnection).closeHandlers(true);
verify(backendConnection).closeConnections(true);
- verify(backendConnection).closeFederationExecutor();
}
}