You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2022/07/30 04:35:29 UTC

[shardingsphere] branch master updated: Move closeFederationExecutor into DatabaseCommunicationEngine (#19706)

This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new e6afb9b7bbf Move closeFederationExecutor into DatabaseCommunicationEngine (#19706)
e6afb9b7bbf is described below

commit e6afb9b7bbf9e236974e05aa57256ccc885c23a6
Author: 吴伟杰 <wu...@apache.org>
AuthorDate: Sat Jul 30 12:35:19 2022 +0800

    Move closeFederationExecutor into DatabaseCommunicationEngine (#19706)
    
    * Close federation executor in JDBCDatabaseCommunicationEngine
    
    * Complete JDBCDatabaseCommunicationEngineTest
    
    * Remove unused Setter in JDBCBackendConnection
---
 .../jdbc/JDBCDatabaseCommunicationEngine.java      | 36 ++++++++++----
 .../jdbc/connection/JDBCBackendConnection.java     | 26 +---------
 .../jdbc/JDBCDatabaseCommunicationEngineTest.java  | 56 +++++++++++++++++++++-
 .../jdbc/connection/JDBCBackendConnectionTest.java |  2 -
 4 files changed, 81 insertions(+), 39 deletions(-)

diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
index a5ccb845589..2d34eb19b18 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
@@ -62,6 +62,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 /**
@@ -75,21 +76,14 @@ public final class JDBCDatabaseCommunicationEngine extends DatabaseCommunication
     
     private final Collection<ResultSet> cachedResultSets = new CopyOnWriteArrayList<>();
     
-    private final FederationExecutor federationExecutor;
-    
     private final JDBCBackendConnection backendConnection;
     
+    private volatile FederationExecutor federationExecutor;
+    
     public JDBCDatabaseCommunicationEngine(final String driverType, final ShardingSphereDatabase database, final LogicSQL logicSQL, final JDBCBackendConnection backendConnection) {
         super(driverType, database, logicSQL, backendConnection);
         proxySQLExecutor = new ProxySQLExecutor(driverType, backendConnection, this);
         this.backendConnection = backendConnection;
-        MetaDataContexts metaDataContexts = ProxyContext.getInstance().getContextManager().getMetaDataContexts();
-        String databaseName = backendConnection.getConnectionSession().getDatabaseName();
-        DatabaseType databaseType = logicSQL.getSqlStatementContext().getDatabaseType();
-        String schemaName = logicSQL.getSqlStatementContext().getTablesContext().getSchemaName().orElseGet(() -> DatabaseTypeEngine.getDefaultSchemaName(databaseType, databaseName));
-        federationExecutor = FederationExecutorFactory.newInstance(databaseName, schemaName, metaDataContexts.getOptimizerContext(), metaDataContexts.getMetaData().getGlobalRuleMetaData(),
-                metaDataContexts.getMetaData().getProps(), new JDBCExecutor(BackendExecutorContext.getInstance().getExecutorEngine(), backendConnection.isSerialExecute()),
-                ProxyContext.getInstance().getContextManager().getInstanceContext().getEventBusContext());
     }
     
     /**
@@ -127,6 +121,7 @@ public final class JDBCDatabaseCommunicationEngine extends DatabaseCommunication
         ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(backendConnection.getConnectionSession().getDatabaseName());
         if (executionContext.getRouteContext().isFederated() || (sqlStatementContext instanceof SelectStatementContext
                 && SystemSchemaUtil.containsSystemSchema(sqlStatementContext.getDatabaseType(), sqlStatementContext.getTablesContext().getSchemaNames(), database))) {
+            prepareFederationExecutor();
             ResultSet resultSet = doExecuteFederation(logicSQL, metaDataContexts);
             return processExecuteFederation(resultSet, metaDataContexts);
         }
@@ -143,6 +138,16 @@ public final class JDBCDatabaseCommunicationEngine extends DatabaseCommunication
                 : processExecuteUpdate(executionContext, result);
     }
     
+    private void prepareFederationExecutor() {
+        MetaDataContexts metaDataContexts = ProxyContext.getInstance().getContextManager().getMetaDataContexts();
+        String databaseName = backendConnection.getConnectionSession().getDatabaseName();
+        DatabaseType databaseType = getLogicSQL().getSqlStatementContext().getDatabaseType();
+        String schemaName = getLogicSQL().getSqlStatementContext().getTablesContext().getSchemaName().orElseGet(() -> DatabaseTypeEngine.getDefaultSchemaName(databaseType, databaseName));
+        federationExecutor = FederationExecutorFactory.newInstance(databaseName, schemaName, metaDataContexts.getOptimizerContext(), metaDataContexts.getMetaData().getGlobalRuleMetaData(),
+                metaDataContexts.getMetaData().getProps(), new JDBCExecutor(BackendExecutorContext.getInstance().getExecutorEngine(), backendConnection.isSerialExecute()),
+                ProxyContext.getInstance().getContextManager().getInstanceContext().getEventBusContext());
+    }
+    
     private ResultSet doExecuteFederation(final LogicSQL logicSQL, final MetaDataContexts metaDataContexts) throws SQLException {
         boolean isReturnGeneratedKeys = logicSQL.getSqlStatementContext().getSqlStatement() instanceof MySQLInsertStatement;
         ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(backendConnection.getConnectionSession().getDatabaseName());
@@ -150,7 +155,6 @@ public final class JDBCDatabaseCommunicationEngine extends DatabaseCommunication
         DatabaseType databaseType = database.getResource().getDatabaseType();
         ProxyJDBCExecutorCallback callback = ProxyJDBCExecutorCallbackFactory.newInstance(getDriverType(), protocolType, databaseType,
                 logicSQL.getSqlStatementContext().getSqlStatement(), this, isReturnGeneratedKeys, SQLExecutorExceptionHandler.isExceptionThrown(), true);
-        backendConnection.setFederationExecutor(federationExecutor);
         DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = createDriverExecutionPrepareEngine(isReturnGeneratedKeys, metaDataContexts);
         FederationContext context = new FederationContext(false, logicSQL, metaDataContexts.getMetaData().getDatabases());
         return federationExecutor.executeQuery(prepareEngine, callback, context);
@@ -185,6 +189,7 @@ public final class JDBCDatabaseCommunicationEngine extends DatabaseCommunication
         Collection<SQLException> result = new LinkedList<>();
         result.addAll(closeResultSets());
         result.addAll(closeStatements());
+        closeFederationExecutor().ifPresent(result::add);
         if (result.isEmpty()) {
             return;
         }
@@ -219,4 +224,15 @@ public final class JDBCDatabaseCommunicationEngine extends DatabaseCommunication
         cachedStatements.clear();
         return result;
     }
+    
+    private Optional<SQLException> closeFederationExecutor() {
+        if (null != federationExecutor) {
+            try {
+                federationExecutor.close();
+            } catch (final SQLException ex) {
+                return Optional.of(ex);
+            }
+        }
+        return Optional.empty();
+    }
 }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnection.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnection.java
index ebb73dc4ee1..1d24788bd59 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnection.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnection.java
@@ -22,16 +22,14 @@ import com.google.common.collect.LinkedHashMultimap;
 import com.google.common.collect.Multimap;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import lombok.Setter;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCConnectionManager;
-import org.apache.shardingsphere.infra.federation.executor.FederationExecutor;
 import org.apache.shardingsphere.proxy.backend.communication.BackendConnection;
 import org.apache.shardingsphere.proxy.backend.communication.jdbc.transaction.JDBCBackendTransactionManager;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.proxy.backend.exception.BackendConnectionException;
-import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
 import org.apache.shardingsphere.proxy.backend.handler.ProxyBackendHandler;
+import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
 import org.apache.shardingsphere.proxy.backend.util.TransactionUtil;
 import org.apache.shardingsphere.transaction.core.TransactionType;
 
@@ -50,13 +48,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 @RequiredArgsConstructor
 @Getter
-@Setter
 public final class JDBCBackendConnection implements BackendConnection<Void>, ExecutorJDBCConnectionManager {
     
     private final ConnectionSession connectionSession;
     
-    private volatile FederationExecutor federationExecutor;
-    
     private final Multimap<String, Connection> cachedConnections = LinkedHashMultimap.create();
     
     private final Collection<ProxyBackendHandler> backendHandlers = Collections.newSetFromMap(new ConcurrentHashMap<>(64));
@@ -192,7 +187,6 @@ public final class JDBCBackendConnection implements BackendConnection<Void>, Exe
         synchronized (this) {
             Collection<Exception> result = new LinkedList<>();
             result.addAll(closeHandlers(false));
-            result.addAll(closeFederationExecutor());
             if (!connectionSession.getTransactionStatus().isInConnectionHeldTransaction()) {
                 result.addAll(closeHandlers(true));
                 result.addAll(closeConnections(false));
@@ -213,7 +207,6 @@ public final class JDBCBackendConnection implements BackendConnection<Void>, Exe
             closed.set(true);
             closeHandlers(true);
             closeConnections(true);
-            closeFederationExecutor();
             return null;
         }
     }
@@ -269,21 +262,4 @@ public final class JDBCBackendConnection implements BackendConnection<Void>, Exe
         }
         return result;
     }
-    
-    /**
-     * Close federation executor.
-     * 
-     * @return SQL exception when federation executor close
-     */
-    public Collection<SQLException> closeFederationExecutor() {
-        Collection<SQLException> result = new LinkedList<>();
-        if (null != federationExecutor) {
-            try {
-                federationExecutor.close();
-            } catch (final SQLException ex) {
-                result.add(ex);
-            }
-        }
-        return result;
-    }
 }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngineTest.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngineTest.java
index eb32415cfa2..d8ad97bceb7 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngineTest.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngineTest.java
@@ -21,11 +21,19 @@ import lombok.SneakyThrows;
 import org.apache.shardingsphere.infra.binder.LogicSQL;
 import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
 import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
+import org.apache.shardingsphere.infra.context.kernel.KernelProcessor;
 import org.apache.shardingsphere.infra.database.DefaultDatabase;
 import org.apache.shardingsphere.infra.database.type.dialect.H2DatabaseType;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.eventbus.EventBusContext;
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
+import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
 import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
 import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResultMetaData;
+import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
+import org.apache.shardingsphere.infra.federation.executor.FederationContext;
+import org.apache.shardingsphere.infra.federation.executor.FederationExecutor;
+import org.apache.shardingsphere.infra.federation.executor.FederationExecutorFactory;
 import org.apache.shardingsphere.infra.federation.optimizer.context.OptimizerContext;
 import org.apache.shardingsphere.infra.merge.result.impl.memory.MemoryMergedResult;
 import org.apache.shardingsphere.infra.merge.result.impl.memory.MemoryQueryResultRow;
@@ -36,6 +44,7 @@ import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.
 import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereIndex;
 import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
 import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
+import org.apache.shardingsphere.infra.route.context.RouteContext;
 import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
@@ -43,7 +52,10 @@ import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
 import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
 import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngineFactory;
 import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCBackendConnection;
+import org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.ProxyJDBCExecutorCallback;
+import org.apache.shardingsphere.proxy.backend.communication.jdbc.statement.JDBCBackendStatement;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
+import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseRow;
 import org.apache.shardingsphere.proxy.backend.response.header.query.QueryHeaderBuilderEngine;
 import org.apache.shardingsphere.proxy.backend.util.ProxyContextRestorer;
 import org.apache.shardingsphere.sharding.rule.ShardingRule;
@@ -52,6 +64,7 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Answers;
 import org.mockito.Mock;
+import org.mockito.MockedStatic;
 import org.mockito.internal.configuration.plugins.Plugins;
 import org.mockito.junit.MockitoJUnitRunner;
 import org.mockito.plugins.MemberAccessor;
@@ -63,10 +76,10 @@ import java.sql.Statement;
 import java.sql.Types;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.LinkedHashMap;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
@@ -74,9 +87,13 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.nullable;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -89,7 +106,7 @@ public final class JDBCDatabaseCommunicationEngineTest extends ProxyContextResto
     @Mock
     private Statement statement;
     
-    @Mock
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
     private ResultSet resultSet;
     
     @Before
@@ -114,6 +131,41 @@ public final class JDBCDatabaseCommunicationEngineTest extends ProxyContextResto
         return result;
     }
     
+    @Test
+    public void assertExecuteFederationAndClose() throws SQLException, NoSuchFieldException, IllegalAccessException {
+        SQLStatementContext<?> sqlStatementContext = mock(SQLStatementContext.class, RETURNS_DEEP_STUBS);
+        JDBCDatabaseCommunicationEngine engine =
+                DatabaseCommunicationEngineFactory.getInstance().newDatabaseCommunicationEngine(new LogicSQL(sqlStatementContext, "schemaName", Collections.emptyList()), backendConnection, true);
+        Field kernelProcessorField = DatabaseCommunicationEngine.class.getDeclaredField("kernelProcessor");
+        kernelProcessorField.setAccessible(true);
+        KernelProcessor kernelProcessor = mock(KernelProcessor.class);
+        kernelProcessorField.set(engine, kernelProcessor);
+        RouteContext routeContext = new RouteContext();
+        routeContext.setFederated(true);
+        ExecutionContext executionContext = new ExecutionContext(mock(LogicSQL.class), Collections.emptyList(), routeContext);
+        when(kernelProcessor.generateExecutionContext(any(LogicSQL.class), any(ShardingSphereDatabase.class), any(ShardingSphereRuleMetaData.class), any(ConfigurationProperties.class)))
+                .thenReturn(executionContext);
+        when(backendConnection.getConnectionSession().getStatementManager()).thenReturn(new JDBCBackendStatement());
+        FederationExecutor federationExecutor = mock(FederationExecutor.class);
+        try (MockedStatic<FederationExecutorFactory> mockedStatic = mockStatic(FederationExecutorFactory.class)) {
+            when(federationExecutor.executeQuery(any(DriverExecutionPrepareEngine.class), any(ProxyJDBCExecutorCallback.class), any(FederationContext.class))).thenReturn(resultSet);
+            when(resultSet.getMetaData().getColumnCount()).thenReturn(1);
+            when(resultSet.getMetaData().getColumnType(1)).thenReturn(Types.INTEGER);
+            when(resultSet.next()).thenReturn(true, false);
+            when(resultSet.getObject(1)).thenReturn(Integer.MAX_VALUE);
+            mockedStatic.when(() -> FederationExecutorFactory.newInstance(anyString(), nullable(String.class), any(OptimizerContext.class), any(ShardingSphereRuleMetaData.class),
+                    any(ConfigurationProperties.class), any(JDBCExecutor.class), any(EventBusContext.class))).thenReturn(federationExecutor);
+            engine.execute();
+        }
+        assertTrue(engine.next());
+        QueryResponseRow actualRow = engine.getRowData();
+        assertThat(actualRow.getCells().get(0).getJdbcType(), is(Types.INTEGER));
+        assertThat(actualRow.getCells().get(0).getData(), is(Integer.MAX_VALUE));
+        assertFalse(engine.next());
+        engine.close();
+        verify(federationExecutor).close();
+    }
+    
     @Test
     public void assertBinaryProtocolQueryHeader() throws SQLException, NoSuchFieldException, IllegalAccessException {
         SQLStatementContext<?> sqlStatementContext = mock(SQLStatementContext.class, RETURNS_DEEP_STUBS);
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnectionTest.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnectionTest.java
index 50b8e35b7e4..b66ca0cbacf 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnectionTest.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnectionTest.java
@@ -389,7 +389,6 @@ public final class JDBCBackendConnectionTest extends ProxyContextRestorer {
     public void assertCloseExecutionResources() throws BackendConnectionException {
         backendConnection.closeExecutionResources();
         verify(backendConnection).closeHandlers(false);
-        verify(backendConnection).closeFederationExecutor();
         verify(backendConnection).closeHandlers(true);
         verify(backendConnection).closeConnections(false);
     }
@@ -399,6 +398,5 @@ public final class JDBCBackendConnectionTest extends ProxyContextRestorer {
         backendConnection.closeAllResources();
         verify(backendConnection).closeHandlers(true);
         verify(backendConnection).closeConnections(true);
-        verify(backendConnection).closeFederationExecutor();
     }
 }