You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ko...@apache.org on 2021/11/16 14:33:32 UTC
[ignite-3] branch main updated: IGNITE-15847 Refactoring of TableDescriptor for SQL Extension API (#448)
This is an automated email from the ASF dual-hosted git repository.
korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 3336da1 IGNITE-15847 Refactoring of TableDescriptor for SQL Extension API (#448)
3336da1 is described below
commit 3336da109f036627955450b04eada71ac01039b7
Author: korlov42 <ko...@gridgain.com>
AuthorDate: Tue Nov 16 17:33:28 2021 +0300
IGNITE-15847 Refactoring of TableDescriptor for SQL Extension API (#448)
---
.../query/calcite/exec/LogicalRelImplementor.java | 11 +-
.../query/calcite/exec/rel/ModifyNode.java | 18 +--
.../query/calcite/exec/rel/TableScanNode.java | 22 +--
.../query/calcite/schema/IgniteTable.java | 20 +--
.../query/calcite/schema/IgniteTableImpl.java | 142 +++++++++++++++++++-
.../query/calcite/schema/InternalIgniteTable.java | 41 ++++++
.../query/calcite/schema/SchemaHolderImpl.java | 4 +-
.../query/calcite/schema/TableDescriptor.java | 54 ++------
.../query/calcite/schema/TableDescriptorImpl.java | 147 +--------------------
.../query/calcite/planner/AbstractPlannerTest.java | 53 ++++----
10 files changed, 264 insertions(+), 248 deletions(-)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
index 3c0ed77..3293cb3 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
@@ -94,8 +94,7 @@ import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceS
import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleHashAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleSortAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSetOp;
-import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
-import org.apache.ignite.internal.processors.query.calcite.schema.TableDescriptor;
+import org.apache.ignite.internal.processors.query.calcite.schema.InternalIgniteTable;
import org.apache.ignite.internal.processors.query.calcite.trait.Destination;
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
@@ -285,7 +284,7 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>>
// RexNode condition = rel.condition();
// List<RexNode> projects = rel.projects();
- IgniteTable tbl = rel.getTable().unwrap(IgniteTable.class);
+ InternalIgniteTable tbl = rel.getTable().unwrap(InternalIgniteTable.class);
IgniteTypeFactory typeFactory = ctx.getTypeFactory();
ImmutableBitSet requiredColumns = rel.requiredColumns();
@@ -316,7 +315,7 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>>
List<RexNode> projects = rel.projects();
ImmutableBitSet requiredColumns = rel.requiredColumns();
- IgniteTable tbl = rel.getTable().unwrap(IgniteTable.class);
+ InternalIgniteTable tbl = rel.getTable().unwrap(InternalIgniteTable.class);
IgniteTypeFactory typeFactory = ctx.getTypeFactory();
RelDataType rowType = tbl.getRowType(typeFactory, requiredColumns);
@@ -329,7 +328,7 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>>
return new TableScanNode<>(
ctx,
rowType,
- tbl.descriptor(),
+ tbl,
group.partitions(ctx.planningContext().localNodeId()),
filters,
prj,
@@ -493,7 +492,7 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>>
case INSERT:
case UPDATE:
case DELETE:
- ModifyNode<RowT> node = new ModifyNode<>(ctx, rel.getRowType(), rel.getTable().unwrap(TableDescriptor.class),
+ ModifyNode<RowT> node = new ModifyNode<>(ctx, rel.getRowType(), rel.getTable().unwrap(InternalIgniteTable.class),
rel.getOperation(), rel.getUpdateColumnList());
Node<RowT> input = visit(rel.getInput());
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
index 851b0da..93443dc 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
@@ -27,7 +27,7 @@ import org.apache.calcite.rel.core.TableModify;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
-import org.apache.ignite.internal.processors.query.calcite.schema.TableDescriptor;
+import org.apache.ignite.internal.processors.query.calcite.schema.InternalIgniteTable;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.table.RecordView;
@@ -38,7 +38,7 @@ import org.apache.ignite.table.Tuple;
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
public class ModifyNode<RowT> extends AbstractNode<RowT> implements SingleNode<RowT>, Downstream<RowT> {
- protected final TableDescriptor desc;
+ private final InternalIgniteTable table;
private final TableModify.Operation op;
@@ -64,24 +64,24 @@ public class ModifyNode<RowT> extends AbstractNode<RowT> implements SingleNode<R
*
* @param ctx Execution context.
* @param rowType Rel data type.
- * @param desc Table descriptor.
+ * @param table Table object.
* @param op Operation/
* @param cols Update column list.
*/
public ModifyNode(
ExecutionContext<RowT> ctx,
RelDataType rowType,
- TableDescriptor desc,
+ InternalIgniteTable table,
TableModify.Operation op,
List<String> cols
) {
super(ctx, rowType);
- this.desc = desc;
+ this.table = table;
this.op = op;
this.cols = cols;
- tableView = desc.table().recordView();
+ tableView = table.table().recordView();
}
/** {@inheritDoc} */
@@ -114,7 +114,7 @@ public class ModifyNode<RowT> extends AbstractNode<RowT> implements SingleNode<R
case DELETE:
case UPDATE:
case INSERT:
- tuples.add(desc.toTuple(context(), row, op, cols));
+ tuples.add(table.toTuple(context(), row, op, cols));
flushTuples(false);
@@ -201,13 +201,13 @@ public class ModifyNode<RowT> extends AbstractNode<RowT> implements SingleNode<R
IgniteTypeFactory typeFactory = context().getTypeFactory();
RowHandler.RowFactory<RowT> rowFactory = context().rowHandler().factory(
context().getTypeFactory(),
- desc.insertRowType(typeFactory)
+ table.descriptor().insertRowType(typeFactory)
);
throw new IgniteInternalException(
"Failed to INSERT some keys because they are already in cache. "
+ "[tuples=" + duplicates.stream()
- .map(tup -> desc.toRow(context(), tup, rowFactory, null))
+ .map(tup -> table.toRow(context(), tup, rowFactory, null))
.map(context().rowHandler()::toString)
.collect(Collectors.toList()) + ']'
);
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableScanNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableScanNode.java
index a6b7a85..ed105e1 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableScanNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableScanNode.java
@@ -30,7 +30,7 @@ import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
-import org.apache.ignite.internal.processors.query.calcite.schema.TableDescriptor;
+import org.apache.ignite.internal.processors.query.calcite.schema.InternalIgniteTable;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.TableRow;
@@ -43,9 +43,11 @@ public class TableScanNode<RowT> extends AbstractNode<RowT> {
/** Special value to highlights that all row were received and we are not waiting any more. */
private static final int NOT_WAITING = -1;
- private final TableImpl table;
+ /** Table that provides access to underlying data. */
+ private final TableImpl physTable;
- private final TableDescriptor desc;
+ /** Table that is an object in SQL schema. */
+ private final InternalIgniteTable schemaTable;
private final RowHandler.RowFactory<RowT> factory;
@@ -75,7 +77,7 @@ public class TableScanNode<RowT> extends AbstractNode<RowT> {
*
* @param ctx Execution context.
* @param rowType Output type of the current node.
- * @param desc Table descriptor this node should scan.
+ * @param schemaTable The table this node should scan.
* @param parts Partition numbers to scan.
* @param filters Optional filter to filter out rows.
* @param rowTransformer Optional projection function.
@@ -84,7 +86,7 @@ public class TableScanNode<RowT> extends AbstractNode<RowT> {
public TableScanNode(
ExecutionContext<RowT> ctx,
RelDataType rowType,
- TableDescriptor desc,
+ InternalIgniteTable schemaTable,
int[] parts,
@Nullable Predicate<RowT> filters,
@Nullable Function<RowT, RowT> rowTransformer,
@@ -94,8 +96,8 @@ public class TableScanNode<RowT> extends AbstractNode<RowT> {
assert !nullOrEmpty(parts);
- table = desc.table();
- this.desc = desc;
+ this.physTable = schemaTable.table();
+ this.schemaTable = schemaTable;
this.parts = parts;
this.filters = filters;
this.rowTransformer = rowTransformer;
@@ -210,7 +212,7 @@ public class TableScanNode<RowT> extends AbstractNode<RowT> {
if (subscription != null) {
subscription.request(waiting);
} else if (curPartIdx < parts.length) {
- table.internalTable().scan(parts[curPartIdx++], null).subscribe(new SubscriberImpl());
+ physTable.internalTable().scan(parts[curPartIdx++], null).subscribe(new SubscriberImpl());
} else {
waiting = NOT_WAITING;
}
@@ -268,8 +270,8 @@ public class TableScanNode<RowT> extends AbstractNode<RowT> {
}
private RowT convert(BinaryRow binRow) {
- final org.apache.ignite.internal.schema.row.Row wrapped = table.schemaView().resolve(binRow);
+ final org.apache.ignite.internal.schema.row.Row wrapped = physTable.schemaView().resolve(binRow);
- return desc.toRow(context(), TableRow.tuple(wrapped), factory, requiredColumns);
+ return schemaTable.toRow(context(), TableRow.tuple(wrapped), factory, requiredColumns);
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
index f5daf02..1924709 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
@@ -45,13 +45,13 @@ public interface IgniteTable extends TranslatableTable, Wrapper {
* @return Table descriptor.
*/
TableDescriptor descriptor();
-
+
/** {@inheritDoc} */
@Override
default RelDataType getRowType(RelDataTypeFactory typeFactory) {
return getRowType(typeFactory, null);
}
-
+
/**
* Returns new type according {@code requiredColumns} param.
*
@@ -59,13 +59,13 @@ public interface IgniteTable extends TranslatableTable, Wrapper {
* @param requiredColumns Used columns enumeration.
*/
RelDataType getRowType(RelDataTypeFactory typeFactory, ImmutableBitSet requiredColumns);
-
+
/** {@inheritDoc} */
@Override
default TableScan toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) {
return toRel(context.getCluster(), relOptTable);
}
-
+
/**
* Converts table into relational expression.
*
@@ -74,31 +74,31 @@ public interface IgniteTable extends TranslatableTable, Wrapper {
* @return Table relational expression.
*/
TableScan toRel(RelOptCluster cluster, RelOptTable relOptTbl);
-
+
/**
* Returns table distribution.
*
* @return Table distribution.
*/
IgniteDistribution distribution();
-
+
/** {@inheritDoc} */
default Schema.TableType getJdbcTableType() {
return Schema.TableType.TABLE;
}
-
+
/** {@inheritDoc} */
default boolean isRolledUp(String column) {
return false;
}
-
+
/** {@inheritDoc} */
default boolean rolledUpColumnValidInsideAgg(String column, SqlCall call,
@Nullable SqlNode parent,
@Nullable CalciteConnectionConfig config) {
return false;
}
-
+
/** {@inheritDoc} */
default Statistic getStatistic() {
return new Statistic() {
@@ -108,7 +108,7 @@ public interface IgniteTable extends TranslatableTable, Wrapper {
}
};
}
-
+
/** {@inheritDoc} */
@Override
default <C> @Nullable C unwrap(Class<C> cls) {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTableImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTableImpl.java
index be66f59..321ff9e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTableImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTableImpl.java
@@ -18,20 +18,27 @@
package org.apache.ignite.internal.processors.query.calcite.schema;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelReferentialConstraint;
+import org.apache.calcite.rel.core.TableModify;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.Statistic;
import org.apache.calcite.schema.impl.AbstractTable;
import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalIndexScan;
@@ -39,6 +46,9 @@ import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLog
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.table.Tuple;
+import org.jetbrains.annotations.Nullable;
/**
* Ignite table implementation.
@@ -46,6 +56,8 @@ import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactor
public class IgniteTableImpl extends AbstractTable implements InternalIgniteTable {
private final TableDescriptor desc;
+ private final TableImpl table;
+
private final Statistic statistic;
private final Map<String, IgniteIndex> indexes = new ConcurrentHashMap<>();
@@ -55,8 +67,10 @@ public class IgniteTableImpl extends AbstractTable implements InternalIgniteTabl
*
* @param desc Table descriptor.
*/
- public IgniteTableImpl(TableDescriptor desc) {
+ public IgniteTableImpl(TableDescriptor desc, TableImpl table) {
this.desc = desc;
+ this.table = table;
+
statistic = new StatisticsImpl();
}
@@ -81,6 +95,12 @@ public class IgniteTableImpl extends AbstractTable implements InternalIgniteTabl
/** {@inheritDoc} */
@Override
+ public TableImpl table() {
+ return table;
+ }
+
+ /** {@inheritDoc} */
+ @Override
public IgniteLogicalTableScan toRel(RelOptCluster cluster, RelOptTable relOptTbl) {
RelTraitSet traitSet = cluster.traitSetOf(distribution())
.replace(RewindabilityTrait.REWINDABLE);
@@ -108,7 +128,7 @@ public class IgniteTableImpl extends AbstractTable implements InternalIgniteTabl
/** {@inheritDoc} */
@Override
public ColocationGroup colocationGroup(PlanningContext ctx) {
- return desc.colocationGroup(ctx);
+ return partitionedGroup();
}
/** {@inheritDoc} */
@@ -145,6 +165,124 @@ public class IgniteTableImpl extends AbstractTable implements InternalIgniteTabl
return super.unwrap(cls);
}
+ /** {@inheritDoc} */
+ @Override
+ public <RowT> RowT toRow(
+ ExecutionContext<RowT> ectx,
+ Tuple row,
+ RowHandler.RowFactory<RowT> factory,
+ @Nullable ImmutableBitSet requiredColumns
+ ) {
+ RowHandler<RowT> handler = factory.handler();
+
+ assert handler == ectx.rowHandler();
+
+ RowT res = factory.create();
+
+ assert handler.columnCount(res) == (requiredColumns == null ? desc.columnsCount() : requiredColumns.cardinality());
+
+ if (requiredColumns == null) {
+ for (int i = 0; i < desc.columnsCount(); i++) {
+ ColumnDescriptor colDesc = desc.columnDescriptor(i);
+
+ handler.set(i, res, row.value(colDesc.fieldIndex()));
+ }
+ } else {
+ for (int i = 0, j = requiredColumns.nextSetBit(0); j != -1; j = requiredColumns.nextSetBit(j + 1), i++) {
+ ColumnDescriptor colDesc = desc.columnDescriptor(j);
+
+ handler.set(i, res, row.value(colDesc.fieldIndex()));
+ }
+ }
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public <RowT> Tuple toTuple(
+ ExecutionContext<RowT> ectx,
+ RowT row,
+ TableModify.Operation op,
+ Object arg
+ ) {
+ switch (op) {
+ case INSERT:
+ return insertTuple(row, ectx);
+ case DELETE:
+ return deleteTuple(row, ectx);
+ case UPDATE:
+ return updateTuple(row, (List<String>) arg, ectx);
+ case MERGE:
+ throw new UnsupportedOperationException();
+ default:
+ throw new AssertionError();
+ }
+ }
+
+ private <RowT> Tuple insertTuple(RowT row, ExecutionContext<RowT> ectx) {
+ Tuple tuple = Tuple.create(desc.columnsCount());
+
+ RowHandler<RowT> hnd = ectx.rowHandler();
+
+ for (int i = 0; i < desc.columnsCount(); i++) {
+ tuple.set(desc.columnDescriptor(i).name(), hnd.get(i, row));
+ }
+
+ return tuple;
+ }
+
+ private <RowT> Tuple updateTuple(RowT row, List<String> updateColList, ExecutionContext<RowT> ectx) {
+ RowHandler<RowT> hnd = ectx.rowHandler();
+ int offset = desc.columnsCount();
+ Tuple tuple = Tuple.create(desc.columnsCount());
+ Set<String> colsToSkip = new HashSet<>(updateColList);
+
+ for (int i = 0; i < desc.columnsCount(); i++) {
+ String colName = desc.columnDescriptor(i).name();
+
+ if (!colsToSkip.contains(colName)) {
+ tuple.set(colName, hnd.get(i, row));
+ }
+ }
+
+ for (int i = 0; i < updateColList.size(); i++) {
+ final ColumnDescriptor colDesc = Objects.requireNonNull(desc.columnDescriptor(updateColList.get(i)));
+
+ assert !colDesc.key();
+
+ Object fieldVal = hnd.get(i + offset, row);
+
+ tuple.set(colDesc.name(), fieldVal);
+ }
+
+ return tuple;
+ }
+
+ private <RowT> Tuple deleteTuple(RowT row, ExecutionContext<RowT> ectx) {
+ RowHandler<RowT> hnd = ectx.rowHandler();
+ Tuple tuple = Tuple.create();
+
+ int idx = 0;
+ for (int i = 0; i < desc.columnsCount(); i++) {
+ ColumnDescriptor colDesc = desc.columnDescriptor(i);
+
+ if (colDesc.key()) {
+ tuple.set(colDesc.name(), hnd.get(idx++, row));
+ }
+ }
+
+ return tuple;
+ }
+
+ private ColocationGroup partitionedGroup() {
+ List<List<String>> assignments = table.internalTable().assignments().stream()
+ .map(Collections::singletonList)
+ .collect(Collectors.toList());
+
+ return ColocationGroup.forAssignments(assignments);
+ }
+
private class StatisticsImpl implements Statistic {
/** {@inheritDoc} */
@Override
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/InternalIgniteTable.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/InternalIgniteTable.java
index 3145285..71c7cb1 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/InternalIgniteTable.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/InternalIgniteTable.java
@@ -20,9 +20,16 @@ package org.apache.ignite.internal.processors.query.calcite.schema;
import java.util.Map;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalIndexScan;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.table.Tuple;
+import org.jetbrains.annotations.Nullable;
/**
* Ignite internal table.
@@ -38,6 +45,40 @@ public interface InternalIgniteTable extends IgniteTable {
*/
IgniteLogicalIndexScan toRel(RelOptCluster cluster, RelOptTable relOptTbl, String idxName);
+ /** Returns the internal table. */
+ TableImpl table();
+
+ /**
+ * Converts a tuple to relational node row.
+ *
+ * @param ectx Execution context.
+ * @param row Tuple to convert.
+ * @param requiredColumns Participating columns.
+ * @return Relational node row.
+ */
+ <RowT> RowT toRow(
+ ExecutionContext<RowT> ectx,
+ Tuple row,
+ RowHandler.RowFactory<RowT> factory,
+ @Nullable ImmutableBitSet requiredColumns
+ );
+
+ /**
+ * Converts a relational node row to internal tuple.
+ *
+ * @param ectx Execution context.
+ * @param row Relational node row.
+ * @param op Operation.
+ * @param arg Operation specific argument.
+ * @return Cache key-value tuple;
+ */
+ <RowT> Tuple toTuple(
+ ExecutionContext<RowT> ectx,
+ RowT row,
+ TableModify.Operation op,
+ @Nullable Object arg
+ );
+
/**
* Returns nodes mapping.
*
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java
index 4887e7a..f65c628 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java
@@ -90,9 +90,9 @@ public class SchemaHolderImpl implements SchemaHolder {
))
.collect(Collectors.toList());
- TableDescriptorImpl desc = new TableDescriptorImpl(table, colDescriptors);
+ TableDescriptorImpl desc = new TableDescriptorImpl(colDescriptors);
- schema.addTable(removeSchema(schemaName, table.tableName()), new IgniteTableImpl(desc));
+ schema.addTable(removeSchema(schemaName, table.tableName()), new IgniteTableImpl(desc, table));
rebuild();
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptor.java
index 1c2889e..a7dde3a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptor.java
@@ -18,21 +18,13 @@
package org.apache.ignite.internal.processors.query.calcite.schema;
import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.rel.core.TableModify;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.sql2rel.InitializerExpressionFactory;
import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
-import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
-import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
-import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
-import org.apache.ignite.internal.table.TableImpl;
-import org.apache.ignite.table.Tuple;
-import org.jetbrains.annotations.Nullable;
/**
* TableDescriptor interface.
@@ -42,17 +34,6 @@ public interface TableDescriptor extends RelProtoDataType, InitializerExpression
/** Returns distribution of the table. */
IgniteDistribution distribution();
- /** Returns the table this description describes. */
- TableImpl table();
-
- /**
- * Returns nodes mapping.
- *
- * @param ctx Planning context.
- * @return Nodes mapping.
- */
- ColocationGroup colocationGroup(PlanningContext ctx);
-
/** {@inheritDoc} */
@Override
default RelDataType apply(RelDataTypeFactory factory) {
@@ -108,40 +89,23 @@ public interface TableDescriptor extends RelProtoDataType, InitializerExpression
boolean isUpdateAllowed(RelOptTable tbl, int colIdx);
/**
- * Converts a tuple to relational node row.
+ * Returns column descriptor for given field name.
*
- * @param ectx Execution context.
- * @param row Tuple to convert.
- * @param requiredColumns Participating columns.
- * @return Relational node row.
+ * @return Column descriptor
*/
- <RowT> RowT toRow(
- ExecutionContext<RowT> ectx,
- Tuple row,
- RowHandler.RowFactory<RowT> factory,
- @Nullable ImmutableBitSet requiredColumns
- );
+ ColumnDescriptor columnDescriptor(String fieldName);
/**
- * Converts a relational node row to internal tuple.
+ * Returns column descriptor for column of given index.
*
- * @param ectx Execution context.
- * @param row Relational node row.
- * @param op Operation.
- * @param arg Operation specific argument.
- * @return Cache key-value tuple;
+ * @return Column descriptor or null if there is no column with given index.
*/
- <RowT> Tuple toTuple(
- ExecutionContext<RowT> ectx,
- RowT row,
- TableModify.Operation op,
- @Nullable Object arg
- );
+ ColumnDescriptor columnDescriptor(int idx);
/**
- * Returns column descriptor for given field name.
+ * Returns count of columns in the table.
*
- * @return Column descriptor
+ * @return Actual count of columns.
*/
- ColumnDescriptor columnDescriptor(String fieldName);
+ int columnsCount();
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptorImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptorImpl.java
index 2952ef0..d48d54e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptorImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptorImpl.java
@@ -17,16 +17,10 @@
package org.apache.ignite.internal.processors.query.calcite.schema;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.stream.Collectors;
import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.rel.core.TableModify;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexBuilder;
@@ -35,17 +29,10 @@ import org.apache.calcite.schema.ColumnStrategy;
import org.apache.calcite.sql2rel.InitializerContext;
import org.apache.calcite.sql2rel.NullInitializerExpressionFactory;
import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
-import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
-import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
-import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
-import org.apache.ignite.internal.table.TableImpl;
-import org.apache.ignite.table.Tuple;
-import org.jetbrains.annotations.Nullable;
/**
* TableDescriptorImpl.
@@ -54,8 +41,6 @@ import org.jetbrains.annotations.Nullable;
public class TableDescriptorImpl extends NullInitializerExpressionFactory implements TableDescriptor {
private static final ColumnDescriptor[] DUMMY = new ColumnDescriptor[0];
- private final TableImpl table;
-
private final ColumnDescriptor[] descriptors;
private final Map<String, ColumnDescriptor> descriptorsMap;
@@ -69,7 +54,6 @@ public class TableDescriptorImpl extends NullInitializerExpressionFactory implem
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
public TableDescriptorImpl(
- TableImpl table,
List<ColumnDescriptor> columnDescriptors
) {
ImmutableBitSet.Builder keyFieldsBuilder = ImmutableBitSet.builder();
@@ -85,7 +69,6 @@ public class TableDescriptorImpl extends NullInitializerExpressionFactory implem
this.descriptors = columnDescriptors.toArray(DUMMY);
this.descriptorsMap = descriptorsMap;
- this.table = table;
insertFields = ImmutableBitSet.range(columnDescriptors.size());
keyFields = keyFieldsBuilder.build();
@@ -111,67 +94,6 @@ public class TableDescriptorImpl extends NullInitializerExpressionFactory implem
/** {@inheritDoc} */
@Override
- public TableImpl table() {
- return table;
- }
-
- /** {@inheritDoc} */
- @Override
- public <RowT> RowT toRow(
- ExecutionContext<RowT> ectx,
- Tuple row,
- RowHandler.RowFactory<RowT> factory,
- @Nullable ImmutableBitSet requiredColumns
- ) {
- RowHandler<RowT> handler = factory.handler();
-
- assert handler == ectx.rowHandler();
-
- RowT res = factory.create();
-
- assert handler.columnCount(res) == (requiredColumns == null ? descriptors.length : requiredColumns.cardinality());
-
- if (requiredColumns == null) {
- for (int i = 0; i < descriptors.length; i++) {
- ColumnDescriptor desc = descriptors[i];
-
- handler.set(i, res, row.value(desc.fieldIndex()));
- }
- } else {
- for (int i = 0, j = requiredColumns.nextSetBit(0); j != -1; j = requiredColumns.nextSetBit(j + 1), i++) {
- ColumnDescriptor desc = descriptors[j];
-
- handler.set(i, res, row.value(desc.fieldIndex()));
- }
- }
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override
- public <RowT> Tuple toTuple(
- ExecutionContext<RowT> ectx,
- RowT row,
- TableModify.Operation op,
- Object arg
- ) {
- switch (op) {
- case INSERT:
- return insertTuple(row, ectx);
- case DELETE:
- return deleteTuple(row, ectx);
- case UPDATE:
- return updateTuple(row, (List<String>) arg, ectx);
- case MERGE:
- throw new UnsupportedOperationException();
- default:
- throw new AssertionError();
- }
- }
-
- /** {@inheritDoc} */
- @Override
public boolean isUpdateAllowed(RelOptTable tbl, int colIdx) {
return !descriptors[colIdx].key();
}
@@ -227,70 +149,13 @@ public class TableDescriptorImpl extends NullInitializerExpressionFactory implem
/** {@inheritDoc} */
@Override
- public ColocationGroup colocationGroup(PlanningContext ctx) {
- return partitionedGroup();
+ public ColumnDescriptor columnDescriptor(int idx) {
+ return idx < 0 || idx >= descriptors.length ? null : descriptors[idx];
}
- private ColocationGroup partitionedGroup() {
- List<List<String>> assignments = table.internalTable().assignments().stream()
- .map(Collections::singletonList)
- .collect(Collectors.toList());
-
- return ColocationGroup.forAssignments(assignments);
- }
-
- private <RowT> Tuple insertTuple(RowT row, ExecutionContext<RowT> ectx) {
- Tuple tuple = Tuple.create(descriptors.length);
-
- RowHandler<RowT> hnd = ectx.rowHandler();
-
- for (int i = 0; i < descriptors.length; i++) {
- tuple.set(descriptors[i].name(), hnd.get(i, row));
- }
-
- return tuple;
- }
-
- private <RowT> Tuple updateTuple(RowT row, List<String> updateColList, ExecutionContext<RowT> ectx) {
- RowHandler<RowT> hnd = ectx.rowHandler();
- int offset = descriptorsMap.size();
- Tuple tuple = Tuple.create(descriptors.length);
- Set<String> colsToSkip = new HashSet<>(updateColList);
-
- for (int i = 0; i < descriptors.length; i++) {
- String colName = descriptors[i].name();
-
- if (!colsToSkip.contains(colName)) {
- tuple.set(colName, hnd.get(i, row));
- }
- }
-
- for (int i = 0; i < updateColList.size(); i++) {
- final ColumnDescriptor desc = Objects.requireNonNull(descriptorsMap.get(updateColList.get(i)));
-
- assert !desc.key();
-
- Object fieldVal = hnd.get(i + offset, row);
-
- tuple.set(desc.name(), fieldVal);
- }
-
- return tuple;
- }
-
- private <RowT> Tuple deleteTuple(RowT row, ExecutionContext<RowT> ectx) {
- RowHandler<RowT> hnd = ectx.rowHandler();
- Tuple tuple = Tuple.create(keyFields.cardinality());
-
- int idx = 0;
- for (int i = 0; i < descriptors.length; i++) {
- ColumnDescriptor desc = descriptors[i];
-
- if (desc.key()) {
- tuple.set(desc.name(), hnd.get(idx++, row));
- }
- }
-
- return tuple;
+ /** {@inheritDoc} */
+ @Override
+ public int columnsCount() {
+ return descriptors.length;
}
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
index e49b61c..52752cb 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
@@ -56,7 +56,7 @@ import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelReferentialConstraint;
import org.apache.calcite.rel.RelVisitor;
-import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rel.core.TableModify.Operation;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
@@ -74,7 +74,7 @@ import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql2rel.InitializerContext;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
-import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
import org.apache.ignite.internal.processors.query.calcite.externalize.RelJsonReader;
import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
import org.apache.ignite.internal.processors.query.calcite.prepare.Cloner;
@@ -725,6 +725,25 @@ public abstract class AbstractPlannerTest extends IgniteAbstractTest {
public String name() {
return name;
}
+
+ /** {@inheritDoc} */
+ @Override
+ public TableImpl table() {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public <RowT> RowT toRow(ExecutionContext<RowT> ectx, Tuple row, RowFactory<RowT> factory,
+ @Nullable ImmutableBitSet requiredColumns) {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public <RowT> Tuple toTuple(ExecutionContext<RowT> ectx, RowT row, Operation op, @Nullable Object arg) {
+ throw new AssertionError();
+ }
}
/**
@@ -753,12 +772,6 @@ public abstract class AbstractPlannerTest extends IgniteAbstractTest {
/** {@inheritDoc} */
@Override
- public TableImpl table() {
- throw new AssertionError();
- }
-
- /** {@inheritDoc} */
- @Override
public RelDataType rowType(IgniteTypeFactory factory, ImmutableBitSet usedColumns) {
return rowType;
}
@@ -771,34 +784,28 @@ public abstract class AbstractPlannerTest extends IgniteAbstractTest {
/** {@inheritDoc} */
@Override
- public <RowT> RowT toRow(ExecutionContext<RowT> ectx, Tuple row, RowHandler.RowFactory<RowT> factory,
- @Nullable ImmutableBitSet requiredColumns) {
- throw new AssertionError();
+ public ColumnDescriptor columnDescriptor(String fieldName) {
+ RelDataTypeField field = rowType.getField(fieldName, false, false);
+ return new TestColumnDescriptor(field.getIndex(), fieldName);
}
/** {@inheritDoc} */
@Override
- public <RowT> Tuple toTuple(ExecutionContext<RowT> ectx, RowT row, TableModify.Operation op,
- @Nullable Object arg) {
- throw new AssertionError();
- }
+ public ColumnDescriptor columnDescriptor(int idx) {
+ RelDataTypeField field = rowType.getFieldList().get(idx);
- /** {@inheritDoc} */
- @Override
- public ColumnDescriptor columnDescriptor(String fieldName) {
- RelDataTypeField field = rowType.getField(fieldName, false, false);
- return new TestColumnDescriptor(field.getIndex(), fieldName);
+ return new TestColumnDescriptor(field.getIndex(), field.getName());
}
/** {@inheritDoc} */
@Override
- public boolean isGeneratedAlways(RelOptTable table, int idxColumn) {
- throw new AssertionError();
+ public int columnsCount() {
+ return rowType.getFieldCount();
}
/** {@inheritDoc} */
@Override
- public ColocationGroup colocationGroup(PlanningContext ctx) {
+ public boolean isGeneratedAlways(RelOptTable table, int idxColumn) {
throw new AssertionError();
}