You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by me...@apache.org on 2021/01/03 10:58:08 UTC

[shardingsphere] branch master updated: init CalciteJDBCExecutor (#8865)

This is an automated email from the ASF dual-hosted git repository.

menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ceefd4  init CalciteJDBCExecutor (#8865)
2ceefd4 is described below

commit 2ceefd4c24492b3d069cb785ee2246d5db661523
Author: Juan Pan(Trista) <pa...@apache.org>
AuthorDate: Sun Jan 3 18:57:42 2021 +0800

    init CalciteJDBCExecutor (#8865)
    
    * init CalciteJDBCExecutor
    
    * remove useless member
---
 .../infra/optimize/context/CalciteContext.java     | 39 +++--------------
 .../optimize/context/CalciteContextFactory.java    | 50 ++++++++++++++++++++--
 .../infra/optimize/context/CalciteDataContext.java |  2 +-
 .../optimize/execute/CalciteJDBCExecutor.java      | 28 ++++++------
 .../infra/route/context/RouteContext.java          |  5 ---
 5 files changed, 67 insertions(+), 57 deletions(-)

diff --git a/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/context/CalciteContext.java b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/context/CalciteContext.java
index 8f85199..c47cd29 100644
--- a/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/context/CalciteContext.java
+++ b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/context/CalciteContext.java
@@ -18,58 +18,29 @@
 package org.apache.shardingsphere.infra.optimize.context;
 
 import lombok.Getter;
-import org.apache.calcite.config.CalciteConnectionConfig;
-import org.apache.calcite.jdbc.CalciteSchema;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptTable.ViewExpander;
-import org.apache.calcite.prepare.CalciteCatalogReader;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import lombok.RequiredArgsConstructor;
 import org.apache.calcite.sql.parser.SqlParser;
 import org.apache.calcite.sql.validate.SqlValidator;
-import org.apache.calcite.sql.validate.SqlValidatorUtil;
 import org.apache.calcite.sql2rel.SqlToRelConverter;
-import org.apache.calcite.sql2rel.SqlToRelConverter.Config;
-import org.apache.calcite.sql2rel.StandardConvertletTable;
 import org.apache.shardingsphere.infra.optimize.schema.CalciteLogicSchema;
 
-import java.util.Collections;
+import java.util.Properties;
 
 /**
  * Calcite context.
  *
  */
 @Getter
+@RequiredArgsConstructor
 public final class CalciteContext {
+
+    private final Properties connectionProperties;
     
     private final CalciteLogicSchema calciteLogicSchema;
     
-    private final CalciteCatalogReader catalogReader;
-    
     private final SqlParser.Config parserConfig;
     
     private final SqlValidator validator;
     
     private final SqlToRelConverter relConverter;
-    
-    public CalciteContext(final CalciteConnectionConfig config,
-                          final SqlParser.Config parserConfig, final RelDataTypeFactory typeFactory, final RelOptCluster cluster, final CalciteLogicSchema calciteLogicSchema) {
-        this.calciteLogicSchema = calciteLogicSchema;
-        CalciteSchema rootSchema = CalciteSchema.createRootSchema(true);
-        rootSchema.add(calciteLogicSchema.getName(), calciteLogicSchema);
-        catalogReader = new CalciteCatalogReader(rootSchema, Collections.singletonList(config.schema()), typeFactory, config);
-        this.parserConfig = parserConfig;
-        validator = SqlValidatorUtil.newValidator(SqlStdOperatorTable.instance(), catalogReader, typeFactory, SqlValidator.Config.DEFAULT
-                .withLenientOperatorLookup(config.lenientOperatorLookup())
-                .withSqlConformance(config.conformance())
-                .withDefaultNullCollation(config.defaultNullCollation())
-                .withIdentifierExpansion(true));
-        relConverter = createSqlToRelConverter(cluster);
-    }
-    
-    private SqlToRelConverter createSqlToRelConverter(final RelOptCluster cluster) {
-        Config config = SqlToRelConverter.config().withTrimUnusedFields(true);
-        ViewExpander expander = (rowType, queryString, schemaPath, viewPath) -> null;
-        return new SqlToRelConverter(expander, validator, catalogReader, cluster, StandardConvertletTable.INSTANCE, config);
-    }
 }
diff --git a/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/context/CalciteContextFactory.java b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/context/CalciteContextFactory.java
index 2b24ded..9b6b481 100644
--- a/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/context/CalciteContextFactory.java
+++ b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/context/CalciteContextFactory.java
@@ -19,24 +19,35 @@ package org.apache.shardingsphere.infra.optimize.context;
 
 import org.apache.calcite.config.CalciteConnectionConfig;
 import org.apache.calcite.config.CalciteConnectionConfigImpl;
+import org.apache.calcite.config.CalciteConnectionProperty;
 import org.apache.calcite.config.Lex;
+import org.apache.calcite.jdbc.CalciteSchema;
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.plan.volcano.VolcanoPlanner;
+import org.apache.calcite.prepare.CalciteCatalogReader;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.parser.SqlParser;
 import org.apache.calcite.sql.parser.SqlParser.Config;
 import org.apache.calcite.sql.parser.impl.SqlParserImpl;
 import org.apache.calcite.sql.validate.SqlConformanceEnum;
+import org.apache.calcite.sql.validate.SqlValidator;
+import org.apache.calcite.sql.validate.SqlValidatorUtil;
+import org.apache.calcite.sql2rel.SqlToRelConverter;
+import org.apache.calcite.sql2rel.StandardConvertletTable;
 import org.apache.shardingsphere.infra.exception.ShardingSphereException;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import org.apache.shardingsphere.infra.optimize.execute.CalciteInternalExecutor;
 import org.apache.shardingsphere.infra.optimize.plan.PlannerInitializer;
+import org.apache.shardingsphere.infra.optimize.schema.CalciteLogicSchema;
 import org.apache.shardingsphere.infra.optimize.schema.CalciteLogicSchemaFactory;
 
 import java.sql.SQLException;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Properties;
 
@@ -45,6 +56,8 @@ import java.util.Properties;
  *
  */
 public final class CalciteContextFactory {
+
+    private final Properties properties;
     
     private final CalciteConnectionConfig connectionConfig;
     
@@ -57,7 +70,8 @@ public final class CalciteContextFactory {
     private final RelOptCluster cluster;
     
     public CalciteContextFactory(final Map<String, ShardingSphereMetaData> metaDataMap) {
-        connectionConfig = new CalciteConnectionConfigImpl(createProperties());
+        properties = createProperties();
+        connectionConfig = new CalciteConnectionConfigImpl(properties);
         parserConfig = SqlParser.config()
                 .withLex(connectionConfig.lex())
                 .withIdentifierMaxLength(SqlParser.DEFAULT_IDENTIFIER_MAX_LENGTH)
@@ -71,8 +85,8 @@ public final class CalciteContextFactory {
     private Properties createProperties() {
         // TODO Not only MySQL here.
         Properties result = new Properties();
-        result.setProperty("lex", Lex.MYSQL.name());
-        result.setProperty("conformance", SqlConformanceEnum.MYSQL_5.name());
+        result.setProperty(CalciteConnectionProperty.LEX.camelName(), Lex.MYSQL.name());
+        result.setProperty(CalciteConnectionProperty.CONFORMANCE.camelName(), SqlConformanceEnum.MYSQL_5.name());
         return result;
     }
     
@@ -91,9 +105,37 @@ public final class CalciteContextFactory {
      */
     public CalciteContext create(final String schema, final CalciteInternalExecutor executor) {
         try {
-            return new CalciteContext(connectionConfig, parserConfig, typeFactory, cluster, factory.create(schema, executor));
+            return create(connectionConfig, parserConfig, typeFactory, cluster, factory.create(schema, executor));
         } catch (final SQLException ex) {
             throw new ShardingSphereException(ex);
         }
     }
+
+    private CalciteContext create(final CalciteConnectionConfig config,
+                                  final SqlParser.Config parserConfig, final RelDataTypeFactory typeFactory, final RelOptCluster cluster, final CalciteLogicSchema calciteLogicSchema) {
+        CalciteCatalogReader catalogReader = createCalciteCatalogReader(config, typeFactory, calciteLogicSchema);
+        SqlValidator validator = createSqlValidator(config, typeFactory, catalogReader);
+        SqlToRelConverter relConverter = createSqlToRelConverter(cluster, validator, catalogReader);
+        return new CalciteContext(properties, calciteLogicSchema, parserConfig, validator, relConverter);
+    }
+
+    private CalciteCatalogReader createCalciteCatalogReader(final CalciteConnectionConfig config, final RelDataTypeFactory typeFactory, final CalciteLogicSchema calciteLogicSchema) {
+        CalciteSchema rootSchema = CalciteSchema.createRootSchema(true);
+        rootSchema.add(calciteLogicSchema.getName(), calciteLogicSchema);
+        return new CalciteCatalogReader(rootSchema, Collections.singletonList(config.schema()), typeFactory, config);
+    }
+
+    private SqlValidator createSqlValidator(final CalciteConnectionConfig config, final RelDataTypeFactory typeFactory, final CalciteCatalogReader catalogReader) {
+        return SqlValidatorUtil.newValidator(SqlStdOperatorTable.instance(), catalogReader, typeFactory, SqlValidator.Config.DEFAULT
+                .withLenientOperatorLookup(config.lenientOperatorLookup())
+                .withSqlConformance(config.conformance())
+                .withDefaultNullCollation(config.defaultNullCollation())
+                .withIdentifierExpansion(true));
+    }
+
+    private SqlToRelConverter createSqlToRelConverter(final RelOptCluster cluster, final SqlValidator validator, final CalciteCatalogReader catalogReader) {
+        SqlToRelConverter.Config config = SqlToRelConverter.config().withTrimUnusedFields(true);
+        RelOptTable.ViewExpander expander = (rowType, queryString, schemaPath, viewPath) -> null;
+        return new SqlToRelConverter(expander, validator, catalogReader, cluster, StandardConvertletTable.INSTANCE, config);
+    }
 }
diff --git a/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/context/CalciteDataContext.java b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/context/CalciteDataContext.java
index 0ad163d..e7839b2 100644
--- a/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/context/CalciteDataContext.java
+++ b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/context/CalciteDataContext.java
@@ -33,7 +33,7 @@ public final class CalciteDataContext implements DataContext {
     
     @Override
     public SchemaPlus getRootSchema() {
-        return context.getCatalogReader().getRootSchema().plus();
+        return context.getValidator().getCatalogReader().getRootSchema().plus();
     }
     
     @Override
diff --git a/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/execute/CalciteJDBCExecutor.java b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/execute/CalciteJDBCExecutor.java
index 88bf5af..e81f3ff 100644
--- a/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/execute/CalciteJDBCExecutor.java
+++ b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/execute/CalciteJDBCExecutor.java
@@ -17,13 +17,11 @@
 
 package org.apache.shardingsphere.infra.optimize.execute;
 
-import lombok.RequiredArgsConstructor;
-import org.apache.calcite.config.Lex;
+import org.apache.calcite.config.CalciteConnectionProperty;
 import org.apache.calcite.jdbc.CalciteConnection;
 import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.sql.validate.SqlConformanceEnum;
-import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
-import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
+import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
+import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
 import org.apache.shardingsphere.infra.optimize.context.CalciteContext;
 
 import java.sql.Connection;
@@ -31,6 +29,7 @@ import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
@@ -38,7 +37,6 @@ import java.util.Properties;
 /**
  * Calcite JDBC executor.
  */
-@RequiredArgsConstructor
 public final class CalciteJDBCExecutor {
     
     public static final String CONNECTION_URL = "jdbc:calcite:";
@@ -55,21 +53,25 @@ public final class CalciteJDBCExecutor {
         } catch (final ClassNotFoundException ex) {
             throw new RuntimeException(ex);
         }
-        PROPERTIES.setProperty("lex", Lex.MYSQL.name());
-        PROPERTIES.setProperty("conformance", SqlConformanceEnum.MYSQL_5.name());
+    }
+
+    public CalciteJDBCExecutor(final CalciteContext context) {
+        this.context = context;
+        PROPERTIES.setProperty(CalciteConnectionProperty.LEX.camelName(), context.getConnectionProperties().getProperty(CalciteConnectionProperty.LEX.camelName()));
+        PROPERTIES.setProperty(CalciteConnectionProperty.CONFORMANCE.camelName(), context.getConnectionProperties().getProperty(CalciteConnectionProperty.CONFORMANCE.camelName()));
     }
     
     /**
      * Execute query.
      *
-     * @param executionContext execution context
-     * @param callback JDBC execute callback
-     * @param <T> class type of return value
+     * @param sql SQL to be executed
+     * @param parameters parameters passed along with the SQL
      * @return execute result
      * @throws SQLException SQL exception
      */
-    public <T> List<T> executeQuery(final ExecutionContext executionContext, final JDBCExecutorCallback<T> callback) throws SQLException {
-        return Collections.emptyList();
+    public Collection<QueryResult> executeQuery(final String sql, final List<Object> parameters) throws SQLException {
+        QueryResult result = new JDBCStreamQueryResult(execute(sql, parameters));
+        return Collections.singletonList(result);
     }
     
     private ResultSet execute(final String sql, final List<Object> parameters) throws SQLException {
diff --git a/shardingsphere-infra/shardingsphere-infra-route/src/main/java/org/apache/shardingsphere/infra/route/context/RouteContext.java b/shardingsphere-infra/shardingsphere-infra-route/src/main/java/org/apache/shardingsphere/infra/route/context/RouteContext.java
index 0274489..7b05ad7 100644
--- a/shardingsphere-infra/shardingsphere-infra-route/src/main/java/org/apache/shardingsphere/infra/route/context/RouteContext.java
+++ b/shardingsphere-infra/shardingsphere-infra-route/src/main/java/org/apache/shardingsphere/infra/route/context/RouteContext.java
@@ -19,7 +19,6 @@ package org.apache.shardingsphere.infra.route.context;
 
 import lombok.Getter;
 import lombok.Setter;
-import org.apache.shardingsphere.infra.binder.LogicSQL;
 import org.apache.shardingsphere.infra.datanode.DataNode;
 import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
 
@@ -50,10 +49,6 @@ public final class RouteContext {
     @Setter
     private boolean toCalcite;
     
-    // TODO Define it as final and return sqlStatementContext in ExecutionContext
-    @Setter
-    private LogicSQL logicSQL;
-    
     /**
      * Judge is route for single database and table only or not.
      *