You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/11/30 14:15:34 UTC
[shardingsphere] branch master updated: Merge DatabaseCommunicationEngine and JDBCDatabaseCommunicationEngine (#8427)
This is an automated email from the ASF dual-hosted git repository.
zhangyonglun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 31fb74f Merge DatabaseCommunicationEngine and JDBCDatabaseCommunicationEngine (#8427)
31fb74f is described below
commit 31fb74fd516b5064b0ceb70e62be58bfd2c6945a
Author: Liang Zhang <te...@163.com>
AuthorDate: Mon Nov 30 22:15:14 2020 +0800
Merge DatabaseCommunicationEngine and JDBCDatabaseCommunicationEngine (#8427)
---
.../communication/DatabaseCommunicationEngine.java | 171 +++++++++++++++++-
.../DatabaseCommunicationEngineFactory.java | 5 +-
.../jdbc/JDBCDatabaseCommunicationEngine.java | 201 ---------------------
.../DatabaseCommunicationEngineFactoryTest.java | 8 +-
4 files changed, 171 insertions(+), 214 deletions(-)
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
index cc28518..135f55a 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
@@ -17,15 +17,63 @@
package org.apache.shardingsphere.proxy.backend.communication;
-import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.binder.LogicSQL;
+import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
+import org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
+import org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKey;
+import org.apache.shardingsphere.infra.context.kernel.KernelProcessor;
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
+import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
+import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
+import org.apache.shardingsphere.infra.executor.sql.log.SQLLogger;
+import org.apache.shardingsphere.infra.merge.MergeEngine;
+import org.apache.shardingsphere.infra.merge.result.MergedResult;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
+import org.apache.shardingsphere.infra.metadata.schema.builder.SchemaBuilderMaterials;
+import org.apache.shardingsphere.infra.metadata.schema.refresher.SchemaRefresher;
+import org.apache.shardingsphere.infra.metadata.schema.refresher.SchemaRefresherFactory;
+import org.apache.shardingsphere.infra.metadata.schema.refresher.spi.SchemaChangedNotifier;
+import org.apache.shardingsphere.infra.rule.type.DataNodeContainedRule;
+import org.apache.shardingsphere.infra.spi.ordered.OrderedSPIRegistry;
+import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseData;
+import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
+import org.apache.shardingsphere.proxy.backend.response.header.query.QueryResponseHeader;
+import org.apache.shardingsphere.proxy.backend.response.header.query.impl.QueryHeader;
+import org.apache.shardingsphere.proxy.backend.response.header.query.impl.QueryHeaderBuilder;
+import org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DeleteStatement;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.InsertStatement;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.UpdateStatement;
import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
/**
* Database communication engine.
*/
-public interface DatabaseCommunicationEngine {
+@RequiredArgsConstructor
+public final class DatabaseCommunicationEngine {
+
+ private final LogicSQL logicSQL;
+
+ private final ShardingSphereMetaData metaData;
+
+ private final ProxySQLExecutor proxySQLExecutor;
+
+ private final KernelProcessor kernelProcessor = new KernelProcessor();
+
+ private List<QueryHeader> queryHeaders;
+
+ private MergedResult mergedResult;
/**
* Execute to database.
@@ -33,7 +81,112 @@ public interface DatabaseCommunicationEngine {
* @return backend response
* @throws SQLException SQL exception
*/
- ResponseHeader execute() throws SQLException;
+ public ResponseHeader execute() throws SQLException {
+ ExecutionContext executionContext = kernelProcessor.generateExecutionContext(logicSQL, metaData, ProxyContext.getInstance().getMetaDataContexts().getProps());
+ logSQL(executionContext);
+ return doExecute(executionContext);
+ }
+
+ private void logSQL(final ExecutionContext executionContext) {
+ if (ProxyContext.getInstance().getMetaDataContexts().getProps().<Boolean>getValue(ConfigurationPropertyKey.SQL_SHOW)) {
+ SQLLogger.logSQL(logicSQL, ProxyContext.getInstance().getMetaDataContexts().getProps().<Boolean>getValue(ConfigurationPropertyKey.SQL_SIMPLE), executionContext);
+ }
+ }
+
+ private ResponseHeader doExecute(final ExecutionContext executionContext) throws SQLException {
+ if (executionContext.getExecutionUnits().isEmpty()) {
+ return new UpdateResponseHeader();
+ }
+ proxySQLExecutor.checkExecutePrerequisites(executionContext);
+ Collection<ExecuteResult> executeResults = proxySQLExecutor.execute(executionContext);
+ ExecuteResult executeResultSample = executeResults.iterator().next();
+ return executeResultSample instanceof QueryResult
+ ? processExecuteQuery(executionContext, executeResults, (QueryResult) executeResultSample) : processExecuteUpdate(executionContext, executeResults);
+ }
+
+ private QueryResponseHeader processExecuteQuery(final ExecutionContext executionContext,
+ final Collection<ExecuteResult> executeResults, final QueryResult executeResultSample) throws SQLException {
+ queryHeaders = createQueryHeaders(executionContext, executeResultSample);
+ mergedResult = mergeQuery(executionContext.getSqlStatementContext(), executeResults.stream().map(each -> (QueryResult) each).collect(Collectors.toList()));
+ return new QueryResponseHeader(queryHeaders);
+ }
+
+ private List<QueryHeader> createQueryHeaders(final ExecutionContext executionContext, final QueryResult executeResultSample) throws SQLException {
+ int columnCount = executeResultSample.getMetaData().getColumnCount();
+ List<QueryHeader> result = new ArrayList<>(columnCount);
+ for (int columnIndex = 1; columnIndex <= columnCount; columnIndex++) {
+ result.add(createQueryHeader(executionContext, executeResultSample, metaData, columnIndex));
+ }
+ return result;
+ }
+
+ private QueryHeader createQueryHeader(final ExecutionContext executionContext,
+ final QueryResult executeResultSample, final ShardingSphereMetaData metaData, final int columnIndex) throws SQLException {
+ return hasSelectExpandProjections(executionContext.getSqlStatementContext())
+ ? QueryHeaderBuilder.build(((SelectStatementContext) executionContext.getSqlStatementContext()).getProjectionsContext(), executeResultSample, metaData, columnIndex)
+ : QueryHeaderBuilder.build(executeResultSample, metaData, columnIndex);
+ }
+
+ private boolean hasSelectExpandProjections(final SQLStatementContext<?> sqlStatementContext) {
+ return sqlStatementContext instanceof SelectStatementContext && !((SelectStatementContext) sqlStatementContext).getProjectionsContext().getExpandProjections().isEmpty();
+ }
+
+ private MergedResult mergeQuery(final SQLStatementContext<?> sqlStatementContext, final List<QueryResult> queryResults) throws SQLException {
+ MergeEngine mergeEngine = new MergeEngine(ProxyContext.getInstance().getMetaDataContexts().getDatabaseType(),
+ metaData.getSchema(), ProxyContext.getInstance().getMetaDataContexts().getProps(), metaData.getRuleMetaData().getRules());
+ return mergeEngine.merge(queryResults, sqlStatementContext);
+ }
+
+ private UpdateResponseHeader processExecuteUpdate(final ExecutionContext executionContext, final Collection<ExecuteResult> executeResults) throws SQLException {
+ UpdateResponseHeader result = createUpdateResponse(executionContext, executeResults);
+ refreshSchema(executionContext);
+ mergeUpdateCount(executionContext.getSqlStatementContext(), result);
+ return result;
+ }
+
+ private UpdateResponseHeader createUpdateResponse(final ExecutionContext executionContext, final Collection<ExecuteResult> executeResults) {
+ UpdateResponseHeader result = new UpdateResponseHeader(executeResults);
+ if (executionContext.getSqlStatementContext().getSqlStatement() instanceof InsertStatement) {
+ result.setType("INSERT");
+ } else if (executionContext.getSqlStatementContext().getSqlStatement() instanceof DeleteStatement) {
+ result.setType("DELETE");
+ } else if (executionContext.getSqlStatementContext().getSqlStatement() instanceof UpdateStatement) {
+ result.setType("UPDATE");
+ }
+ return result;
+ }
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ private void refreshSchema(final ExecutionContext executionContext) throws SQLException {
+ SQLStatement sqlStatement = executionContext.getSqlStatementContext().getSqlStatement();
+ if (null == sqlStatement) {
+ return;
+ }
+ Optional<SchemaRefresher> schemaRefresher = SchemaRefresherFactory.newInstance(sqlStatement);
+ if (schemaRefresher.isPresent()) {
+ Collection<String> routeDataSourceNames = executionContext.getRouteContext().getRouteUnits().stream().map(each -> each.getDataSourceMapper().getLogicName()).collect(Collectors.toList());
+ SchemaBuilderMaterials materials = new SchemaBuilderMaterials(ProxyContext.getInstance().getMetaDataContexts().getDatabaseType(),
+ metaData.getResource().getDataSources(), metaData.getRuleMetaData().getRules(), ProxyContext.getInstance().getMetaDataContexts().getProps());
+ schemaRefresher.get().refresh(metaData.getSchema(), routeDataSourceNames, sqlStatement, materials);
+ notifySchemaChanged(metaData.getName(), metaData.getSchema());
+ }
+ }
+
+ private void notifySchemaChanged(final String schemaName, final ShardingSphereSchema schema) {
+ OrderedSPIRegistry.getRegisteredServices(Collections.singletonList(schema), SchemaChangedNotifier.class).values().forEach(each -> each.notify(schemaName, schema));
+ }
+
+ private void mergeUpdateCount(final SQLStatementContext<?> sqlStatementContext, final UpdateResponseHeader response) {
+ if (isNeedAccumulate(sqlStatementContext)) {
+ response.mergeUpdateCount();
+ }
+ }
+
+ private boolean isNeedAccumulate(final SQLStatementContext<?> sqlStatementContext) {
+ Optional<DataNodeContainedRule> dataNodeContainedRule =
+ metaData.getRuleMetaData().getRules().stream().filter(each -> each instanceof DataNodeContainedRule).findFirst().map(rule -> (DataNodeContainedRule) rule);
+ return dataNodeContainedRule.isPresent() && dataNodeContainedRule.get().isNeedAccumulate(sqlStatementContext.getTablesContext().getTableNames());
+ }
/**
* Goto next result value.
@@ -41,7 +194,9 @@ public interface DatabaseCommunicationEngine {
* @return has more result value or not
* @throws SQLException SQL exception
*/
- boolean next() throws SQLException;
+ public boolean next() throws SQLException {
+ return null != mergedResult && mergedResult.next();
+ }
/**
* Get query response data.
@@ -49,5 +204,11 @@ public interface DatabaseCommunicationEngine {
* @return query response data
* @throws SQLException SQL exception
*/
- QueryResponseData getQueryResponseData() throws SQLException;
+ public QueryResponseData getQueryResponseData() throws SQLException {
+ List<Object> row = new ArrayList<>(queryHeaders.size());
+ for (int columnIndex = 1; columnIndex <= queryHeaders.size(); columnIndex++) {
+ row.add(mergedResult.getValue(columnIndex, Object.class));
+ }
+ return new QueryResponseData(queryHeaders.stream().map(QueryHeader::getColumnType).collect(Collectors.toList()), row);
+ }
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineFactory.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineFactory.java
index 913c80c..5756251 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineFactory.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineFactory.java
@@ -24,7 +24,6 @@ import org.apache.shardingsphere.infra.binder.SQLStatementContextFactory;
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.builder.JDBCExecutionUnitBuilderType;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import org.apache.shardingsphere.proxy.backend.communication.jdbc.JDBCDatabaseCommunicationEngine;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
@@ -62,7 +61,7 @@ public final class DatabaseCommunicationEngineFactory {
ShardingSphereMetaData metaData = ProxyContext.getInstance().getMetaData(backendConnection.getSchemaName());
LogicSQL logicSQL = createLogicSQL(sqlStatement, sql, Collections.emptyList(), metaData);
ProxySQLExecutor proxySQLExecutor = new ProxySQLExecutor(JDBCExecutionUnitBuilderType.STATEMENT, backendConnection);
- return new JDBCDatabaseCommunicationEngine(logicSQL, metaData, proxySQLExecutor);
+ return new DatabaseCommunicationEngine(logicSQL, metaData, proxySQLExecutor);
}
/**
@@ -78,7 +77,7 @@ public final class DatabaseCommunicationEngineFactory {
ShardingSphereMetaData metaData = ProxyContext.getInstance().getMetaData(backendConnection.getSchemaName());
LogicSQL logicSQL = createLogicSQL(sqlStatement, sql, new ArrayList<>(parameters), metaData);
ProxySQLExecutor proxySQLExecutor = new ProxySQLExecutor(JDBCExecutionUnitBuilderType.PREPARED_STATEMENT, backendConnection);
- return new JDBCDatabaseCommunicationEngine(logicSQL, metaData, proxySQLExecutor);
+ return new DatabaseCommunicationEngine(logicSQL, metaData, proxySQLExecutor);
}
private LogicSQL createLogicSQL(final SQLStatement sqlStatement, final String sql, final List<Object> parameters, final ShardingSphereMetaData metaData) {
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
deleted file mode 100644
index 456698d..0000000
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.proxy.backend.communication.jdbc;
-
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.binder.LogicSQL;
-import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
-import org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
-import org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKey;
-import org.apache.shardingsphere.infra.context.kernel.KernelProcessor;
-import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
-import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
-import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
-import org.apache.shardingsphere.infra.executor.sql.log.SQLLogger;
-import org.apache.shardingsphere.infra.merge.MergeEngine;
-import org.apache.shardingsphere.infra.merge.result.MergedResult;
-import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
-import org.apache.shardingsphere.infra.metadata.schema.builder.SchemaBuilderMaterials;
-import org.apache.shardingsphere.infra.metadata.schema.refresher.SchemaRefresher;
-import org.apache.shardingsphere.infra.metadata.schema.refresher.SchemaRefresherFactory;
-import org.apache.shardingsphere.infra.metadata.schema.refresher.spi.SchemaChangedNotifier;
-import org.apache.shardingsphere.infra.rule.type.DataNodeContainedRule;
-import org.apache.shardingsphere.infra.spi.ordered.OrderedSPIRegistry;
-import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
-import org.apache.shardingsphere.proxy.backend.communication.ProxySQLExecutor;
-import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
-import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
-import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseData;
-import org.apache.shardingsphere.proxy.backend.response.header.query.impl.QueryHeader;
-import org.apache.shardingsphere.proxy.backend.response.header.query.impl.QueryHeaderBuilder;
-import org.apache.shardingsphere.proxy.backend.response.header.query.QueryResponseHeader;
-import org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
-import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
-import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DeleteStatement;
-import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.InsertStatement;
-import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.UpdateStatement;
-
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-/**
- * Database access engine for JDBC.
- */
-@RequiredArgsConstructor
-public final class JDBCDatabaseCommunicationEngine implements DatabaseCommunicationEngine {
-
- private final LogicSQL logicSQL;
-
- private final ShardingSphereMetaData metaData;
-
- private final ProxySQLExecutor proxySQLExecutor;
-
- private final KernelProcessor kernelProcessor = new KernelProcessor();
-
- private List<QueryHeader> queryHeaders;
-
- private MergedResult mergedResult;
-
- @Override
- public ResponseHeader execute() throws SQLException {
- ExecutionContext executionContext = kernelProcessor.generateExecutionContext(logicSQL, metaData, ProxyContext.getInstance().getMetaDataContexts().getProps());
- logSQL(executionContext);
- return doExecute(executionContext);
- }
-
- private void logSQL(final ExecutionContext executionContext) {
- if (ProxyContext.getInstance().getMetaDataContexts().getProps().<Boolean>getValue(ConfigurationPropertyKey.SQL_SHOW)) {
- SQLLogger.logSQL(logicSQL, ProxyContext.getInstance().getMetaDataContexts().getProps().<Boolean>getValue(ConfigurationPropertyKey.SQL_SIMPLE), executionContext);
- }
- }
-
- private ResponseHeader doExecute(final ExecutionContext executionContext) throws SQLException {
- if (executionContext.getExecutionUnits().isEmpty()) {
- return new UpdateResponseHeader();
- }
- proxySQLExecutor.checkExecutePrerequisites(executionContext);
- Collection<ExecuteResult> executeResults = proxySQLExecutor.execute(executionContext);
- ExecuteResult executeResultSample = executeResults.iterator().next();
- return executeResultSample instanceof QueryResult
- ? processExecuteQuery(executionContext, executeResults, (QueryResult) executeResultSample) : processExecuteUpdate(executionContext, executeResults);
- }
-
- private QueryResponseHeader processExecuteQuery(final ExecutionContext executionContext,
- final Collection<ExecuteResult> executeResults, final QueryResult executeResultSample) throws SQLException {
- queryHeaders = createQueryHeaders(executionContext, executeResultSample);
- mergedResult = mergeQuery(executionContext.getSqlStatementContext(), executeResults.stream().map(each -> (QueryResult) each).collect(Collectors.toList()));
- return new QueryResponseHeader(queryHeaders);
- }
-
- private List<QueryHeader> createQueryHeaders(final ExecutionContext executionContext, final QueryResult executeResultSample) throws SQLException {
- int columnCount = executeResultSample.getMetaData().getColumnCount();
- List<QueryHeader> result = new ArrayList<>(columnCount);
- for (int columnIndex = 1; columnIndex <= columnCount; columnIndex++) {
- result.add(createQueryHeader(executionContext, executeResultSample, metaData, columnIndex));
- }
- return result;
- }
-
- private QueryHeader createQueryHeader(final ExecutionContext executionContext,
- final QueryResult executeResultSample, final ShardingSphereMetaData metaData, final int columnIndex) throws SQLException {
- return hasSelectExpandProjections(executionContext.getSqlStatementContext())
- ? QueryHeaderBuilder.build(((SelectStatementContext) executionContext.getSqlStatementContext()).getProjectionsContext(), executeResultSample, metaData, columnIndex)
- : QueryHeaderBuilder.build(executeResultSample, metaData, columnIndex);
- }
-
- private boolean hasSelectExpandProjections(final SQLStatementContext<?> sqlStatementContext) {
- return sqlStatementContext instanceof SelectStatementContext && !((SelectStatementContext) sqlStatementContext).getProjectionsContext().getExpandProjections().isEmpty();
- }
-
- private MergedResult mergeQuery(final SQLStatementContext<?> sqlStatementContext, final List<QueryResult> queryResults) throws SQLException {
- MergeEngine mergeEngine = new MergeEngine(ProxyContext.getInstance().getMetaDataContexts().getDatabaseType(),
- metaData.getSchema(), ProxyContext.getInstance().getMetaDataContexts().getProps(), metaData.getRuleMetaData().getRules());
- return mergeEngine.merge(queryResults, sqlStatementContext);
- }
-
- private UpdateResponseHeader processExecuteUpdate(final ExecutionContext executionContext, final Collection<ExecuteResult> executeResults) throws SQLException {
- UpdateResponseHeader result = createUpdateResponse(executionContext, executeResults);
- refreshSchema(executionContext);
- mergeUpdateCount(executionContext.getSqlStatementContext(), result);
- return result;
- }
-
- private UpdateResponseHeader createUpdateResponse(final ExecutionContext executionContext, final Collection<ExecuteResult> executeResults) {
- UpdateResponseHeader result = new UpdateResponseHeader(executeResults);
- if (executionContext.getSqlStatementContext().getSqlStatement() instanceof InsertStatement) {
- result.setType("INSERT");
- } else if (executionContext.getSqlStatementContext().getSqlStatement() instanceof DeleteStatement) {
- result.setType("DELETE");
- } else if (executionContext.getSqlStatementContext().getSqlStatement() instanceof UpdateStatement) {
- result.setType("UPDATE");
- }
- return result;
- }
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- private void refreshSchema(final ExecutionContext executionContext) throws SQLException {
- SQLStatement sqlStatement = executionContext.getSqlStatementContext().getSqlStatement();
- if (null == sqlStatement) {
- return;
- }
- Optional<SchemaRefresher> schemaRefresher = SchemaRefresherFactory.newInstance(sqlStatement);
- if (schemaRefresher.isPresent()) {
- Collection<String> routeDataSourceNames = executionContext.getRouteContext().getRouteUnits().stream().map(each -> each.getDataSourceMapper().getLogicName()).collect(Collectors.toList());
- SchemaBuilderMaterials materials = new SchemaBuilderMaterials(ProxyContext.getInstance().getMetaDataContexts().getDatabaseType(),
- metaData.getResource().getDataSources(), metaData.getRuleMetaData().getRules(), ProxyContext.getInstance().getMetaDataContexts().getProps());
- schemaRefresher.get().refresh(metaData.getSchema(), routeDataSourceNames, sqlStatement, materials);
- notifySchemaChanged(metaData.getName(), metaData.getSchema());
- }
- }
-
- private void notifySchemaChanged(final String schemaName, final ShardingSphereSchema schema) {
- OrderedSPIRegistry.getRegisteredServices(Collections.singletonList(schema), SchemaChangedNotifier.class).values().forEach(each -> each.notify(schemaName, schema));
- }
-
- private void mergeUpdateCount(final SQLStatementContext<?> sqlStatementContext, final UpdateResponseHeader response) {
- if (isNeedAccumulate(sqlStatementContext)) {
- response.mergeUpdateCount();
- }
- }
-
- private boolean isNeedAccumulate(final SQLStatementContext<?> sqlStatementContext) {
- Optional<DataNodeContainedRule> dataNodeContainedRule =
- metaData.getRuleMetaData().getRules().stream().filter(each -> each instanceof DataNodeContainedRule).findFirst().map(rule -> (DataNodeContainedRule) rule);
- return dataNodeContainedRule.isPresent() && dataNodeContainedRule.get().isNeedAccumulate(sqlStatementContext.getTablesContext().getTableNames());
- }
-
- @Override
- public boolean next() throws SQLException {
- return null != mergedResult && mergedResult.next();
- }
-
- @Override
- public QueryResponseData getQueryResponseData() throws SQLException {
- List<Object> row = new ArrayList<>(queryHeaders.size());
- for (int columnIndex = 1; columnIndex <= queryHeaders.size(); columnIndex++) {
- row.add(mergedResult.getValue(columnIndex, Object.class));
- }
- return new QueryResponseData(queryHeaders.stream().map(QueryHeader::getColumnType).collect(Collectors.toList()), row);
- }
-}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/DatabaseCommunicationEngineFactoryTest.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineFactoryTest.java
similarity index 90%
rename from shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/DatabaseCommunicationEngineFactoryTest.java
rename to shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineFactoryTest.java
index 35a858e..9acfbbd 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/DatabaseCommunicationEngineFactoryTest.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineFactoryTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.proxy.backend.communication.jdbc;
+package org.apache.shardingsphere.proxy.backend.communication;
import org.apache.shardingsphere.infra.auth.Authentication;
import org.apache.shardingsphere.infra.config.properties.ConfigurationProperties;
@@ -23,8 +23,6 @@ import org.apache.shardingsphere.infra.context.metadata.impl.StandardMetaDataCon
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
-import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngineFactory;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
@@ -69,7 +67,7 @@ public final class DatabaseCommunicationEngineFactoryTest {
DatabaseCommunicationEngine engine =
DatabaseCommunicationEngineFactory.getInstance().newTextProtocolInstance(mock(SQLStatement.class), "schemaName", backendConnection);
assertNotNull(engine);
- assertThat(engine, instanceOf(JDBCDatabaseCommunicationEngine.class));
+ assertThat(engine, instanceOf(DatabaseCommunicationEngine.class));
}
@Test
@@ -79,6 +77,6 @@ public final class DatabaseCommunicationEngineFactoryTest {
DatabaseCommunicationEngine engine =
DatabaseCommunicationEngineFactory.getInstance().newBinaryProtocolInstance(mock(SQLStatement.class), "schemaName", Collections.emptyList(), backendConnection);
assertNotNull(engine);
- assertThat(engine, instanceOf(JDBCDatabaseCommunicationEngine.class));
+ assertThat(engine, instanceOf(DatabaseCommunicationEngine.class));
}
}