You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by me...@apache.org on 2020/12/17 07:40:12 UTC

[shardingsphere] branch master updated: Create CalciteSchemaFactory and CalciteSchema (#8667)

This is an automated email from the ASF dual-hosted git repository.

menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new adaded0  Create CalciteSchemaFactory and CalciteSchema (#8667)
adaded0 is described below

commit adaded0f429abcb8efdc22164e1c28a00a4f7f65
Author: Juan Pan(Trista) <pa...@apache.org>
AuthorDate: Thu Dec 17 15:39:48 2020 +0800

    Create CalciteSchemaFactory and CalciteSchema (#8667)
---
 .../shardingsphere-infra-optimize/pom.xml          |   5 +
 .../optimize/schema/AbstractCalciteTable.java      |  87 +++++++++++++++++
 .../optimize/schema/CalciteFilterableTable.java    |  85 +++++++++++++++++
 .../optimize/schema/CalciteRowEnumerator.java      | 105 +++++++++++++++++++++
 .../infra/optimize/schema/CalciteSchema.java       |  64 +++++++++++++
 .../optimize/schema/CalciteSchemaFactory.java      |  85 +++++++++++++++++
 6 files changed, 431 insertions(+)

diff --git a/shardingsphere-infra/shardingsphere-infra-optimize/pom.xml b/shardingsphere-infra/shardingsphere-infra-optimize/pom.xml
index 216f7a6..5d42ee3 100644
--- a/shardingsphere-infra/shardingsphere-infra-optimize/pom.xml
+++ b/shardingsphere-infra/shardingsphere-infra-optimize/pom.xml
@@ -30,6 +30,11 @@
 
     <dependencies>
         <dependency>
+            <groupId>org.apache.shardingsphere</groupId>
+            <artifactId>shardingsphere-infra-context</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.calcite</groupId>
             <artifactId>calcite-core</artifactId>
         </dependency>
diff --git a/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/schema/AbstractCalciteTable.java b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/schema/AbstractCalciteTable.java
new file mode 100644
index 0000000..49949ef
--- /dev/null
+++ b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/schema/AbstractCalciteTable.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.optimize.schema;
+
+import lombok.Getter;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeImpl;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.impl.AbstractTable;
+import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.commons.collections4.map.LinkedMap;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.datanode.DataNode;
+import org.apache.shardingsphere.infra.metadata.schema.builder.loader.TableMetaDataLoader;
+import org.apache.shardingsphere.infra.metadata.schema.model.ColumnMetaData;
+import org.apache.shardingsphere.infra.metadata.schema.model.TableMetaData;
+
+import javax.sql.DataSource;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Abstract calcite table.
+ */
+@Getter
+public abstract class AbstractCalciteTable extends AbstractTable {
+    
+    private final Map<String, DataSource> dataSources = new LinkedMap<>();
+    
+    private final Collection<DataNode> dataNodes = new LinkedList<>();
+    
+    private final TableMetaData tableMetaData;
+    
+    private final RelProtoDataType relProtoDataType;
+    
+    public AbstractCalciteTable(final Map<String, DataSource> dataSources, final Collection<DataNode> dataNodes, final DatabaseType databaseType) throws SQLException {
+        this.dataSources.putAll(dataSources);
+        this.dataNodes.addAll(dataNodes);
+        tableMetaData = createTableMetaData(dataSources, dataNodes, databaseType);
+        relProtoDataType = getRelDataType();
+    }
+    
+    private TableMetaData createTableMetaData(final Map<String, DataSource> dataSources, final Collection<DataNode> dataNodes, final DatabaseType databaseType) throws SQLException {
+        DataNode dataNode = dataNodes.iterator().next();
+        Optional<TableMetaData> tableMetaData = TableMetaDataLoader.load(dataSources.get(dataNode.getDataSourceName()), dataNode.getTableName(), databaseType);
+        if (!tableMetaData.isPresent()) {
+            throw new RuntimeException("No table metaData.");
+        }
+        return tableMetaData.get();
+    }
+    
+    private RelProtoDataType getRelDataType() {
+        RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
+        RelDataTypeFactory.Builder fieldInfo = typeFactory.builder();
+        for (Map.Entry<String, ColumnMetaData> entry : tableMetaData.getColumns().entrySet()) {
+            SqlTypeName sqlTypeName = SqlTypeName.getNameForJdbcType(entry.getValue().getDataType());
+            fieldInfo.add(entry.getKey(), null == sqlTypeName ? typeFactory.createUnknownType() : typeFactory.createTypeWithNullability(typeFactory.createSqlType(sqlTypeName), true));
+        }
+        return RelDataTypeImpl.proto(fieldInfo.build());
+    }
+    
+    @Override
+    public RelDataType getRowType(final RelDataTypeFactory typeFactory) {
+        return relProtoDataType.apply(typeFactory);
+    }
+}
diff --git a/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/schema/CalciteFilterableTable.java b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/schema/CalciteFilterableTable.java
new file mode 100644
index 0000000..a5c1384
--- /dev/null
+++ b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/schema/CalciteFilterableTable.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.optimize.schema;
+
+import com.google.common.base.Joiner;
+import org.apache.calcite.DataContext;
+import org.apache.calcite.linq4j.AbstractEnumerable;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.ProjectableFilterableTable;
+import org.apache.commons.collections4.map.LinkedMap;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.datanode.DataNode;
+
+import javax.sql.DataSource;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Calcite filterable Table.
+ *
+ */
+public final class CalciteFilterableTable extends AbstractCalciteTable implements ProjectableFilterableTable {
+    
+    private final Map<DataNode, String> tableQuerySQLs = new LinkedMap<>();
+    
+    public CalciteFilterableTable(final Map<String, DataSource> dataSources, final Collection<DataNode> dataNodes, final DatabaseType databaseType) throws SQLException {
+        super(dataSources, dataNodes, databaseType);
+        String columns = Joiner.on(",").join(getTableMetaData().getColumns().keySet());
+        for (DataNode each : dataNodes) {
+            tableQuerySQLs.put(each, String.format("SELECT %s FROM %s", columns, each.getTableName()));
+        }
+    }
+    
+    @Override
+    public Enumerable<Object[]> scan(final DataContext root, final List<RexNode> filters, final int[] projects) {
+        // TODO : use projects and filters
+        return new AbstractEnumerable<Object[]>() {
+
+            @Override
+            public Enumerator<Object[]> enumerator() {
+                return new CalciteRowEnumerator(getResultSets());
+            }
+        };
+    }
+    
+    private Collection<ResultSet> getResultSets() {
+        Collection<ResultSet> resultSets = new LinkedList<>();
+        for (Entry<DataNode, String> entry : tableQuerySQLs.entrySet()) {
+            resultSets.add(getResultSet(entry));
+        }
+        return resultSets;
+    }
+    
+    private ResultSet getResultSet(final Entry<DataNode, String> tableSQL) {
+        try {
+            Statement statement = getDataSources().get(tableSQL.getKey().getDataSourceName()).getConnection().createStatement();
+            return statement.executeQuery(tableSQL.getValue());
+        } catch (final SQLException ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+}
diff --git a/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/schema/CalciteRowEnumerator.java b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/schema/CalciteRowEnumerator.java
new file mode 100644
index 0000000..2609ed3
--- /dev/null
+++ b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/schema/CalciteRowEnumerator.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.optimize.schema;
+
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.shardingsphere.infra.exception.ShardingSphereException;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+/**
+ * Calcite row enumerator.
+ *
+ */
+public final class CalciteRowEnumerator implements Enumerator<Object[]> {
+    
+    private final Collection<ResultSet> resultSets = new LinkedList<>();
+    
+    private final Iterator<ResultSet> iterator;
+    
+    private ResultSet currentResultSet;
+    
+    private Object[] currentRow;
+    
+    public CalciteRowEnumerator(final Collection<ResultSet> resultSets) {
+        this.resultSets.addAll(resultSets);
+        iterator = this.resultSets.iterator();
+        currentResultSet = iterator.next();
+    }
+    
+    @Override
+    public Object[] current() {
+        return currentRow;
+    }
+    
+    @Override
+    public boolean moveNext() {
+        try {
+            return moveNext0();
+        } catch (final SQLException ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+    
+    private boolean moveNext0() throws SQLException {
+        if (currentResultSet.next()) {
+            setCurrentRow();
+            return true;
+        }
+        if (!iterator.hasNext()) {
+            currentRow = null;
+            return false;
+        }
+        currentResultSet = iterator.next();
+        if (currentResultSet.next()) {
+            setCurrentRow();
+            return true;
+        }
+        return false;
+    }
+    
+    private void setCurrentRow() throws SQLException {
+        int columnCount = currentResultSet.getMetaData().getColumnCount();
+        currentRow = new Object[columnCount];
+        for (int i = 0; i < columnCount; i++) {
+            currentRow[i] = currentResultSet.getObject(i + 1);
+        }
+    }
+    
+    @Override
+    public void reset() {
+    }
+    
+    @Override
+    public void close() {
+        try {
+            for (ResultSet each : resultSets) {
+                each.getStatement().getConnection().close();
+                each.getStatement().close();
+                each.close();
+            }
+            currentRow = null;
+        } catch (final SQLException ex) {
+            throw new ShardingSphereException(ex);
+        }
+    }
+}
diff --git a/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/schema/CalciteSchema.java b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/schema/CalciteSchema.java
new file mode 100644
index 0000000..8e36cce
--- /dev/null
+++ b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/schema/CalciteSchema.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.optimize.schema;
+
+import lombok.Getter;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.impl.AbstractSchema;
+import org.apache.commons.collections4.map.LinkedMap;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.datanode.DataNode;
+
+import javax.sql.DataSource;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
+
+
+/**
+ * Calcite schema.
+ *
+ */
+@Getter
+public final class CalciteSchema extends AbstractSchema {
+    
+    private final Map<String, Table> tables = new LinkedMap<>();
+
+    public CalciteSchema(final Map<String, DataSource> dataSources,
+                         final Map<String, Collection<DataNode>> dataNodes, final DatabaseType databaseType) throws SQLException {
+        for (Entry<String, Collection<DataNode>> entry : dataNodes.entrySet()) {
+            tables.put(entry.getKey(), createTable(dataSources, entry.getValue(), databaseType));
+        }
+    }
+    
+    private Table createTable(final Map<String, DataSource> dataSources, final Collection<DataNode> dataNodes, final DatabaseType databaseType) throws SQLException {
+        Map<String, DataSource> tableDataSources = new LinkedMap<>();
+        for (DataNode each : dataNodes) {
+            if (dataSources.containsKey(each.getDataSourceName())) {
+                tableDataSources.put(each.getDataSourceName(), dataSources.get(each.getDataSourceName()));
+            }
+        }
+        return new CalciteFilterableTable(tableDataSources, dataNodes, databaseType);
+    }
+    
+    @Override
+    protected Map<String, Table> getTableMap() {
+        return tables;
+    }
+}
diff --git a/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/schema/CalciteSchemaFactory.java b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/schema/CalciteSchemaFactory.java
new file mode 100644
index 0000000..0c04782
--- /dev/null
+++ b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/schema/CalciteSchemaFactory.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.optimize.schema;
+
+import org.apache.calcite.schema.Schema;
+import org.apache.commons.collections4.map.LinkedMap;
+import org.apache.shardingsphere.infra.datanode.DataNode;
+import org.apache.shardingsphere.infra.exception.ShardingSphereException;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
+import org.apache.shardingsphere.infra.rule.type.DataNodeContainedRule;
+
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Map.Entry;
+
+
+/**
+ * Calcite schema factory.
+ *
+ */
+public final class CalciteSchemaFactory {
+    
+    private final Map<String, Schema> schemas = new LinkedMap<>();
+    
+    public CalciteSchemaFactory(final Map<String, ShardingSphereMetaData> metaDataMap) throws SQLException {
+        for (Entry<String, ShardingSphereMetaData> each : metaDataMap.entrySet()) {
+            schemas.put(each.getKey(), createCalciteSchema(each.getValue()));
+        }
+    }
+    
+    private CalciteSchema createCalciteSchema(final ShardingSphereMetaData metaData) throws SQLException {
+        Collection<DataNodeContainedRule> dataNodeRules = getDataNodeRules(metaData);
+        return new CalciteSchema(metaData.getResource().getDataSources(), getDataNodes(dataNodeRules), metaData.getResource().getDatabaseType());
+    }
+    
+    private Collection<DataNodeContainedRule> getDataNodeRules(final ShardingSphereMetaData metaData) {
+        Collection<DataNodeContainedRule> result = new LinkedList<>();
+        for (ShardingSphereRule each : metaData.getRuleMetaData().getRules()) {
+            if (each instanceof DataNodeContainedRule) {
+                result.add((DataNodeContainedRule) each);
+            }
+        }
+        return result;
+    }
+    
+    private Map<String, Collection<DataNode>> getDataNodes(final Collection<DataNodeContainedRule> dataNodeRules) {
+        Map<String, Collection<DataNode>> result = new LinkedHashMap<>();
+        for (DataNodeContainedRule each : dataNodeRules) {
+            result.putAll(each.getAllDataNodes());
+        }
+        return result;
+    }
+    
+    /**
+     * Create schema.
+     *
+     * @param name name
+     * @return schema
+     */
+    public Schema create(final String name) {
+        if (!schemas.containsKey(name)) {
+            throw new ShardingSphereException("No `%s` schema.", name);
+        }
+        return schemas.get(name);
+    }
+}