You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/06/17 01:22:14 UTC
[shardingsphere] branch master updated: Support sql federation stream query when not contains pipeline breaker (#26393)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new d263b991c4b Support sql federation stream query when not contains pipeline breaker (#26393)
d263b991c4b is described below
commit d263b991c4bbedf9b8aaaeb1a10b032bdb16713f
Author: Zhengqiang Duan <du...@apache.org>
AuthorDate: Sat Jun 17 09:22:08 2023 +0800
Support sql federation stream query when not contains pipeline breaker (#26393)
---
.../executor/TableScanExecutorContext.java | 7 +-
.../EnumerablePushDownTableScanExecutor.java | 88 ++++++++--------------
.../executor/row/SQLFederationRowEnumerator.java | 43 +++++++----
3 files changed, 65 insertions(+), 73 deletions(-)
diff --git a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/TableScanExecutorContext.java b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/TableScanExecutorContext.java
index 010195ff6dc..47df2132b41 100644
--- a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/TableScanExecutorContext.java
+++ b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/TableScanExecutorContext.java
@@ -21,8 +21,11 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
/**
- * Translatable table scan executor context.
+ * Table scan executor context.
*/
@RequiredArgsConstructor
@Getter
@@ -35,4 +38,6 @@ public final class TableScanExecutorContext {
private final ConfigurationProperties props;
private final SQLFederationExecutorContext federationContext;
+
+ private final Map<String, Integer> connectionOffsets = new LinkedHashMap<>();
}
diff --git a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerablePushDownTableScanExecutor.java b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerablePushDownTableScanExecutor.java
index dcb8e18baa7..8c74dec60a9 100644
--- a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerablePushDownTableScanExecutor.java
+++ b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerablePushDownTableScanExecutor.java
@@ -49,14 +49,13 @@ import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResultMetaData;
-import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.memory.JDBCMemoryQueryResult;
-import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
import org.apache.shardingsphere.infra.hint.HintValueContext;
@@ -74,17 +73,17 @@ import org.apache.shardingsphere.infra.session.connection.ConnectionContext;
import org.apache.shardingsphere.infra.session.query.QueryContext;
import org.apache.shardingsphere.infra.util.exception.external.sql.type.wrapper.SQLWrapperException;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
+import org.apache.shardingsphere.sqlfederation.compiler.context.OptimizerContext;
+import org.apache.shardingsphere.sqlfederation.compiler.metadata.schema.SQLFederationSchema;
+import org.apache.shardingsphere.sqlfederation.compiler.planner.util.SQLFederationPlannerUtils;
import org.apache.shardingsphere.sqlfederation.executor.SQLDialectFactory;
import org.apache.shardingsphere.sqlfederation.executor.SQLFederationDataContext;
import org.apache.shardingsphere.sqlfederation.executor.SQLFederationExecutorContext;
import org.apache.shardingsphere.sqlfederation.executor.TableScanExecutorContext;
-import org.apache.shardingsphere.sqlfederation.executor.util.StringToRexNodeUtils;
import org.apache.shardingsphere.sqlfederation.executor.row.EmptyRowEnumerator;
import org.apache.shardingsphere.sqlfederation.executor.row.MemoryEnumerator;
import org.apache.shardingsphere.sqlfederation.executor.row.SQLFederationRowEnumerator;
-import org.apache.shardingsphere.sqlfederation.compiler.context.OptimizerContext;
-import org.apache.shardingsphere.sqlfederation.compiler.metadata.schema.SQLFederationSchema;
-import org.apache.shardingsphere.sqlfederation.compiler.planner.util.SQLFederationPlannerUtils;
+import org.apache.shardingsphere.sqlfederation.executor.util.StringToRexNodeUtils;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -150,7 +149,7 @@ public final class EnumerablePushDownTableScanExecutor {
federationContext.getExecutionUnits().addAll(context.getExecutionUnits());
return createEmptyScalarEnumerable();
}
- return executeScalarEnumerable(databaseType, queryContext, database, context);
+ return executeScalarEnumerable(queryContext, database, context);
}
private AbstractEnumerable<Object> createEmptyScalarEnumerable() {
@@ -179,18 +178,18 @@ public final class EnumerablePushDownTableScanExecutor {
};
}
- private AbstractEnumerable<Object> executeScalarEnumerable(final DatabaseType databaseType, final QueryContext queryContext,
- final ShardingSphereDatabase database, final ExecutionContext context) {
+ private AbstractEnumerable<Object> executeScalarEnumerable(final QueryContext queryContext, final ShardingSphereDatabase database, final ExecutionContext context) {
try {
+ computeConnectionOffsets(context);
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
- prepareEngine.prepare(context.getRouteContext(), context.getExecutionUnits(), new ExecutionGroupReportContext(database.getName()));
+ prepareEngine.prepare(context.getRouteContext(), executorContext.getConnectionOffsets(), context.getExecutionUnits(), new ExecutionGroupReportContext(database.getName()));
setParameters(executionGroupContext.getInputGroups());
processEngine.executeSQL(executionGroupContext, context.getQueryContext());
- List<QueryResult> queryResults = execute(executionGroupContext, databaseType);
+ List<QueryResult> queryResults = jdbcExecutor.execute(executionGroupContext, callback).stream().map(QueryResult.class::cast).collect(Collectors.toList());
MergeEngine mergeEngine = new MergeEngine(database, executorContext.getProps(), new ConnectionContext());
MergedResult mergedResult = mergeEngine.merge(queryResults, queryContext.getSqlStatementContext());
Collection<Statement> statements = getStatements(executionGroupContext.getInputGroups());
- return createScalarEnumerable(mergedResult, statements);
+ return createScalarEnumerable(mergedResult, queryResults.get(0).getMetaData(), statements);
} catch (final SQLException ex) {
throw new SQLWrapperException(ex);
} finally {
@@ -198,27 +197,27 @@ public final class EnumerablePushDownTableScanExecutor {
}
}
- private AbstractEnumerable<Object> createScalarEnumerable(final MergedResult mergedResult, final Collection<Statement> statements) throws SQLException {
- // TODO remove getRows when mergedResult support JDBC first method
- Collection<Object> rows = getScalarRows(mergedResult);
+ private void computeConnectionOffsets(final ExecutionContext context) {
+ for (ExecutionUnit each : context.getExecutionUnits()) {
+ if (executorContext.getConnectionOffsets().containsKey(each.getDataSourceName())) {
+ int connectionOffset = executorContext.getConnectionOffsets().get(each.getDataSourceName());
+ executorContext.getConnectionOffsets().put(each.getDataSourceName(), ++connectionOffset);
+ } else {
+ executorContext.getConnectionOffsets().put(each.getDataSourceName(), 0);
+ }
+ }
+ }
+
+ private AbstractEnumerable<Object> createScalarEnumerable(final MergedResult mergedResult, final QueryResultMetaData metaData, final Collection<Statement> statements) {
return new AbstractEnumerable<Object>() {
@Override
public Enumerator<Object> enumerator() {
- return new SQLFederationRowEnumerator<>(rows, statements);
+ return new SQLFederationRowEnumerator<>(mergedResult, metaData, statements);
}
};
}
- private Collection<Object> getScalarRows(final MergedResult mergedResult) throws SQLException {
- Collection<Object> result = new LinkedList<>();
- while (mergedResult.next()) {
- Object currentRow = mergedResult.getValue(1, Object.class);
- result.add(currentRow);
- }
- return result;
- }
-
/**
* Execute.
*
@@ -242,16 +241,17 @@ public final class EnumerablePushDownTableScanExecutor {
federationContext.getExecutionUnits().addAll(context.getExecutionUnits());
return createEmptyEnumerable();
}
- return execute(databaseType, queryContext, database, context);
+ return execute(queryContext, database, context);
}
- private AbstractEnumerable<Object[]> execute(final DatabaseType databaseType, final QueryContext queryContext, final ShardingSphereDatabase database, final ExecutionContext context) {
+ private AbstractEnumerable<Object[]> execute(final QueryContext queryContext, final ShardingSphereDatabase database, final ExecutionContext context) {
try {
+ computeConnectionOffsets(context);
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
- prepareEngine.prepare(context.getRouteContext(), context.getExecutionUnits(), new ExecutionGroupReportContext(database.getName()));
+ prepareEngine.prepare(context.getRouteContext(), executorContext.getConnectionOffsets(), context.getExecutionUnits(), new ExecutionGroupReportContext(database.getName()));
setParameters(executionGroupContext.getInputGroups());
processEngine.executeSQL(executionGroupContext, context.getQueryContext());
- List<QueryResult> queryResults = execute(executionGroupContext, databaseType);
+ List<QueryResult> queryResults = jdbcExecutor.execute(executionGroupContext, callback).stream().map(QueryResult.class::cast).collect(Collectors.toList());
MergeEngine mergeEngine = new MergeEngine(database, executorContext.getProps(), new ConnectionContext());
MergedResult mergedResult = mergeEngine.merge(queryResults, queryContext.getSqlStatementContext());
Collection<Statement> statements = getStatements(executionGroupContext.getInputGroups());
@@ -263,18 +263,6 @@ public final class EnumerablePushDownTableScanExecutor {
}
}
- private List<QueryResult> execute(final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext, final DatabaseType databaseType) throws SQLException {
- Collection<QueryResult> queryResults = jdbcExecutor.execute(executionGroupContext, callback).stream().map(QueryResult.class::cast).collect(Collectors.toList());
- List<QueryResult> result = new LinkedList<>();
- for (QueryResult each : queryResults) {
- QueryResult queryResult = each instanceof JDBCStreamQueryResult
- ? new JDBCMemoryQueryResult(((JDBCStreamQueryResult) each).getResultSet(), databaseType)
- : each;
- result.add(queryResult);
- }
- return result;
- }
-
private Enumerable<Object[]> executeByShardingSphereData(final String databaseName, final String schemaName, final ShardingSphereTable table) {
Optional<ShardingSphereTableData> tableData = Optional.ofNullable(data.getDatabaseData().get(databaseName)).map(optional -> optional.getSchemaData().get(schemaName))
.map(ShardingSphereSchemaData::getTableData).map(shardingSphereData -> shardingSphereData.get(table.getName()));
@@ -378,30 +366,16 @@ public final class EnumerablePushDownTableScanExecutor {
return result;
}
- private AbstractEnumerable<Object[]> createEnumerable(final MergedResult mergedResult, final QueryResultMetaData metaData, final Collection<Statement> statements) throws SQLException {
- // TODO remove getRows when mergedResult support JDBC first method
- Collection<Object[]> rows = getRows(mergedResult, metaData);
+ private AbstractEnumerable<Object[]> createEnumerable(final MergedResult mergedResult, final QueryResultMetaData metaData, final Collection<Statement> statements) {
return new AbstractEnumerable<Object[]>() {
@Override
public Enumerator<Object[]> enumerator() {
- return new SQLFederationRowEnumerator<>(rows, statements);
+ return new SQLFederationRowEnumerator<>(mergedResult, metaData, statements);
}
};
}
- private Collection<Object[]> getRows(final MergedResult mergedResult, final QueryResultMetaData metaData) throws SQLException {
- Collection<Object[]> result = new LinkedList<>();
- while (mergedResult.next()) {
- Object[] currentRow = new Object[metaData.getColumnCount()];
- for (int i = 0; i < metaData.getColumnCount(); i++) {
- currentRow[i] = mergedResult.getValue(i + 1, Object.class);
- }
- result.add(currentRow);
- }
- return result;
- }
-
private QueryContext createQueryContext(final ShardingSphereMetaData metaData, final SqlString sqlString, final DatabaseType databaseType, final boolean useCache) {
String sql = sqlString.getSql().replace("\n", " ");
SQLStatement sqlStatement = new SQLStatementParserEngine(databaseType.getType(),
diff --git a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/row/SQLFederationRowEnumerator.java b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/row/SQLFederationRowEnumerator.java
index f7c36792573..55dc3f101ed 100644
--- a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/row/SQLFederationRowEnumerator.java
+++ b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/row/SQLFederationRowEnumerator.java
@@ -17,51 +17,65 @@
package org.apache.shardingsphere.sqlfederation.executor.row;
+import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
import org.apache.calcite.linq4j.Enumerator;
+import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResultMetaData;
+import org.apache.shardingsphere.infra.merge.result.MergedResult;
import org.apache.shardingsphere.infra.util.exception.external.sql.type.wrapper.SQLWrapperException;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collection;
-import java.util.Iterator;
/**
* SQL federation row enumerator.
*
* @param <T> type of row
*/
+@RequiredArgsConstructor
public final class SQLFederationRowEnumerator<T> implements Enumerator<T> {
- private final Collection<T> rows;
+ private final MergedResult queryResult;
- private final Collection<Statement> statements;
+ private final QueryResultMetaData metaData;
- private Iterator<T> iterator;
+ private final Collection<Statement> statements;
private T currentRow;
- public SQLFederationRowEnumerator(final Collection<T> rows, final Collection<Statement> statements) {
- this.rows = rows;
- this.statements = statements;
- iterator = rows.iterator();
- }
-
@Override
public T current() {
return currentRow;
}
+ @SneakyThrows
@Override
public boolean moveNext() {
- if (iterator.hasNext()) {
- currentRow = iterator.next();
+ return moveNext0();
+ }
+
+ private boolean moveNext0() throws SQLException {
+ if (queryResult.next()) {
+ setCurrentRow();
return true;
}
- currentRow = null;
- iterator = rows.iterator();
return false;
}
+ @SuppressWarnings("unchecked")
+ private void setCurrentRow() throws SQLException {
+ if (1 == metaData.getColumnCount()) {
+ this.currentRow = (T) queryResult.getValue(1, Object.class);
+ } else {
+ Object[] rowValues = new Object[metaData.getColumnCount()];
+ for (int i = 0; i < metaData.getColumnCount(); i++) {
+ rowValues[i] = queryResult.getValue(i + 1, Object.class);
+ }
+ this.currentRow = (T) rowValues;
+ }
+ }
+
@Override
public void reset() {
}
@@ -73,7 +87,6 @@ public final class SQLFederationRowEnumerator<T> implements Enumerator<T> {
each.close();
}
currentRow = null;
- iterator = rows.iterator();
} catch (final SQLException ex) {
throw new SQLWrapperException(ex);
}