You are viewing a plain-text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2021/06/09 06:11:05 UTC

[shardingsphere] branch master updated: fix sql join error with multi unbinding sharding table (#10728)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f9c0e4  fix sql join error with multi unbinding sharding table (#10728)
4f9c0e4 is described below

commit 4f9c0e42dc15fd2b879f7f3c93369867499d2159
Author: Zhengqiang Duan <st...@gmail.com>
AuthorDate: Wed Jun 9 14:10:19 2021 +0800

    fix sql join error with multi unbinding sharding table (#10728)
---
 .../sql/federate/execute/FederateExecutor.java     |  5 ++++-
 .../sql/federate/execute/FederateJDBCExecutor.java | 25 +++++++++++++---------
 .../federate/execute/raw/FederateRawExecutor.java  |  4 +++-
 .../federate/schema/row/FederateRowExecutor.java   | 10 ++++-----
 .../statement/ShardingSpherePreparedStatement.java | 10 +++------
 .../core/statement/ShardingSphereStatement.java    |  2 +-
 .../backend/communication/ProxySQLExecutor.java    |  7 +++---
 7 files changed, 34 insertions(+), 29 deletions(-)

diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederateExecutor.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederateExecutor.java
index 58da05e..f646d34 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederateExecutor.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederateExecutor.java
@@ -21,6 +21,7 @@ import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
 import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
 import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
+import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
 
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -36,10 +37,12 @@ public interface FederateExecutor {
      *
      * @param executionContext execution context
      * @param callback callback
+     * @param type JDBC driver type
+     * @param statementOption statement option
      * @return execute result
      * @throws SQLException SQL exception
      */
-    List<QueryResult> executeQuery(ExecutionContext executionContext, JDBCExecutorCallback<? extends ExecuteResult> callback) throws SQLException;
+    List<QueryResult> executeQuery(ExecutionContext executionContext, JDBCExecutorCallback<? extends ExecuteResult> callback, String type, StatementOption statementOption) throws SQLException;
     
     /**
      * Close.
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederateJDBCExecutor.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederateJDBCExecutor.java
index 824b275..84eed4d 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederateJDBCExecutor.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederateJDBCExecutor.java
@@ -30,6 +30,7 @@ import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.dr
 import org.apache.shardingsphere.infra.executor.sql.federate.schema.FederateLogicSchema;
 import org.apache.shardingsphere.infra.executor.sql.federate.schema.row.FederateRowExecutor;
 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCManager;
+import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
 import org.apache.shardingsphere.infra.optimize.context.OptimizeContextFactory;
 import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
 import org.apache.shardingsphere.sql.parser.sql.common.util.SQLUtil;
@@ -87,17 +88,18 @@ public final class FederateJDBCExecutor implements FederateExecutor {
     }
     
     @Override
-    public List<QueryResult> executeQuery(final ExecutionContext executionContext, final JDBCExecutorCallback<? extends ExecuteResult> callback) throws SQLException {
-        QueryResult result = new JDBCStreamQueryResult(execute(executionContext, callback));
+    public List<QueryResult> executeQuery(final ExecutionContext executionContext, final JDBCExecutorCallback<? extends ExecuteResult> callback, 
+                                          final String type, final StatementOption statementOption) throws SQLException {
+        QueryResult result = new JDBCStreamQueryResult(execute(executionContext, callback, type, statementOption));
         return Collections.singletonList(result);
     }
     
     @Override
     public void close() throws SQLException {
-        if (null != statement) {
+        if (null != statement && !statement.isClosed()) {
             Connection connection = statement.getConnection();
-            connection.close();
             statement.close();
+            connection.close();
         }
     }
     
@@ -106,18 +108,20 @@ public final class FederateJDBCExecutor implements FederateExecutor {
         return statement.getResultSet();
     }
     
-    private ResultSet execute(final ExecutionContext executionContext, final JDBCExecutorCallback<? extends ExecuteResult> callback) throws SQLException {
+    private ResultSet execute(final ExecutionContext executionContext, final JDBCExecutorCallback<? extends ExecuteResult> callback, 
+                              final String type, final StatementOption statementOption) throws SQLException {
         SQLUnit sqlUnit = executionContext.getExecutionUnits().iterator().next().getSqlUnit();
-        PreparedStatement statement = getConnection(executionContext, callback).prepareStatement(SQLUtil.trimSemicolon(sqlUnit.getSql()));
+        PreparedStatement statement = getConnection(executionContext, callback, type, statementOption).prepareStatement(SQLUtil.trimSemicolon(sqlUnit.getSql()));
         setParameters(statement, sqlUnit.getParameters());
         this.statement = statement;
         return statement.executeQuery();
     }
     
-    private Connection getConnection(final ExecutionContext executionContext, final JDBCExecutorCallback<? extends ExecuteResult> callback) throws SQLException {
+    private Connection getConnection(final ExecutionContext executionContext, final JDBCExecutorCallback<? extends ExecuteResult> callback, 
+                                     final String type, final StatementOption statementOption) throws SQLException {
         Connection result = DriverManager.getConnection(CONNECTION_URL, getProperties());
         CalciteConnection calciteConnection = result.unwrap(CalciteConnection.class);
-        addSchema(calciteConnection, executionContext, callback);
+        addSchema(calciteConnection, executionContext, callback, type, statementOption);
         return result;
     }
     
@@ -128,8 +132,9 @@ public final class FederateJDBCExecutor implements FederateExecutor {
         return result;
     }
     
-    private void addSchema(final CalciteConnection calciteConnection, final ExecutionContext executionContext, final JDBCExecutorCallback<? extends ExecuteResult> callback) throws SQLException {
-        FederateRowExecutor executor = new FederateRowExecutor(rules, props, jdbcManager, jdbcExecutor, executionContext, callback);
+    private void addSchema(final CalciteConnection calciteConnection, final ExecutionContext executionContext, 
+                           final JDBCExecutorCallback<? extends ExecuteResult> callback, final String type, final StatementOption statementOption) throws SQLException {
+        FederateRowExecutor executor = new FederateRowExecutor(rules, props, jdbcManager, jdbcExecutor, executionContext, callback, type, statementOption);
         FederateLogicSchema logicSchema = new FederateLogicSchema(factory.getSchemaMetadatas().getSchemaMetadataBySchemaName(schema), executor);
         calciteConnection.getRootSchema().add(schema, logicSchema);
         calciteConnection.setSchema(schema);
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/raw/FederateRawExecutor.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/raw/FederateRawExecutor.java
index 112f797..33721cd 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/raw/FederateRawExecutor.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/raw/FederateRawExecutor.java
@@ -29,6 +29,7 @@ import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.J
 import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
 import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
 import org.apache.shardingsphere.infra.executor.sql.federate.execute.FederateExecutor;
+import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
 import org.apache.shardingsphere.infra.optimize.ShardingSphereOptimizer;
 import org.apache.shardingsphere.infra.optimize.context.OptimizeContext;
 
@@ -50,7 +51,8 @@ public final class FederateRawExecutor implements FederateExecutor {
     }
     
     @Override
-    public List<QueryResult> executeQuery(final ExecutionContext executionContext, final JDBCExecutorCallback<? extends ExecuteResult> callback) throws SQLException {
+    public List<QueryResult> executeQuery(final ExecutionContext executionContext, final JDBCExecutorCallback<? extends ExecuteResult> callback, 
+                                          final String type, final StatementOption statementOption) throws SQLException {
         // TODO
         return Collections.emptyList();
     }
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/schema/row/FederateRowExecutor.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/schema/row/FederateRowExecutor.java
index ef40fdc..efd50ef 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/schema/row/FederateRowExecutor.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/schema/row/FederateRowExecutor.java
@@ -35,7 +35,6 @@ import org.apache.shardingsphere.infra.executor.sql.federate.schema.table.genera
 import org.apache.shardingsphere.infra.executor.sql.federate.schema.table.generator.FederateExecutionSQLGenerator;
 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCManager;
-import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
 import org.apache.shardingsphere.infra.executor.sql.process.ExecuteProcessEngine;
 import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
@@ -65,6 +64,10 @@ public final class FederateRowExecutor {
     
     private final JDBCExecutorCallback<? extends ExecuteResult> callback;
     
+    private final String type;
+    
+    private final StatementOption statementOption;
+    
     /**
      * Execute.
      *
@@ -94,12 +97,9 @@ public final class FederateRowExecutor {
     }
     
     private ExecutionGroupContext<JDBCExecutionUnit> createExecutionGroupContext(final ExecutionContext executionContext) throws SQLException {
-        // TODO Set parameters for StatementOption
         int maxConnectionsSizePerQuery = props.getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
         Collection<ExecutionUnit> executionUnits = executionContext.getExecutionUnits();
-        String type = executionUnits.stream().anyMatch(each -> !each.getSqlUnit().getParameters().isEmpty()) ? JDBCDriverType.PREPARED_STATEMENT : JDBCDriverType.STATEMENT;
-        DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = new DriverExecutionPrepareEngine<>(
-                type, maxConnectionsSizePerQuery, jdbcManager, new StatementOption(true), rules);
+        DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = new DriverExecutionPrepareEngine<>(type, maxConnectionsSizePerQuery, jdbcManager, statementOption, rules);
         return prepareEngine.prepare(executionContext.getRouteContext(), executionUnits);
     }
 }
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index bc334af..ad98836 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -24,7 +24,6 @@ import org.apache.shardingsphere.driver.executor.DriverJDBCExecutor;
 import org.apache.shardingsphere.driver.executor.batch.BatchExecutionUnit;
 import org.apache.shardingsphere.driver.executor.batch.BatchPreparedStatementExecutor;
 import org.apache.shardingsphere.driver.executor.callback.impl.PreparedStatementExecuteQueryCallback;
-import org.apache.shardingsphere.driver.executor.callback.impl.StatementExecuteQueryCallback;
 import org.apache.shardingsphere.driver.jdbc.adapter.AbstractPreparedStatementAdapter;
 import org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
 import org.apache.shardingsphere.driver.jdbc.core.constant.SQLExceptionConstant;
@@ -213,12 +212,9 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
         if (executionContext.getExecutionUnits().isEmpty()) {
             return Collections.emptyList();
         }
-        // TODO : Please fix me here.
-        // PreparedStatementExecuteQueryCallback callback = new PreparedStatementExecuteQueryCallback(metaDataContexts.getDefaultMetaData().getResource().getDatabaseType(), 
-        //         sqlStatement, SQLExecutorExceptionHandler.isExceptionThrown());
-        StatementExecuteQueryCallback callback = new StatementExecuteQueryCallback(metaDataContexts.getDefaultMetaData().getResource().getDatabaseType(),
-                executionContext.getSqlStatementContext().getSqlStatement(), SQLExecutorExceptionHandler.isExceptionThrown());
-        return federateExecutor.executeQuery(executionContext, callback);
+        PreparedStatementExecuteQueryCallback callback = new PreparedStatementExecuteQueryCallback(metaDataContexts.getDefaultMetaData().getResource().getDatabaseType(), 
+                 sqlStatement, SQLExecutorExceptionHandler.isExceptionThrown());
+        return federateExecutor.executeQuery(executionContext, callback, JDBCDriverType.PREPARED_STATEMENT, statementOption);
     }
     
     @Override
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index c5c489b..396ca52 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -185,7 +185,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
         }
         StatementExecuteQueryCallback callback = new StatementExecuteQueryCallback(metaDataContexts.getDefaultMetaData().getResource().getDatabaseType(),
                 executionContext.getSqlStatementContext().getSqlStatement(), SQLExecutorExceptionHandler.isExceptionThrown());
-        return federateExecutor.executeQuery(executionContext, callback);
+        return federateExecutor.executeQuery(executionContext, callback, JDBCDriverType.STATEMENT, statementOption);
     }
     
     @Override
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
index a71eb60..68fbf60 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
@@ -123,7 +123,7 @@ public final class ProxySQLExecutor {
             return rawExecute(executionContext, rules, maxConnectionsSizePerQuery);
         }
         if (executionContext.getRouteContext().isFederated()) {
-            return federateExecute(executionContext, rules, isReturnGeneratedKeys, SQLExecutorExceptionHandler.isExceptionThrown());
+            return federateExecute(executionContext, isReturnGeneratedKeys, SQLExecutorExceptionHandler.isExceptionThrown());
         }
         return useDriverToExecute(executionContext, rules, maxConnectionsSizePerQuery, isReturnGeneratedKeys, SQLExecutorExceptionHandler.isExceptionThrown());
     }
@@ -140,8 +140,7 @@ public final class ProxySQLExecutor {
         return rawExecutor.execute(executionGroupContext, executionContext.getSqlStatementContext(), new RawSQLExecutorCallback());
     }
     
-    private Collection<ExecuteResult> federateExecute(final ExecutionContext executionContext, final Collection<ShardingSphereRule> rules,
-                                                      final boolean isReturnGeneratedKeys, final boolean isExceptionThrown) throws SQLException {
+    private Collection<ExecuteResult> federateExecute(final ExecutionContext executionContext, final boolean isReturnGeneratedKeys, final boolean isExceptionThrown) throws SQLException {
         if (executionContext.getExecutionUnits().isEmpty()) {
             return Collections.emptyList();
         }
@@ -149,7 +148,7 @@ public final class ProxySQLExecutor {
         ProxyJDBCExecutorCallback callback = ProxyJDBCExecutorCallbackFactory.newInstance(type, metaData.getMetaData(backendConnection.getSchemaName()).getResource().getDatabaseType(), 
                 executionContext.getSqlStatementContext().getSqlStatement(), backendConnection, isReturnGeneratedKeys, isExceptionThrown, true);
         backendConnection.setFederateExecutor(federateExecutor);
-        return federateExecutor.executeQuery(executionContext, callback).stream().map(each -> (ExecuteResult) each).collect(Collectors.toList());
+        return federateExecutor.executeQuery(executionContext, callback, type, new StatementOption(isReturnGeneratedKeys)).stream().map(each -> (ExecuteResult) each).collect(Collectors.toList());
     }
     
     private Collection<ExecuteResult> useDriverToExecute(final ExecutionContext executionContext, final Collection<ShardingSphereRule> rules,