You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/06/17 01:22:14 UTC

[shardingsphere] branch master updated: Support sql federation stream query when not contains pipeline breaker (#26393)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new d263b991c4b Support sql federation stream query when not contains pipeline breaker (#26393)
d263b991c4b is described below

commit d263b991c4bbedf9b8aaaeb1a10b032bdb16713f
Author: Zhengqiang Duan <du...@apache.org>
AuthorDate: Sat Jun 17 09:22:08 2023 +0800

    Support sql federation stream query when not contains pipeline breaker (#26393)
---
 .../executor/TableScanExecutorContext.java         |  7 +-
 .../EnumerablePushDownTableScanExecutor.java       | 88 ++++++++--------------
 .../executor/row/SQLFederationRowEnumerator.java   | 43 +++++++----
 3 files changed, 65 insertions(+), 73 deletions(-)

diff --git a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/TableScanExecutorContext.java b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/TableScanExecutorContext.java
index 010195ff6dc..47df2132b41 100644
--- a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/TableScanExecutorContext.java
+++ b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/TableScanExecutorContext.java
@@ -21,8 +21,11 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
 
+import java.util.LinkedHashMap;
+import java.util.Map;
+
 /**
- * Translatable table scan executor context.
+ * Table scan executor context.
  */
 @RequiredArgsConstructor
 @Getter
@@ -35,4 +38,6 @@ public final class TableScanExecutorContext {
     private final ConfigurationProperties props;
     
     private final SQLFederationExecutorContext federationContext;
+    
+    private final Map<String, Integer> connectionOffsets = new LinkedHashMap<>();
 }
diff --git a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerablePushDownTableScanExecutor.java b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerablePushDownTableScanExecutor.java
index dcb8e18baa7..8c74dec60a9 100644
--- a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerablePushDownTableScanExecutor.java
+++ b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerablePushDownTableScanExecutor.java
@@ -49,14 +49,13 @@ import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
 import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
 import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
 import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResultMetaData;
-import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.memory.JDBCMemoryQueryResult;
-import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
 import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
 import org.apache.shardingsphere.infra.hint.HintValueContext;
@@ -74,17 +73,17 @@ import org.apache.shardingsphere.infra.session.connection.ConnectionContext;
 import org.apache.shardingsphere.infra.session.query.QueryContext;
 import org.apache.shardingsphere.infra.util.exception.external.sql.type.wrapper.SQLWrapperException;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
+import org.apache.shardingsphere.sqlfederation.compiler.context.OptimizerContext;
+import org.apache.shardingsphere.sqlfederation.compiler.metadata.schema.SQLFederationSchema;
+import org.apache.shardingsphere.sqlfederation.compiler.planner.util.SQLFederationPlannerUtils;
 import org.apache.shardingsphere.sqlfederation.executor.SQLDialectFactory;
 import org.apache.shardingsphere.sqlfederation.executor.SQLFederationDataContext;
 import org.apache.shardingsphere.sqlfederation.executor.SQLFederationExecutorContext;
 import org.apache.shardingsphere.sqlfederation.executor.TableScanExecutorContext;
-import org.apache.shardingsphere.sqlfederation.executor.util.StringToRexNodeUtils;
 import org.apache.shardingsphere.sqlfederation.executor.row.EmptyRowEnumerator;
 import org.apache.shardingsphere.sqlfederation.executor.row.MemoryEnumerator;
 import org.apache.shardingsphere.sqlfederation.executor.row.SQLFederationRowEnumerator;
-import org.apache.shardingsphere.sqlfederation.compiler.context.OptimizerContext;
-import org.apache.shardingsphere.sqlfederation.compiler.metadata.schema.SQLFederationSchema;
-import org.apache.shardingsphere.sqlfederation.compiler.planner.util.SQLFederationPlannerUtils;
+import org.apache.shardingsphere.sqlfederation.executor.util.StringToRexNodeUtils;
 
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -150,7 +149,7 @@ public final class EnumerablePushDownTableScanExecutor {
             federationContext.getExecutionUnits().addAll(context.getExecutionUnits());
             return createEmptyScalarEnumerable();
         }
-        return executeScalarEnumerable(databaseType, queryContext, database, context);
+        return executeScalarEnumerable(queryContext, database, context);
     }
     
     private AbstractEnumerable<Object> createEmptyScalarEnumerable() {
@@ -179,18 +178,18 @@ public final class EnumerablePushDownTableScanExecutor {
         };
     }
     
-    private AbstractEnumerable<Object> executeScalarEnumerable(final DatabaseType databaseType, final QueryContext queryContext,
-                                                               final ShardingSphereDatabase database, final ExecutionContext context) {
+    private AbstractEnumerable<Object> executeScalarEnumerable(final QueryContext queryContext, final ShardingSphereDatabase database, final ExecutionContext context) {
         try {
+            computeConnectionOffsets(context);
             ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
-                    prepareEngine.prepare(context.getRouteContext(), context.getExecutionUnits(), new ExecutionGroupReportContext(database.getName()));
+                    prepareEngine.prepare(context.getRouteContext(), executorContext.getConnectionOffsets(), context.getExecutionUnits(), new ExecutionGroupReportContext(database.getName()));
             setParameters(executionGroupContext.getInputGroups());
             processEngine.executeSQL(executionGroupContext, context.getQueryContext());
-            List<QueryResult> queryResults = execute(executionGroupContext, databaseType);
+            List<QueryResult> queryResults = jdbcExecutor.execute(executionGroupContext, callback).stream().map(QueryResult.class::cast).collect(Collectors.toList());
             MergeEngine mergeEngine = new MergeEngine(database, executorContext.getProps(), new ConnectionContext());
             MergedResult mergedResult = mergeEngine.merge(queryResults, queryContext.getSqlStatementContext());
             Collection<Statement> statements = getStatements(executionGroupContext.getInputGroups());
-            return createScalarEnumerable(mergedResult, statements);
+            return createScalarEnumerable(mergedResult, queryResults.get(0).getMetaData(), statements);
         } catch (final SQLException ex) {
             throw new SQLWrapperException(ex);
         } finally {
@@ -198,27 +197,27 @@ public final class EnumerablePushDownTableScanExecutor {
         }
     }
     
-    private AbstractEnumerable<Object> createScalarEnumerable(final MergedResult mergedResult, final Collection<Statement> statements) throws SQLException {
-        // TODO remove getRows when mergedResult support JDBC first method
-        Collection<Object> rows = getScalarRows(mergedResult);
+    private void computeConnectionOffsets(final ExecutionContext context) {
+        for (ExecutionUnit each : context.getExecutionUnits()) {
+            if (executorContext.getConnectionOffsets().containsKey(each.getDataSourceName())) {
+                int connectionOffset = executorContext.getConnectionOffsets().get(each.getDataSourceName());
+                executorContext.getConnectionOffsets().put(each.getDataSourceName(), ++connectionOffset);
+            } else {
+                executorContext.getConnectionOffsets().put(each.getDataSourceName(), 0);
+            }
+        }
+    }
+    
+    private AbstractEnumerable<Object> createScalarEnumerable(final MergedResult mergedResult, final QueryResultMetaData metaData, final Collection<Statement> statements) {
         return new AbstractEnumerable<Object>() {
             
             @Override
             public Enumerator<Object> enumerator() {
-                return new SQLFederationRowEnumerator<>(rows, statements);
+                return new SQLFederationRowEnumerator<>(mergedResult, metaData, statements);
             }
         };
     }
     
-    private Collection<Object> getScalarRows(final MergedResult mergedResult) throws SQLException {
-        Collection<Object> result = new LinkedList<>();
-        while (mergedResult.next()) {
-            Object currentRow = mergedResult.getValue(1, Object.class);
-            result.add(currentRow);
-        }
-        return result;
-    }
-    
     /**
      * Execute.
      *
@@ -242,16 +241,17 @@ public final class EnumerablePushDownTableScanExecutor {
             federationContext.getExecutionUnits().addAll(context.getExecutionUnits());
             return createEmptyEnumerable();
         }
-        return execute(databaseType, queryContext, database, context);
+        return execute(queryContext, database, context);
     }
     
-    private AbstractEnumerable<Object[]> execute(final DatabaseType databaseType, final QueryContext queryContext, final ShardingSphereDatabase database, final ExecutionContext context) {
+    private AbstractEnumerable<Object[]> execute(final QueryContext queryContext, final ShardingSphereDatabase database, final ExecutionContext context) {
         try {
+            computeConnectionOffsets(context);
             ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
-                    prepareEngine.prepare(context.getRouteContext(), context.getExecutionUnits(), new ExecutionGroupReportContext(database.getName()));
+                    prepareEngine.prepare(context.getRouteContext(), executorContext.getConnectionOffsets(), context.getExecutionUnits(), new ExecutionGroupReportContext(database.getName()));
             setParameters(executionGroupContext.getInputGroups());
             processEngine.executeSQL(executionGroupContext, context.getQueryContext());
-            List<QueryResult> queryResults = execute(executionGroupContext, databaseType);
+            List<QueryResult> queryResults = jdbcExecutor.execute(executionGroupContext, callback).stream().map(QueryResult.class::cast).collect(Collectors.toList());
             MergeEngine mergeEngine = new MergeEngine(database, executorContext.getProps(), new ConnectionContext());
             MergedResult mergedResult = mergeEngine.merge(queryResults, queryContext.getSqlStatementContext());
             Collection<Statement> statements = getStatements(executionGroupContext.getInputGroups());
@@ -263,18 +263,6 @@ public final class EnumerablePushDownTableScanExecutor {
         }
     }
     
-    private List<QueryResult> execute(final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext, final DatabaseType databaseType) throws SQLException {
-        Collection<QueryResult> queryResults = jdbcExecutor.execute(executionGroupContext, callback).stream().map(QueryResult.class::cast).collect(Collectors.toList());
-        List<QueryResult> result = new LinkedList<>();
-        for (QueryResult each : queryResults) {
-            QueryResult queryResult = each instanceof JDBCStreamQueryResult
-                    ? new JDBCMemoryQueryResult(((JDBCStreamQueryResult) each).getResultSet(), databaseType)
-                    : each;
-            result.add(queryResult);
-        }
-        return result;
-    }
-    
     private Enumerable<Object[]> executeByShardingSphereData(final String databaseName, final String schemaName, final ShardingSphereTable table) {
         Optional<ShardingSphereTableData> tableData = Optional.ofNullable(data.getDatabaseData().get(databaseName)).map(optional -> optional.getSchemaData().get(schemaName))
                 .map(ShardingSphereSchemaData::getTableData).map(shardingSphereData -> shardingSphereData.get(table.getName()));
@@ -378,30 +366,16 @@ public final class EnumerablePushDownTableScanExecutor {
         return result;
     }
     
-    private AbstractEnumerable<Object[]> createEnumerable(final MergedResult mergedResult, final QueryResultMetaData metaData, final Collection<Statement> statements) throws SQLException {
-        // TODO remove getRows when mergedResult support JDBC first method
-        Collection<Object[]> rows = getRows(mergedResult, metaData);
+    private AbstractEnumerable<Object[]> createEnumerable(final MergedResult mergedResult, final QueryResultMetaData metaData, final Collection<Statement> statements) {
         return new AbstractEnumerable<Object[]>() {
             
             @Override
             public Enumerator<Object[]> enumerator() {
-                return new SQLFederationRowEnumerator<>(rows, statements);
+                return new SQLFederationRowEnumerator<>(mergedResult, metaData, statements);
             }
         };
     }
     
-    private Collection<Object[]> getRows(final MergedResult mergedResult, final QueryResultMetaData metaData) throws SQLException {
-        Collection<Object[]> result = new LinkedList<>();
-        while (mergedResult.next()) {
-            Object[] currentRow = new Object[metaData.getColumnCount()];
-            for (int i = 0; i < metaData.getColumnCount(); i++) {
-                currentRow[i] = mergedResult.getValue(i + 1, Object.class);
-            }
-            result.add(currentRow);
-        }
-        return result;
-    }
-    
     private QueryContext createQueryContext(final ShardingSphereMetaData metaData, final SqlString sqlString, final DatabaseType databaseType, final boolean useCache) {
         String sql = sqlString.getSql().replace("\n", " ");
         SQLStatement sqlStatement = new SQLStatementParserEngine(databaseType.getType(),
diff --git a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/row/SQLFederationRowEnumerator.java b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/row/SQLFederationRowEnumerator.java
index f7c36792573..55dc3f101ed 100644
--- a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/row/SQLFederationRowEnumerator.java
+++ b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/row/SQLFederationRowEnumerator.java
@@ -17,51 +17,65 @@
 
 package org.apache.shardingsphere.sqlfederation.executor.row;
 
+import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
 import org.apache.calcite.linq4j.Enumerator;
+import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResultMetaData;
+import org.apache.shardingsphere.infra.merge.result.MergedResult;
 import org.apache.shardingsphere.infra.util.exception.external.sql.type.wrapper.SQLWrapperException;
 
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Collection;
-import java.util.Iterator;
 
 /**
  * SQL federation row enumerator.
  * 
  * @param <T> type of row
  */
+@RequiredArgsConstructor
 public final class SQLFederationRowEnumerator<T> implements Enumerator<T> {
     
-    private final Collection<T> rows;
+    private final MergedResult queryResult;
     
-    private final Collection<Statement> statements;
+    private final QueryResultMetaData metaData;
     
-    private Iterator<T> iterator;
+    private final Collection<Statement> statements;
     
     private T currentRow;
     
-    public SQLFederationRowEnumerator(final Collection<T> rows, final Collection<Statement> statements) {
-        this.rows = rows;
-        this.statements = statements;
-        iterator = rows.iterator();
-    }
-    
     @Override
     public T current() {
         return currentRow;
     }
     
+    @SneakyThrows
     @Override
     public boolean moveNext() {
-        if (iterator.hasNext()) {
-            currentRow = iterator.next();
+        return moveNext0();
+    }
+    
+    private boolean moveNext0() throws SQLException {
+        if (queryResult.next()) {
+            setCurrentRow();
             return true;
         }
-        currentRow = null;
-        iterator = rows.iterator();
         return false;
     }
     
+    @SuppressWarnings("unchecked")
+    private void setCurrentRow() throws SQLException {
+        if (1 == metaData.getColumnCount()) {
+            this.currentRow = (T) queryResult.getValue(1, Object.class);
+        } else {
+            Object[] rowValues = new Object[metaData.getColumnCount()];
+            for (int i = 0; i < metaData.getColumnCount(); i++) {
+                rowValues[i] = queryResult.getValue(i + 1, Object.class);
+            }
+            this.currentRow = (T) rowValues;
+        }
+    }
+    
     @Override
     public void reset() {
     }
@@ -73,7 +87,6 @@ public final class SQLFederationRowEnumerator<T> implements Enumerator<T> {
                 each.close();
             }
             currentRow = null;
-            iterator = rows.iterator();
         } catch (final SQLException ex) {
             throw new SQLWrapperException(ex);
         }