You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/11/30 14:15:34 UTC

[shardingsphere] branch master updated: Merge DatabaseCommunicationEngine and JDBCDatabaseCommunicationEngine (#8427)

This is an automated email from the ASF dual-hosted git repository.

zhangyonglun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 31fb74f  Merge DatabaseCommunicationEngine and JDBCDatabaseCommunicationEngine (#8427)
31fb74f is described below

commit 31fb74fd516b5064b0ceb70e62be58bfd2c6945a
Author: Liang Zhang <te...@163.com>
AuthorDate: Mon Nov 30 22:15:14 2020 +0800

    Merge DatabaseCommunicationEngine and JDBCDatabaseCommunicationEngine (#8427)
---
 .../communication/DatabaseCommunicationEngine.java | 171 +++++++++++++++++-
 .../DatabaseCommunicationEngineFactory.java        |   5 +-
 .../jdbc/JDBCDatabaseCommunicationEngine.java      | 201 ---------------------
 .../DatabaseCommunicationEngineFactoryTest.java    |   8 +-
 4 files changed, 171 insertions(+), 214 deletions(-)

diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
index cc28518..135f55a 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
@@ -17,15 +17,63 @@
 
 package org.apache.shardingsphere.proxy.backend.communication;
 
-import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.binder.LogicSQL;
+import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
+import org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
+import org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKey;
+import org.apache.shardingsphere.infra.context.kernel.KernelProcessor;
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
+import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
+import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
+import org.apache.shardingsphere.infra.executor.sql.log.SQLLogger;
+import org.apache.shardingsphere.infra.merge.MergeEngine;
+import org.apache.shardingsphere.infra.merge.result.MergedResult;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
+import org.apache.shardingsphere.infra.metadata.schema.builder.SchemaBuilderMaterials;
+import org.apache.shardingsphere.infra.metadata.schema.refresher.SchemaRefresher;
+import org.apache.shardingsphere.infra.metadata.schema.refresher.SchemaRefresherFactory;
+import org.apache.shardingsphere.infra.metadata.schema.refresher.spi.SchemaChangedNotifier;
+import org.apache.shardingsphere.infra.rule.type.DataNodeContainedRule;
+import org.apache.shardingsphere.infra.spi.ordered.OrderedSPIRegistry;
+import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseData;
+import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
+import org.apache.shardingsphere.proxy.backend.response.header.query.QueryResponseHeader;
+import org.apache.shardingsphere.proxy.backend.response.header.query.impl.QueryHeader;
+import org.apache.shardingsphere.proxy.backend.response.header.query.impl.QueryHeaderBuilder;
+import org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DeleteStatement;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.InsertStatement;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.UpdateStatement;
 
 import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
 
 /**
  * Database communication engine.
  */
-public interface DatabaseCommunicationEngine {
+@RequiredArgsConstructor
+public final class DatabaseCommunicationEngine {
+    
+    private final LogicSQL logicSQL;
+    
+    private final ShardingSphereMetaData metaData;
+    
+    private final ProxySQLExecutor proxySQLExecutor;
+    
+    private final KernelProcessor kernelProcessor = new KernelProcessor();
+    
+    private List<QueryHeader> queryHeaders;
+    
+    private MergedResult mergedResult;
     
     /**
      * Execute to database.
@@ -33,7 +81,112 @@ public interface DatabaseCommunicationEngine {
      * @return backend response
      * @throws SQLException SQL exception
      */
-    ResponseHeader execute() throws SQLException;
+    public ResponseHeader execute() throws SQLException {
+        ExecutionContext executionContext = kernelProcessor.generateExecutionContext(logicSQL, metaData, ProxyContext.getInstance().getMetaDataContexts().getProps());
+        logSQL(executionContext);
+        return doExecute(executionContext);
+    }
+    
+    private void logSQL(final ExecutionContext executionContext) {
+        if (ProxyContext.getInstance().getMetaDataContexts().getProps().<Boolean>getValue(ConfigurationPropertyKey.SQL_SHOW)) {
+            SQLLogger.logSQL(logicSQL, ProxyContext.getInstance().getMetaDataContexts().getProps().<Boolean>getValue(ConfigurationPropertyKey.SQL_SIMPLE), executionContext);
+        }
+    }
+    
+    private ResponseHeader doExecute(final ExecutionContext executionContext) throws SQLException {
+        if (executionContext.getExecutionUnits().isEmpty()) {
+            return new UpdateResponseHeader();
+        }
+        proxySQLExecutor.checkExecutePrerequisites(executionContext);
+        Collection<ExecuteResult> executeResults = proxySQLExecutor.execute(executionContext);
+        ExecuteResult executeResultSample = executeResults.iterator().next();
+        return executeResultSample instanceof QueryResult
+                ? processExecuteQuery(executionContext, executeResults, (QueryResult) executeResultSample) : processExecuteUpdate(executionContext, executeResults);
+    }
+    
+    private QueryResponseHeader processExecuteQuery(final ExecutionContext executionContext, 
+                                                    final Collection<ExecuteResult> executeResults, final QueryResult executeResultSample) throws SQLException {
+        queryHeaders = createQueryHeaders(executionContext, executeResultSample);
+        mergedResult = mergeQuery(executionContext.getSqlStatementContext(), executeResults.stream().map(each -> (QueryResult) each).collect(Collectors.toList()));
+        return new QueryResponseHeader(queryHeaders);
+    }
+    
+    private List<QueryHeader> createQueryHeaders(final ExecutionContext executionContext, final QueryResult executeResultSample) throws SQLException {
+        int columnCount = executeResultSample.getMetaData().getColumnCount();
+        List<QueryHeader> result = new ArrayList<>(columnCount);
+        for (int columnIndex = 1; columnIndex <= columnCount; columnIndex++) {
+            result.add(createQueryHeader(executionContext, executeResultSample, metaData, columnIndex));
+        }
+        return result;
+    }
+    
+    private QueryHeader createQueryHeader(final ExecutionContext executionContext,
+                                          final QueryResult executeResultSample, final ShardingSphereMetaData metaData, final int columnIndex) throws SQLException {
+        return hasSelectExpandProjections(executionContext.getSqlStatementContext())
+                ? QueryHeaderBuilder.build(((SelectStatementContext) executionContext.getSqlStatementContext()).getProjectionsContext(), executeResultSample, metaData, columnIndex)
+                : QueryHeaderBuilder.build(executeResultSample, metaData, columnIndex);
+    }
+    
+    private boolean hasSelectExpandProjections(final SQLStatementContext<?> sqlStatementContext) {
+        return sqlStatementContext instanceof SelectStatementContext && !((SelectStatementContext) sqlStatementContext).getProjectionsContext().getExpandProjections().isEmpty();
+    }
+    
+    private MergedResult mergeQuery(final SQLStatementContext<?> sqlStatementContext, final List<QueryResult> queryResults) throws SQLException {
+        MergeEngine mergeEngine = new MergeEngine(ProxyContext.getInstance().getMetaDataContexts().getDatabaseType(),
+                metaData.getSchema(), ProxyContext.getInstance().getMetaDataContexts().getProps(), metaData.getRuleMetaData().getRules());
+        return mergeEngine.merge(queryResults, sqlStatementContext);
+    }
+    
+    private UpdateResponseHeader processExecuteUpdate(final ExecutionContext executionContext, final Collection<ExecuteResult> executeResults) throws SQLException {
+        UpdateResponseHeader result = createUpdateResponse(executionContext, executeResults);
+        refreshSchema(executionContext);
+        mergeUpdateCount(executionContext.getSqlStatementContext(), result);
+        return result;
+    }
+    
+    private UpdateResponseHeader createUpdateResponse(final ExecutionContext executionContext, final Collection<ExecuteResult> executeResults) {
+        UpdateResponseHeader result = new UpdateResponseHeader(executeResults);
+        if (executionContext.getSqlStatementContext().getSqlStatement() instanceof InsertStatement) {
+            result.setType("INSERT");
+        } else if (executionContext.getSqlStatementContext().getSqlStatement() instanceof DeleteStatement) {
+            result.setType("DELETE");
+        } else if (executionContext.getSqlStatementContext().getSqlStatement() instanceof UpdateStatement) {
+            result.setType("UPDATE");
+        }
+        return result;
+    }
+    
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private void refreshSchema(final ExecutionContext executionContext) throws SQLException {
+        SQLStatement sqlStatement = executionContext.getSqlStatementContext().getSqlStatement();
+        if (null == sqlStatement) {
+            return;
+        }
+        Optional<SchemaRefresher> schemaRefresher = SchemaRefresherFactory.newInstance(sqlStatement);
+        if (schemaRefresher.isPresent()) {
+            Collection<String> routeDataSourceNames = executionContext.getRouteContext().getRouteUnits().stream().map(each -> each.getDataSourceMapper().getLogicName()).collect(Collectors.toList());
+            SchemaBuilderMaterials materials = new SchemaBuilderMaterials(ProxyContext.getInstance().getMetaDataContexts().getDatabaseType(), 
+                    metaData.getResource().getDataSources(), metaData.getRuleMetaData().getRules(), ProxyContext.getInstance().getMetaDataContexts().getProps());
+            schemaRefresher.get().refresh(metaData.getSchema(), routeDataSourceNames, sqlStatement, materials);
+            notifySchemaChanged(metaData.getName(), metaData.getSchema());
+        }
+    }
+    
+    private void notifySchemaChanged(final String schemaName, final ShardingSphereSchema schema) {
+        OrderedSPIRegistry.getRegisteredServices(Collections.singletonList(schema), SchemaChangedNotifier.class).values().forEach(each -> each.notify(schemaName, schema));
+    }
+    
+    private void mergeUpdateCount(final SQLStatementContext<?> sqlStatementContext, final UpdateResponseHeader response) {
+        if (isNeedAccumulate(sqlStatementContext)) {
+            response.mergeUpdateCount();
+        }
+    }
+    
+    private boolean isNeedAccumulate(final SQLStatementContext<?> sqlStatementContext) {
+        Optional<DataNodeContainedRule> dataNodeContainedRule = 
+                metaData.getRuleMetaData().getRules().stream().filter(each -> each instanceof DataNodeContainedRule).findFirst().map(rule -> (DataNodeContainedRule) rule);
+        return dataNodeContainedRule.isPresent() && dataNodeContainedRule.get().isNeedAccumulate(sqlStatementContext.getTablesContext().getTableNames());
+    }
     
     /**
      * Goto next result value.
@@ -41,7 +194,9 @@ public interface DatabaseCommunicationEngine {
      * @return has more result value or not
      * @throws SQLException SQL exception
      */
-    boolean next() throws SQLException;
+    public boolean next() throws SQLException {
+        return null != mergedResult && mergedResult.next();
+    }
     
     /**
      * Get query response data.
@@ -49,5 +204,11 @@ public interface DatabaseCommunicationEngine {
      * @return query response data
      * @throws SQLException SQL exception
      */
-    QueryResponseData getQueryResponseData() throws SQLException;
+    public QueryResponseData getQueryResponseData() throws SQLException {
+        List<Object> row = new ArrayList<>(queryHeaders.size());
+        for (int columnIndex = 1; columnIndex <= queryHeaders.size(); columnIndex++) {
+            row.add(mergedResult.getValue(columnIndex, Object.class));
+        }
+        return new QueryResponseData(queryHeaders.stream().map(QueryHeader::getColumnType).collect(Collectors.toList()), row);
+    }
 }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineFactory.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineFactory.java
index 913c80c..5756251 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineFactory.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineFactory.java
@@ -24,7 +24,6 @@ import org.apache.shardingsphere.infra.binder.SQLStatementContextFactory;
 import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.builder.JDBCExecutionUnitBuilderType;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import org.apache.shardingsphere.proxy.backend.communication.jdbc.JDBCDatabaseCommunicationEngine;
 import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
@@ -62,7 +61,7 @@ public final class DatabaseCommunicationEngineFactory {
         ShardingSphereMetaData metaData = ProxyContext.getInstance().getMetaData(backendConnection.getSchemaName());
         LogicSQL logicSQL = createLogicSQL(sqlStatement, sql, Collections.emptyList(), metaData);
         ProxySQLExecutor proxySQLExecutor = new ProxySQLExecutor(JDBCExecutionUnitBuilderType.STATEMENT, backendConnection);
-        return new JDBCDatabaseCommunicationEngine(logicSQL, metaData, proxySQLExecutor);
+        return new DatabaseCommunicationEngine(logicSQL, metaData, proxySQLExecutor);
     }
     
     /**
@@ -78,7 +77,7 @@ public final class DatabaseCommunicationEngineFactory {
         ShardingSphereMetaData metaData = ProxyContext.getInstance().getMetaData(backendConnection.getSchemaName());
         LogicSQL logicSQL = createLogicSQL(sqlStatement, sql, new ArrayList<>(parameters), metaData);
         ProxySQLExecutor proxySQLExecutor = new ProxySQLExecutor(JDBCExecutionUnitBuilderType.PREPARED_STATEMENT, backendConnection);
-        return new JDBCDatabaseCommunicationEngine(logicSQL, metaData, proxySQLExecutor);
+        return new DatabaseCommunicationEngine(logicSQL, metaData, proxySQLExecutor);
     }
     
     private LogicSQL createLogicSQL(final SQLStatement sqlStatement, final String sql, final List<Object> parameters, final ShardingSphereMetaData metaData) {
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
deleted file mode 100644
index 456698d..0000000
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.proxy.backend.communication.jdbc;
-
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.binder.LogicSQL;
-import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
-import org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
-import org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKey;
-import org.apache.shardingsphere.infra.context.kernel.KernelProcessor;
-import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
-import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
-import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
-import org.apache.shardingsphere.infra.executor.sql.log.SQLLogger;
-import org.apache.shardingsphere.infra.merge.MergeEngine;
-import org.apache.shardingsphere.infra.merge.result.MergedResult;
-import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
-import org.apache.shardingsphere.infra.metadata.schema.builder.SchemaBuilderMaterials;
-import org.apache.shardingsphere.infra.metadata.schema.refresher.SchemaRefresher;
-import org.apache.shardingsphere.infra.metadata.schema.refresher.SchemaRefresherFactory;
-import org.apache.shardingsphere.infra.metadata.schema.refresher.spi.SchemaChangedNotifier;
-import org.apache.shardingsphere.infra.rule.type.DataNodeContainedRule;
-import org.apache.shardingsphere.infra.spi.ordered.OrderedSPIRegistry;
-import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
-import org.apache.shardingsphere.proxy.backend.communication.ProxySQLExecutor;
-import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
-import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
-import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseData;
-import org.apache.shardingsphere.proxy.backend.response.header.query.impl.QueryHeader;
-import org.apache.shardingsphere.proxy.backend.response.header.query.impl.QueryHeaderBuilder;
-import org.apache.shardingsphere.proxy.backend.response.header.query.QueryResponseHeader;
-import org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
-import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
-import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DeleteStatement;
-import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.InsertStatement;
-import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.UpdateStatement;
-
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-/**
- * Database access engine for JDBC.
- */
-@RequiredArgsConstructor
-public final class JDBCDatabaseCommunicationEngine implements DatabaseCommunicationEngine {
-    
-    private final LogicSQL logicSQL;
-    
-    private final ShardingSphereMetaData metaData;
-    
-    private final ProxySQLExecutor proxySQLExecutor;
-    
-    private final KernelProcessor kernelProcessor = new KernelProcessor();
-    
-    private List<QueryHeader> queryHeaders;
-    
-    private MergedResult mergedResult;
-    
-    @Override
-    public ResponseHeader execute() throws SQLException {
-        ExecutionContext executionContext = kernelProcessor.generateExecutionContext(logicSQL, metaData, ProxyContext.getInstance().getMetaDataContexts().getProps());
-        logSQL(executionContext);
-        return doExecute(executionContext);
-    }
-    
-    private void logSQL(final ExecutionContext executionContext) {
-        if (ProxyContext.getInstance().getMetaDataContexts().getProps().<Boolean>getValue(ConfigurationPropertyKey.SQL_SHOW)) {
-            SQLLogger.logSQL(logicSQL, ProxyContext.getInstance().getMetaDataContexts().getProps().<Boolean>getValue(ConfigurationPropertyKey.SQL_SIMPLE), executionContext);
-        }
-    }
-    
-    private ResponseHeader doExecute(final ExecutionContext executionContext) throws SQLException {
-        if (executionContext.getExecutionUnits().isEmpty()) {
-            return new UpdateResponseHeader();
-        }
-        proxySQLExecutor.checkExecutePrerequisites(executionContext);
-        Collection<ExecuteResult> executeResults = proxySQLExecutor.execute(executionContext);
-        ExecuteResult executeResultSample = executeResults.iterator().next();
-        return executeResultSample instanceof QueryResult
-                ? processExecuteQuery(executionContext, executeResults, (QueryResult) executeResultSample) : processExecuteUpdate(executionContext, executeResults);
-    }
-    
-    private QueryResponseHeader processExecuteQuery(final ExecutionContext executionContext, 
-                                                    final Collection<ExecuteResult> executeResults, final QueryResult executeResultSample) throws SQLException {
-        queryHeaders = createQueryHeaders(executionContext, executeResultSample);
-        mergedResult = mergeQuery(executionContext.getSqlStatementContext(), executeResults.stream().map(each -> (QueryResult) each).collect(Collectors.toList()));
-        return new QueryResponseHeader(queryHeaders);
-    }
-    
-    private List<QueryHeader> createQueryHeaders(final ExecutionContext executionContext, final QueryResult executeResultSample) throws SQLException {
-        int columnCount = executeResultSample.getMetaData().getColumnCount();
-        List<QueryHeader> result = new ArrayList<>(columnCount);
-        for (int columnIndex = 1; columnIndex <= columnCount; columnIndex++) {
-            result.add(createQueryHeader(executionContext, executeResultSample, metaData, columnIndex));
-        }
-        return result;
-    }
-    
-    private QueryHeader createQueryHeader(final ExecutionContext executionContext,
-                                          final QueryResult executeResultSample, final ShardingSphereMetaData metaData, final int columnIndex) throws SQLException {
-        return hasSelectExpandProjections(executionContext.getSqlStatementContext())
-                ? QueryHeaderBuilder.build(((SelectStatementContext) executionContext.getSqlStatementContext()).getProjectionsContext(), executeResultSample, metaData, columnIndex)
-                : QueryHeaderBuilder.build(executeResultSample, metaData, columnIndex);
-    }
-    
-    private boolean hasSelectExpandProjections(final SQLStatementContext<?> sqlStatementContext) {
-        return sqlStatementContext instanceof SelectStatementContext && !((SelectStatementContext) sqlStatementContext).getProjectionsContext().getExpandProjections().isEmpty();
-    }
-    
-    private MergedResult mergeQuery(final SQLStatementContext<?> sqlStatementContext, final List<QueryResult> queryResults) throws SQLException {
-        MergeEngine mergeEngine = new MergeEngine(ProxyContext.getInstance().getMetaDataContexts().getDatabaseType(),
-                metaData.getSchema(), ProxyContext.getInstance().getMetaDataContexts().getProps(), metaData.getRuleMetaData().getRules());
-        return mergeEngine.merge(queryResults, sqlStatementContext);
-    }
-    
-    private UpdateResponseHeader processExecuteUpdate(final ExecutionContext executionContext, final Collection<ExecuteResult> executeResults) throws SQLException {
-        UpdateResponseHeader result = createUpdateResponse(executionContext, executeResults);
-        refreshSchema(executionContext);
-        mergeUpdateCount(executionContext.getSqlStatementContext(), result);
-        return result;
-    }
-    
-    private UpdateResponseHeader createUpdateResponse(final ExecutionContext executionContext, final Collection<ExecuteResult> executeResults) {
-        UpdateResponseHeader result = new UpdateResponseHeader(executeResults);
-        if (executionContext.getSqlStatementContext().getSqlStatement() instanceof InsertStatement) {
-            result.setType("INSERT");
-        } else if (executionContext.getSqlStatementContext().getSqlStatement() instanceof DeleteStatement) {
-            result.setType("DELETE");
-        } else if (executionContext.getSqlStatementContext().getSqlStatement() instanceof UpdateStatement) {
-            result.setType("UPDATE");
-        }
-        return result;
-    }
-    
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    private void refreshSchema(final ExecutionContext executionContext) throws SQLException {
-        SQLStatement sqlStatement = executionContext.getSqlStatementContext().getSqlStatement();
-        if (null == sqlStatement) {
-            return;
-        }
-        Optional<SchemaRefresher> schemaRefresher = SchemaRefresherFactory.newInstance(sqlStatement);
-        if (schemaRefresher.isPresent()) {
-            Collection<String> routeDataSourceNames = executionContext.getRouteContext().getRouteUnits().stream().map(each -> each.getDataSourceMapper().getLogicName()).collect(Collectors.toList());
-            SchemaBuilderMaterials materials = new SchemaBuilderMaterials(ProxyContext.getInstance().getMetaDataContexts().getDatabaseType(), 
-                    metaData.getResource().getDataSources(), metaData.getRuleMetaData().getRules(), ProxyContext.getInstance().getMetaDataContexts().getProps());
-            schemaRefresher.get().refresh(metaData.getSchema(), routeDataSourceNames, sqlStatement, materials);
-            notifySchemaChanged(metaData.getName(), metaData.getSchema());
-        }
-    }
-    
-    private void notifySchemaChanged(final String schemaName, final ShardingSphereSchema schema) {
-        OrderedSPIRegistry.getRegisteredServices(Collections.singletonList(schema), SchemaChangedNotifier.class).values().forEach(each -> each.notify(schemaName, schema));
-    }
-    
-    private void mergeUpdateCount(final SQLStatementContext<?> sqlStatementContext, final UpdateResponseHeader response) {
-        if (isNeedAccumulate(sqlStatementContext)) {
-            response.mergeUpdateCount();
-        }
-    }
-    
-    private boolean isNeedAccumulate(final SQLStatementContext<?> sqlStatementContext) {
-        Optional<DataNodeContainedRule> dataNodeContainedRule = 
-                metaData.getRuleMetaData().getRules().stream().filter(each -> each instanceof DataNodeContainedRule).findFirst().map(rule -> (DataNodeContainedRule) rule);
-        return dataNodeContainedRule.isPresent() && dataNodeContainedRule.get().isNeedAccumulate(sqlStatementContext.getTablesContext().getTableNames());
-    }
-    
-    @Override
-    public boolean next() throws SQLException {
-        return null != mergedResult && mergedResult.next();
-    }
-    
-    @Override
-    public QueryResponseData getQueryResponseData() throws SQLException {
-        List<Object> row = new ArrayList<>(queryHeaders.size());
-        for (int columnIndex = 1; columnIndex <= queryHeaders.size(); columnIndex++) {
-            row.add(mergedResult.getValue(columnIndex, Object.class));
-        }
-        return new QueryResponseData(queryHeaders.stream().map(QueryHeader::getColumnType).collect(Collectors.toList()), row);
-    }
-}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/DatabaseCommunicationEngineFactoryTest.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineFactoryTest.java
similarity index 90%
rename from shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/DatabaseCommunicationEngineFactoryTest.java
rename to shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineFactoryTest.java
index 35a858e..9acfbbd 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/DatabaseCommunicationEngineFactoryTest.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineFactoryTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.proxy.backend.communication.jdbc;
+package org.apache.shardingsphere.proxy.backend.communication;
 
 import org.apache.shardingsphere.infra.auth.Authentication;
 import org.apache.shardingsphere.infra.config.properties.ConfigurationProperties;
@@ -23,8 +23,6 @@ import org.apache.shardingsphere.infra.context.metadata.impl.StandardMetaDataCon
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
-import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngineFactory;
 import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
@@ -69,7 +67,7 @@ public final class DatabaseCommunicationEngineFactoryTest {
         DatabaseCommunicationEngine engine =
                 DatabaseCommunicationEngineFactory.getInstance().newTextProtocolInstance(mock(SQLStatement.class), "schemaName", backendConnection);
         assertNotNull(engine);
-        assertThat(engine, instanceOf(JDBCDatabaseCommunicationEngine.class));
+        assertThat(engine, instanceOf(DatabaseCommunicationEngine.class));
     }
     
     @Test
@@ -79,6 +77,6 @@ public final class DatabaseCommunicationEngineFactoryTest {
         DatabaseCommunicationEngine engine =
                 DatabaseCommunicationEngineFactory.getInstance().newBinaryProtocolInstance(mock(SQLStatement.class), "schemaName", Collections.emptyList(), backendConnection);
         assertNotNull(engine);
-        assertThat(engine, instanceOf(JDBCDatabaseCommunicationEngine.class));
+        assertThat(engine, instanceOf(DatabaseCommunicationEngine.class));
     }
 }