You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by me...@apache.org on 2021/01/03 10:58:08 UTC
[shardingsphere] branch master updated: init CalciteJDBCExecutor
(#8865)
This is an automated email from the ASF dual-hosted git repository.
menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 2ceefd4 init CalciteJDBCExecutor (#8865)
2ceefd4 is described below
commit 2ceefd4c24492b3d069cb785ee2246d5db661523
Author: Juan Pan(Trista) <pa...@apache.org>
AuthorDate: Sun Jan 3 18:57:42 2021 +0800
init CalciteJDBCExecutor (#8865)
* init CalciteJDBCExecutor
* remove useless member
---
.../infra/optimize/context/CalciteContext.java | 39 +++--------------
.../optimize/context/CalciteContextFactory.java | 50 ++++++++++++++++++++--
.../infra/optimize/context/CalciteDataContext.java | 2 +-
.../optimize/execute/CalciteJDBCExecutor.java | 28 ++++++------
.../infra/route/context/RouteContext.java | 5 ---
5 files changed, 67 insertions(+), 57 deletions(-)
diff --git a/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/context/CalciteContext.java b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/context/CalciteContext.java
index 8f85199..c47cd29 100644
--- a/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/context/CalciteContext.java
+++ b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/context/CalciteContext.java
@@ -18,58 +18,29 @@
package org.apache.shardingsphere.infra.optimize.context;
import lombok.Getter;
-import org.apache.calcite.config.CalciteConnectionConfig;
-import org.apache.calcite.jdbc.CalciteSchema;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptTable.ViewExpander;
-import org.apache.calcite.prepare.CalciteCatalogReader;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import lombok.RequiredArgsConstructor;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.validate.SqlValidator;
-import org.apache.calcite.sql.validate.SqlValidatorUtil;
import org.apache.calcite.sql2rel.SqlToRelConverter;
-import org.apache.calcite.sql2rel.SqlToRelConverter.Config;
-import org.apache.calcite.sql2rel.StandardConvertletTable;
import org.apache.shardingsphere.infra.optimize.schema.CalciteLogicSchema;
-import java.util.Collections;
+import java.util.Properties;
/**
* Calcite context.
*
*/
@Getter
+@RequiredArgsConstructor
public final class CalciteContext {
+
+ private final Properties connectionProperties;
private final CalciteLogicSchema calciteLogicSchema;
- private final CalciteCatalogReader catalogReader;
-
private final SqlParser.Config parserConfig;
private final SqlValidator validator;
private final SqlToRelConverter relConverter;
-
- public CalciteContext(final CalciteConnectionConfig config,
- final SqlParser.Config parserConfig, final RelDataTypeFactory typeFactory, final RelOptCluster cluster, final CalciteLogicSchema calciteLogicSchema) {
- this.calciteLogicSchema = calciteLogicSchema;
- CalciteSchema rootSchema = CalciteSchema.createRootSchema(true);
- rootSchema.add(calciteLogicSchema.getName(), calciteLogicSchema);
- catalogReader = new CalciteCatalogReader(rootSchema, Collections.singletonList(config.schema()), typeFactory, config);
- this.parserConfig = parserConfig;
- validator = SqlValidatorUtil.newValidator(SqlStdOperatorTable.instance(), catalogReader, typeFactory, SqlValidator.Config.DEFAULT
- .withLenientOperatorLookup(config.lenientOperatorLookup())
- .withSqlConformance(config.conformance())
- .withDefaultNullCollation(config.defaultNullCollation())
- .withIdentifierExpansion(true));
- relConverter = createSqlToRelConverter(cluster);
- }
-
- private SqlToRelConverter createSqlToRelConverter(final RelOptCluster cluster) {
- Config config = SqlToRelConverter.config().withTrimUnusedFields(true);
- ViewExpander expander = (rowType, queryString, schemaPath, viewPath) -> null;
- return new SqlToRelConverter(expander, validator, catalogReader, cluster, StandardConvertletTable.INSTANCE, config);
- }
}
diff --git a/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/context/CalciteContextFactory.java b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/context/CalciteContextFactory.java
index 2b24ded..9b6b481 100644
--- a/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/context/CalciteContextFactory.java
+++ b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/context/CalciteContextFactory.java
@@ -19,24 +19,35 @@ package org.apache.shardingsphere.infra.optimize.context;
import org.apache.calcite.config.CalciteConnectionConfig;
import org.apache.calcite.config.CalciteConnectionConfigImpl;
+import org.apache.calcite.config.CalciteConnectionProperty;
import org.apache.calcite.config.Lex;
+import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.volcano.VolcanoPlanner;
+import org.apache.calcite.prepare.CalciteCatalogReader;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.parser.SqlParser.Config;
import org.apache.calcite.sql.parser.impl.SqlParserImpl;
import org.apache.calcite.sql.validate.SqlConformanceEnum;
+import org.apache.calcite.sql.validate.SqlValidator;
+import org.apache.calcite.sql.validate.SqlValidatorUtil;
+import org.apache.calcite.sql2rel.SqlToRelConverter;
+import org.apache.calcite.sql2rel.StandardConvertletTable;
import org.apache.shardingsphere.infra.exception.ShardingSphereException;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.optimize.execute.CalciteInternalExecutor;
import org.apache.shardingsphere.infra.optimize.plan.PlannerInitializer;
+import org.apache.shardingsphere.infra.optimize.schema.CalciteLogicSchema;
import org.apache.shardingsphere.infra.optimize.schema.CalciteLogicSchemaFactory;
import java.sql.SQLException;
+import java.util.Collections;
import java.util.Map;
import java.util.Properties;
@@ -45,6 +56,8 @@ import java.util.Properties;
*
*/
public final class CalciteContextFactory {
+
+ private final Properties properties;
private final CalciteConnectionConfig connectionConfig;
@@ -57,7 +70,8 @@ public final class CalciteContextFactory {
private final RelOptCluster cluster;
public CalciteContextFactory(final Map<String, ShardingSphereMetaData> metaDataMap) {
- connectionConfig = new CalciteConnectionConfigImpl(createProperties());
+ properties = createProperties();
+ connectionConfig = new CalciteConnectionConfigImpl(properties);
parserConfig = SqlParser.config()
.withLex(connectionConfig.lex())
.withIdentifierMaxLength(SqlParser.DEFAULT_IDENTIFIER_MAX_LENGTH)
@@ -71,8 +85,8 @@ public final class CalciteContextFactory {
private Properties createProperties() {
// TODO Not only MySQL here.
Properties result = new Properties();
- result.setProperty("lex", Lex.MYSQL.name());
- result.setProperty("conformance", SqlConformanceEnum.MYSQL_5.name());
+ result.setProperty(CalciteConnectionProperty.LEX.camelName(), Lex.MYSQL.name());
+ result.setProperty(CalciteConnectionProperty.CONFORMANCE.camelName(), SqlConformanceEnum.MYSQL_5.name());
return result;
}
@@ -91,9 +105,37 @@ public final class CalciteContextFactory {
*/
public CalciteContext create(final String schema, final CalciteInternalExecutor executor) {
try {
- return new CalciteContext(connectionConfig, parserConfig, typeFactory, cluster, factory.create(schema, executor));
+ return create(connectionConfig, parserConfig, typeFactory, cluster, factory.create(schema, executor));
} catch (final SQLException ex) {
throw new ShardingSphereException(ex);
}
}
+
+ private CalciteContext create(final CalciteConnectionConfig config,
+ final SqlParser.Config parserConfig, final RelDataTypeFactory typeFactory, final RelOptCluster cluster, final CalciteLogicSchema calciteLogicSchema) {
+ CalciteCatalogReader catalogReader = createCalciteCatalogReader(config, typeFactory, calciteLogicSchema);
+ SqlValidator validator = createSqlValidator(config, typeFactory, catalogReader);
+ SqlToRelConverter relConverter = createSqlToRelConverter(cluster, validator, catalogReader);
+ return new CalciteContext(properties, calciteLogicSchema, parserConfig, validator, relConverter);
+ }
+
+ private CalciteCatalogReader createCalciteCatalogReader(final CalciteConnectionConfig config, final RelDataTypeFactory typeFactory, final CalciteLogicSchema calciteLogicSchema) {
+ CalciteSchema rootSchema = CalciteSchema.createRootSchema(true);
+ rootSchema.add(calciteLogicSchema.getName(), calciteLogicSchema);
+ return new CalciteCatalogReader(rootSchema, Collections.singletonList(config.schema()), typeFactory, config);
+ }
+
+ private SqlValidator createSqlValidator(final CalciteConnectionConfig config, final RelDataTypeFactory typeFactory, final CalciteCatalogReader catalogReader) {
+ return SqlValidatorUtil.newValidator(SqlStdOperatorTable.instance(), catalogReader, typeFactory, SqlValidator.Config.DEFAULT
+ .withLenientOperatorLookup(config.lenientOperatorLookup())
+ .withSqlConformance(config.conformance())
+ .withDefaultNullCollation(config.defaultNullCollation())
+ .withIdentifierExpansion(true));
+ }
+
+ private SqlToRelConverter createSqlToRelConverter(final RelOptCluster cluster, final SqlValidator validator, final CalciteCatalogReader catalogReader) {
+ SqlToRelConverter.Config config = SqlToRelConverter.config().withTrimUnusedFields(true);
+ RelOptTable.ViewExpander expander = (rowType, queryString, schemaPath, viewPath) -> null;
+ return new SqlToRelConverter(expander, validator, catalogReader, cluster, StandardConvertletTable.INSTANCE, config);
+ }
}
diff --git a/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/context/CalciteDataContext.java b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/context/CalciteDataContext.java
index 0ad163d..e7839b2 100644
--- a/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/context/CalciteDataContext.java
+++ b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/context/CalciteDataContext.java
@@ -33,7 +33,7 @@ public final class CalciteDataContext implements DataContext {
@Override
public SchemaPlus getRootSchema() {
- return context.getCatalogReader().getRootSchema().plus();
+ return context.getValidator().getCatalogReader().getRootSchema().plus();
}
@Override
diff --git a/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/execute/CalciteJDBCExecutor.java b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/execute/CalciteJDBCExecutor.java
index 88bf5af..e81f3ff 100644
--- a/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/execute/CalciteJDBCExecutor.java
+++ b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/execute/CalciteJDBCExecutor.java
@@ -17,13 +17,11 @@
package org.apache.shardingsphere.infra.optimize.execute;
-import lombok.RequiredArgsConstructor;
-import org.apache.calcite.config.Lex;
+import org.apache.calcite.config.CalciteConnectionProperty;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.sql.validate.SqlConformanceEnum;
-import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
-import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
+import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
+import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
import org.apache.shardingsphere.infra.optimize.context.CalciteContext;
import java.sql.Connection;
@@ -31,6 +29,7 @@ import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
@@ -38,7 +37,6 @@ import java.util.Properties;
/**
* Calcite JDBC executor.
*/
-@RequiredArgsConstructor
public final class CalciteJDBCExecutor {
public static final String CONNECTION_URL = "jdbc:calcite:";
@@ -55,21 +53,25 @@ public final class CalciteJDBCExecutor {
} catch (final ClassNotFoundException ex) {
throw new RuntimeException(ex);
}
- PROPERTIES.setProperty("lex", Lex.MYSQL.name());
- PROPERTIES.setProperty("conformance", SqlConformanceEnum.MYSQL_5.name());
+ }
+
+ public CalciteJDBCExecutor(final CalciteContext context) {
+ this.context = context;
+ PROPERTIES.setProperty(CalciteConnectionProperty.LEX.camelName(), context.getConnectionProperties().getProperty(CalciteConnectionProperty.LEX.camelName()));
+ PROPERTIES.setProperty(CalciteConnectionProperty.CONFORMANCE.camelName(), context.getConnectionProperties().getProperty(CalciteConnectionProperty.CONFORMANCE.camelName()));
}
/**
* Execute query.
*
- * @param executionContext execution context
- * @param callback JDBC execute callback
- * @param <T> class type of return value
+ * @param sql sql
+ * @param parameters parameters
* @return execute result
* @throws SQLException SQL exception
*/
- public <T> List<T> executeQuery(final ExecutionContext executionContext, final JDBCExecutorCallback<T> callback) throws SQLException {
- return Collections.emptyList();
+ public Collection<QueryResult> executeQuery(final String sql, final List<Object> parameters) throws SQLException {
+ QueryResult result = new JDBCStreamQueryResult(execute(sql, parameters));
+ return Collections.singletonList(result);
}
private ResultSet execute(final String sql, final List<Object> parameters) throws SQLException {
diff --git a/shardingsphere-infra/shardingsphere-infra-route/src/main/java/org/apache/shardingsphere/infra/route/context/RouteContext.java b/shardingsphere-infra/shardingsphere-infra-route/src/main/java/org/apache/shardingsphere/infra/route/context/RouteContext.java
index 0274489..7b05ad7 100644
--- a/shardingsphere-infra/shardingsphere-infra-route/src/main/java/org/apache/shardingsphere/infra/route/context/RouteContext.java
+++ b/shardingsphere-infra/shardingsphere-infra-route/src/main/java/org/apache/shardingsphere/infra/route/context/RouteContext.java
@@ -19,7 +19,6 @@ package org.apache.shardingsphere.infra.route.context;
import lombok.Getter;
import lombok.Setter;
-import org.apache.shardingsphere.infra.binder.LogicSQL;
import org.apache.shardingsphere.infra.datanode.DataNode;
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
@@ -50,10 +49,6 @@ public final class RouteContext {
@Setter
private boolean toCalcite;
- // TODO Define it as final and return sqlStatementContext in ExecutionContext
- @Setter
- private LogicSQL logicSQL;
-
/**
* Judge is route for single database and table only or not.
*