You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ko...@apache.org on 2021/11/16 14:33:32 UTC

[ignite-3] branch main updated: IGNITE-15847 Refactoring of TableDescriptor for SQL Extension API (#448)

This is an automated email from the ASF dual-hosted git repository.

korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 3336da1  IGNITE-15847 Refactoring of TableDescriptor for SQL Extension API (#448)
3336da1 is described below

commit 3336da109f036627955450b04eada71ac01039b7
Author: korlov42 <ko...@gridgain.com>
AuthorDate: Tue Nov 16 17:33:28 2021 +0300

    IGNITE-15847 Refactoring of TableDescriptor for SQL Extension API (#448)
---
 .../query/calcite/exec/LogicalRelImplementor.java  |  11 +-
 .../query/calcite/exec/rel/ModifyNode.java         |  18 +--
 .../query/calcite/exec/rel/TableScanNode.java      |  22 +--
 .../query/calcite/schema/IgniteTable.java          |  20 +--
 .../query/calcite/schema/IgniteTableImpl.java      | 142 +++++++++++++++++++-
 .../query/calcite/schema/InternalIgniteTable.java  |  41 ++++++
 .../query/calcite/schema/SchemaHolderImpl.java     |   4 +-
 .../query/calcite/schema/TableDescriptor.java      |  54 ++------
 .../query/calcite/schema/TableDescriptorImpl.java  | 147 +--------------------
 .../query/calcite/planner/AbstractPlannerTest.java |  53 ++++----
 10 files changed, 264 insertions(+), 248 deletions(-)

diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
index 3c0ed77..3293cb3 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
@@ -94,8 +94,7 @@ import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceS
 import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleHashAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleSortAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSetOp;
-import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
-import org.apache.ignite.internal.processors.query.calcite.schema.TableDescriptor;
+import org.apache.ignite.internal.processors.query.calcite.schema.InternalIgniteTable;
 import org.apache.ignite.internal.processors.query.calcite.trait.Destination;
 import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
 import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
@@ -285,7 +284,7 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>>
         //        RexNode condition = rel.condition();
         //        List<RexNode> projects = rel.projects();
 
-        IgniteTable tbl = rel.getTable().unwrap(IgniteTable.class);
+        InternalIgniteTable tbl = rel.getTable().unwrap(InternalIgniteTable.class);
         IgniteTypeFactory typeFactory = ctx.getTypeFactory();
 
         ImmutableBitSet requiredColumns = rel.requiredColumns();
@@ -316,7 +315,7 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>>
         List<RexNode> projects = rel.projects();
         ImmutableBitSet requiredColumns = rel.requiredColumns();
 
-        IgniteTable tbl = rel.getTable().unwrap(IgniteTable.class);
+        InternalIgniteTable tbl = rel.getTable().unwrap(InternalIgniteTable.class);
         IgniteTypeFactory typeFactory = ctx.getTypeFactory();
 
         RelDataType rowType = tbl.getRowType(typeFactory, requiredColumns);
@@ -329,7 +328,7 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>>
         return new TableScanNode<>(
                 ctx,
                 rowType,
-                tbl.descriptor(),
+                tbl,
                 group.partitions(ctx.planningContext().localNodeId()),
                 filters,
                 prj,
@@ -493,7 +492,7 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>>
             case INSERT:
             case UPDATE:
             case DELETE:
-                ModifyNode<RowT> node = new ModifyNode<>(ctx, rel.getRowType(), rel.getTable().unwrap(TableDescriptor.class),
+                ModifyNode<RowT> node = new ModifyNode<>(ctx, rel.getRowType(), rel.getTable().unwrap(InternalIgniteTable.class),
                         rel.getOperation(), rel.getUpdateColumnList());
 
                 Node<RowT> input = visit(rel.getInput());
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
index 851b0da..93443dc 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
@@ -27,7 +27,7 @@ import org.apache.calcite.rel.core.TableModify;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
 import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
-import org.apache.ignite.internal.processors.query.calcite.schema.TableDescriptor;
+import org.apache.ignite.internal.processors.query.calcite.schema.InternalIgniteTable;
 import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.table.RecordView;
@@ -38,7 +38,7 @@ import org.apache.ignite.table.Tuple;
  * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
  */
 public class ModifyNode<RowT> extends AbstractNode<RowT> implements SingleNode<RowT>, Downstream<RowT> {
-    protected final TableDescriptor desc;
+    private final InternalIgniteTable table;
 
     private final TableModify.Operation op;
 
@@ -64,24 +64,24 @@ public class ModifyNode<RowT> extends AbstractNode<RowT> implements SingleNode<R
      *
      * @param ctx  Execution context.
      * @param rowType Rel data type.
-     * @param desc Table descriptor.
+     * @param table Table object.
      * @param op Operation/
      * @param cols Update column list.
      */
     public ModifyNode(
             ExecutionContext<RowT> ctx,
             RelDataType rowType,
-            TableDescriptor desc,
+            InternalIgniteTable table,
             TableModify.Operation op,
             List<String> cols
     ) {
         super(ctx, rowType);
 
-        this.desc = desc;
+        this.table = table;
         this.op = op;
         this.cols = cols;
 
-        tableView = desc.table().recordView();
+        tableView = table.table().recordView();
     }
 
     /** {@inheritDoc} */
@@ -114,7 +114,7 @@ public class ModifyNode<RowT> extends AbstractNode<RowT> implements SingleNode<R
             case DELETE:
             case UPDATE:
             case INSERT:
-                tuples.add(desc.toTuple(context(), row, op, cols));
+                tuples.add(table.toTuple(context(), row, op, cols));
 
                 flushTuples(false);
 
@@ -201,13 +201,13 @@ public class ModifyNode<RowT> extends AbstractNode<RowT> implements SingleNode<R
                     IgniteTypeFactory typeFactory = context().getTypeFactory();
                     RowHandler.RowFactory<RowT> rowFactory = context().rowHandler().factory(
                             context().getTypeFactory(),
-                            desc.insertRowType(typeFactory)
+                            table.descriptor().insertRowType(typeFactory)
                     );
 
                     throw new IgniteInternalException(
                             "Failed to INSERT some keys because they are already in cache. "
                                     + "[tuples=" + duplicates.stream()
-                                    .map(tup -> desc.toRow(context(), tup, rowFactory, null))
+                                    .map(tup -> table.toRow(context(), tup, rowFactory, null))
                                     .map(context().rowHandler()::toString)
                                     .collect(Collectors.toList()) + ']'
                     );
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableScanNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableScanNode.java
index a6b7a85..ed105e1 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableScanNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableScanNode.java
@@ -30,7 +30,7 @@ import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
 import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
-import org.apache.ignite.internal.processors.query.calcite.schema.TableDescriptor;
+import org.apache.ignite.internal.processors.query.calcite.schema.InternalIgniteTable;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.table.TableRow;
@@ -43,9 +43,11 @@ public class TableScanNode<RowT> extends AbstractNode<RowT> {
     /** Special value to highlights that all row were received and we are not waiting any more. */
     private static final int NOT_WAITING = -1;
 
-    private final TableImpl table;
+    /** Table that provides access to underlying data. */
+    private final TableImpl physTable;
 
-    private final TableDescriptor desc;
+    /** Table that is an object in SQL schema. */
+    private final InternalIgniteTable schemaTable;
 
     private final RowHandler.RowFactory<RowT> factory;
 
@@ -75,7 +77,7 @@ public class TableScanNode<RowT> extends AbstractNode<RowT> {
      *
      * @param ctx             Execution context.
      * @param rowType         Output type of the current node.
-     * @param desc            Table descriptor this node should scan.
+     * @param schemaTable     The table this node should scan.
      * @param parts           Partition numbers to scan.
      * @param filters         Optional filter to filter out rows.
      * @param rowTransformer  Optional projection function.
@@ -84,7 +86,7 @@ public class TableScanNode<RowT> extends AbstractNode<RowT> {
     public TableScanNode(
             ExecutionContext<RowT> ctx,
             RelDataType rowType,
-            TableDescriptor desc,
+            InternalIgniteTable schemaTable,
             int[] parts,
             @Nullable Predicate<RowT> filters,
             @Nullable Function<RowT, RowT> rowTransformer,
@@ -94,8 +96,8 @@ public class TableScanNode<RowT> extends AbstractNode<RowT> {
 
         assert !nullOrEmpty(parts);
 
-        table = desc.table();
-        this.desc = desc;
+        this.physTable = schemaTable.table();
+        this.schemaTable = schemaTable;
         this.parts = parts;
         this.filters = filters;
         this.rowTransformer = rowTransformer;
@@ -210,7 +212,7 @@ public class TableScanNode<RowT> extends AbstractNode<RowT> {
         if (subscription != null) {
             subscription.request(waiting);
         } else if (curPartIdx < parts.length) {
-            table.internalTable().scan(parts[curPartIdx++], null).subscribe(new SubscriberImpl());
+            physTable.internalTable().scan(parts[curPartIdx++], null).subscribe(new SubscriberImpl());
         } else {
             waiting = NOT_WAITING;
         }
@@ -268,8 +270,8 @@ public class TableScanNode<RowT> extends AbstractNode<RowT> {
     }
 
     private RowT convert(BinaryRow binRow) {
-        final org.apache.ignite.internal.schema.row.Row wrapped = table.schemaView().resolve(binRow);
+        final org.apache.ignite.internal.schema.row.Row wrapped = physTable.schemaView().resolve(binRow);
 
-        return desc.toRow(context(), TableRow.tuple(wrapped), factory, requiredColumns);
+        return schemaTable.toRow(context(), TableRow.tuple(wrapped), factory, requiredColumns);
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
index f5daf02..1924709 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
@@ -45,13 +45,13 @@ public interface IgniteTable extends TranslatableTable, Wrapper {
      * @return Table descriptor.
      */
     TableDescriptor descriptor();
-    
+
     /** {@inheritDoc} */
     @Override
     default RelDataType getRowType(RelDataTypeFactory typeFactory) {
         return getRowType(typeFactory, null);
     }
-    
+
     /**
      * Returns new type according {@code requiredColumns} param.
      *
@@ -59,13 +59,13 @@ public interface IgniteTable extends TranslatableTable, Wrapper {
      * @param requiredColumns Used columns enumeration.
      */
     RelDataType getRowType(RelDataTypeFactory typeFactory, ImmutableBitSet requiredColumns);
-    
+
     /** {@inheritDoc} */
     @Override
     default TableScan toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) {
         return toRel(context.getCluster(), relOptTable);
     }
-    
+
     /**
      * Converts table into relational expression.
      *
@@ -74,31 +74,31 @@ public interface IgniteTable extends TranslatableTable, Wrapper {
      * @return Table relational expression.
      */
     TableScan toRel(RelOptCluster cluster, RelOptTable relOptTbl);
-    
+
     /**
      * Returns table distribution.
      *
      * @return Table distribution.
      */
     IgniteDistribution distribution();
-    
+
     /** {@inheritDoc} */
     default Schema.TableType getJdbcTableType() {
         return Schema.TableType.TABLE;
     }
-    
+
     /** {@inheritDoc} */
     default boolean isRolledUp(String column) {
         return false;
     }
-    
+
     /** {@inheritDoc} */
     default boolean rolledUpColumnValidInsideAgg(String column, SqlCall call,
             @Nullable SqlNode parent,
             @Nullable CalciteConnectionConfig config) {
         return false;
     }
-    
+
     /** {@inheritDoc} */
     default Statistic getStatistic() {
         return new Statistic() {
@@ -108,7 +108,7 @@ public interface IgniteTable extends TranslatableTable, Wrapper {
             }
         };
     }
-    
+
     /** {@inheritDoc} */
     @Override
     default <C> @Nullable C unwrap(Class<C> cls) {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTableImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTableImpl.java
index be66f59..321ff9e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTableImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTableImpl.java
@@ -18,20 +18,27 @@
 package org.apache.ignite.internal.processors.query.calcite.schema;
 
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 import org.apache.calcite.plan.Convention;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelReferentialConstraint;
+import org.apache.calcite.rel.core.TableModify;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.schema.Statistic;
 import org.apache.calcite.schema.impl.AbstractTable;
 import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
 import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
 import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
 import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalIndexScan;
@@ -39,6 +46,9 @@ import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLog
 import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
 import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
 import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.table.Tuple;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Ignite table implementation.
@@ -46,6 +56,8 @@ import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactor
 public class IgniteTableImpl extends AbstractTable implements InternalIgniteTable {
     private final TableDescriptor desc;
 
+    private final TableImpl table;
+
     private final Statistic statistic;
 
     private final Map<String, IgniteIndex> indexes = new ConcurrentHashMap<>();
@@ -55,8 +67,10 @@ public class IgniteTableImpl extends AbstractTable implements InternalIgniteTabl
      *
      * @param desc Table descriptor.
      */
-    public IgniteTableImpl(TableDescriptor desc) {
+    public IgniteTableImpl(TableDescriptor desc, TableImpl table) {
         this.desc = desc;
+        this.table = table;
+
         statistic = new StatisticsImpl();
     }
 
@@ -81,6 +95,12 @@ public class IgniteTableImpl extends AbstractTable implements InternalIgniteTabl
 
     /** {@inheritDoc} */
     @Override
+    public TableImpl table() {
+        return table;
+    }
+
+    /** {@inheritDoc} */
+    @Override
     public IgniteLogicalTableScan toRel(RelOptCluster cluster, RelOptTable relOptTbl) {
         RelTraitSet traitSet = cluster.traitSetOf(distribution())
                 .replace(RewindabilityTrait.REWINDABLE);
@@ -108,7 +128,7 @@ public class IgniteTableImpl extends AbstractTable implements InternalIgniteTabl
     /** {@inheritDoc} */
     @Override
     public ColocationGroup colocationGroup(PlanningContext ctx) {
-        return desc.colocationGroup(ctx);
+        return partitionedGroup();
     }
 
     /** {@inheritDoc} */
@@ -145,6 +165,124 @@ public class IgniteTableImpl extends AbstractTable implements InternalIgniteTabl
         return super.unwrap(cls);
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public <RowT> RowT toRow(
+            ExecutionContext<RowT> ectx,
+            Tuple row,
+            RowHandler.RowFactory<RowT> factory,
+            @Nullable ImmutableBitSet requiredColumns
+    ) {
+        RowHandler<RowT> handler = factory.handler();
+
+        assert handler == ectx.rowHandler();
+
+        RowT res = factory.create();
+
+        assert handler.columnCount(res) == (requiredColumns == null ? desc.columnsCount() : requiredColumns.cardinality());
+
+        if (requiredColumns == null) {
+            for (int i = 0; i < desc.columnsCount(); i++) {
+                ColumnDescriptor colDesc = desc.columnDescriptor(i);
+
+                handler.set(i, res, row.value(colDesc.fieldIndex()));
+            }
+        } else {
+            for (int i = 0, j = requiredColumns.nextSetBit(0); j != -1; j = requiredColumns.nextSetBit(j + 1), i++) {
+                ColumnDescriptor colDesc = desc.columnDescriptor(j);
+
+                handler.set(i, res, row.value(colDesc.fieldIndex()));
+            }
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <RowT> Tuple toTuple(
+            ExecutionContext<RowT> ectx,
+            RowT row,
+            TableModify.Operation op,
+            Object arg
+    ) {
+        switch (op) {
+            case INSERT:
+                return insertTuple(row, ectx);
+            case DELETE:
+                return deleteTuple(row, ectx);
+            case UPDATE:
+                return updateTuple(row, (List<String>) arg, ectx);
+            case MERGE:
+                throw new UnsupportedOperationException();
+            default:
+                throw new AssertionError();
+        }
+    }
+
+    private <RowT> Tuple insertTuple(RowT row, ExecutionContext<RowT> ectx) {
+        Tuple tuple = Tuple.create(desc.columnsCount());
+
+        RowHandler<RowT> hnd = ectx.rowHandler();
+
+        for (int i = 0; i < desc.columnsCount(); i++) {
+            tuple.set(desc.columnDescriptor(i).name(), hnd.get(i, row));
+        }
+
+        return tuple;
+    }
+
+    private <RowT> Tuple updateTuple(RowT row, List<String> updateColList, ExecutionContext<RowT> ectx) {
+        RowHandler<RowT> hnd = ectx.rowHandler();
+        int offset = desc.columnsCount();
+        Tuple tuple = Tuple.create(desc.columnsCount());
+        Set<String> colsToSkip = new HashSet<>(updateColList);
+
+        for (int i = 0; i < desc.columnsCount(); i++) {
+            String colName = desc.columnDescriptor(i).name();
+
+            if (!colsToSkip.contains(colName)) {
+                tuple.set(colName, hnd.get(i, row));
+            }
+        }
+
+        for (int i = 0; i < updateColList.size(); i++) {
+            final ColumnDescriptor colDesc = Objects.requireNonNull(desc.columnDescriptor(updateColList.get(i)));
+
+            assert !colDesc.key();
+
+            Object fieldVal = hnd.get(i + offset, row);
+
+            tuple.set(colDesc.name(), fieldVal);
+        }
+
+        return tuple;
+    }
+
+    private <RowT> Tuple deleteTuple(RowT row, ExecutionContext<RowT> ectx) {
+        RowHandler<RowT> hnd = ectx.rowHandler();
+        Tuple tuple = Tuple.create();
+
+        int idx = 0;
+        for (int i = 0; i < desc.columnsCount(); i++) {
+            ColumnDescriptor colDesc = desc.columnDescriptor(i);
+
+            if (colDesc.key()) {
+                tuple.set(colDesc.name(), hnd.get(idx++, row));
+            }
+        }
+
+        return tuple;
+    }
+
+    private ColocationGroup partitionedGroup() {
+        List<List<String>> assignments = table.internalTable().assignments().stream()
+                .map(Collections::singletonList)
+                .collect(Collectors.toList());
+
+        return ColocationGroup.forAssignments(assignments);
+    }
+
     private class StatisticsImpl implements Statistic {
         /** {@inheritDoc} */
         @Override
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/InternalIgniteTable.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/InternalIgniteTable.java
index 3145285..71c7cb1 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/InternalIgniteTable.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/InternalIgniteTable.java
@@ -20,9 +20,16 @@ package org.apache.ignite.internal.processors.query.calcite.schema;
 import java.util.Map;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
 import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
 import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
 import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalIndexScan;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.table.Tuple;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Ignite internal table.
@@ -38,6 +45,40 @@ public interface InternalIgniteTable extends IgniteTable {
      */
     IgniteLogicalIndexScan toRel(RelOptCluster cluster, RelOptTable relOptTbl, String idxName);
 
+    /** Returns the internal table. */
+    TableImpl table();
+
+    /**
+     * Converts a tuple to relational node row.
+     *
+     * @param ectx            Execution context.
+     * @param row             Tuple to convert.
+     * @param requiredColumns Participating columns.
+     * @return Relational node row.
+     */
+    <RowT> RowT toRow(
+            ExecutionContext<RowT> ectx,
+            Tuple row,
+            RowHandler.RowFactory<RowT> factory,
+            @Nullable ImmutableBitSet requiredColumns
+    );
+
+    /**
+     * Converts a relational node row to internal tuple.
+     *
+     * @param ectx Execution context.
+     * @param row  Relational node row.
+     * @param op   Operation.
+     * @param arg  Operation specific argument.
+     * @return Cache key-value tuple;
+     */
+    <RowT> Tuple toTuple(
+            ExecutionContext<RowT> ectx,
+            RowT row,
+            TableModify.Operation op,
+            @Nullable Object arg
+    );
+
     /**
      * Returns nodes mapping.
      *
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java
index 4887e7a..f65c628 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java
@@ -90,9 +90,9 @@ public class SchemaHolderImpl implements SchemaHolder {
                 ))
                 .collect(Collectors.toList());
 
-        TableDescriptorImpl desc = new TableDescriptorImpl(table, colDescriptors);
+        TableDescriptorImpl desc = new TableDescriptorImpl(colDescriptors);
 
-        schema.addTable(removeSchema(schemaName, table.tableName()), new IgniteTableImpl(desc));
+        schema.addTable(removeSchema(schemaName, table.tableName()), new IgniteTableImpl(desc, table));
 
         rebuild();
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptor.java
index 1c2889e..a7dde3a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptor.java
@@ -18,21 +18,13 @@
 package org.apache.ignite.internal.processors.query.calcite.schema;
 
 import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.rel.core.TableModify;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelProtoDataType;
 import org.apache.calcite.sql2rel.InitializerExpressionFactory;
 import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
-import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
-import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
-import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
 import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
 import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
-import org.apache.ignite.internal.table.TableImpl;
-import org.apache.ignite.table.Tuple;
-import org.jetbrains.annotations.Nullable;
 
 /**
  * TableDescriptor interface.
@@ -42,17 +34,6 @@ public interface TableDescriptor extends RelProtoDataType, InitializerExpression
     /** Returns distribution of the table. */
     IgniteDistribution distribution();
 
-    /** Returns the table this description describes. */
-    TableImpl table();
-
-    /**
-     * Returns nodes mapping.
-     *
-     * @param ctx Planning context.
-     * @return Nodes mapping.
-     */
-    ColocationGroup colocationGroup(PlanningContext ctx);
-
     /** {@inheritDoc} */
     @Override
     default RelDataType apply(RelDataTypeFactory factory) {
@@ -108,40 +89,23 @@ public interface TableDescriptor extends RelProtoDataType, InitializerExpression
     boolean isUpdateAllowed(RelOptTable tbl, int colIdx);
 
     /**
-     * Converts a tuple to relational node row.
+     * Returns column descriptor for given field name.
      *
-     * @param ectx            Execution context.
-     * @param row             Tuple to convert.
-     * @param requiredColumns Participating columns.
-     * @return Relational node row.
+     * @return Column descriptor
      */
-    <RowT> RowT toRow(
-            ExecutionContext<RowT> ectx,
-            Tuple row,
-            RowHandler.RowFactory<RowT> factory,
-            @Nullable ImmutableBitSet requiredColumns
-    );
+    ColumnDescriptor columnDescriptor(String fieldName);
 
     /**
-     * Converts a relational node row to internal tuple.
+     * Returns column descriptor for column of given index.
      *
-     * @param ectx Execution context.
-     * @param row  Relational node row.
-     * @param op   Operation.
-     * @param arg  Operation specific argument.
-     * @return Cache key-value tuple;
+     * @return Column descriptor or null if there is no column with given index.
      */
-    <RowT> Tuple toTuple(
-            ExecutionContext<RowT> ectx,
-            RowT row,
-            TableModify.Operation op,
-            @Nullable Object arg
-    );
+    ColumnDescriptor columnDescriptor(int idx);
 
     /**
-     * Returns column descriptor for given field name.
+     * Returns count of columns in the table.
      *
-     * @return Column descriptor
+     * @return Actual count of columns.
      */
-    ColumnDescriptor columnDescriptor(String fieldName);
+    int columnsCount();
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptorImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptorImpl.java
index 2952ef0..d48d54e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptorImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptorImpl.java
@@ -17,16 +17,10 @@
 
 package org.apache.ignite.internal.processors.query.calcite.schema;
 
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.stream.Collectors;
 import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.rel.core.TableModify;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rex.RexBuilder;
@@ -35,17 +29,10 @@ import org.apache.calcite.schema.ColumnStrategy;
 import org.apache.calcite.sql2rel.InitializerContext;
 import org.apache.calcite.sql2rel.NullInitializerExpressionFactory;
 import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
-import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
-import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
-import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
 import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
 import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
 import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
 import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
-import org.apache.ignite.internal.table.TableImpl;
-import org.apache.ignite.table.Tuple;
-import org.jetbrains.annotations.Nullable;
 
 /**
  * TableDescriptorImpl.
@@ -54,8 +41,6 @@ import org.jetbrains.annotations.Nullable;
 public class TableDescriptorImpl extends NullInitializerExpressionFactory implements TableDescriptor {
     private static final ColumnDescriptor[] DUMMY = new ColumnDescriptor[0];
 
-    private final TableImpl table;
-
     private final ColumnDescriptor[] descriptors;
 
     private final Map<String, ColumnDescriptor> descriptorsMap;
@@ -69,7 +54,6 @@ public class TableDescriptorImpl extends NullInitializerExpressionFactory implem
      * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
      */
     public TableDescriptorImpl(
-            TableImpl table,
             List<ColumnDescriptor> columnDescriptors
     ) {
         ImmutableBitSet.Builder keyFieldsBuilder = ImmutableBitSet.builder();
@@ -85,7 +69,6 @@ public class TableDescriptorImpl extends NullInitializerExpressionFactory implem
 
         this.descriptors = columnDescriptors.toArray(DUMMY);
         this.descriptorsMap = descriptorsMap;
-        this.table = table;
 
         insertFields = ImmutableBitSet.range(columnDescriptors.size());
         keyFields = keyFieldsBuilder.build();
@@ -111,67 +94,6 @@ public class TableDescriptorImpl extends NullInitializerExpressionFactory implem
 
     /** {@inheritDoc} */
     @Override
-    public TableImpl table() {
-        return table;
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public <RowT> RowT toRow(
-            ExecutionContext<RowT> ectx,
-            Tuple row,
-            RowHandler.RowFactory<RowT> factory,
-            @Nullable ImmutableBitSet requiredColumns
-    ) {
-        RowHandler<RowT> handler = factory.handler();
-
-        assert handler == ectx.rowHandler();
-
-        RowT res = factory.create();
-
-        assert handler.columnCount(res) == (requiredColumns == null ? descriptors.length : requiredColumns.cardinality());
-
-        if (requiredColumns == null) {
-            for (int i = 0; i < descriptors.length; i++) {
-                ColumnDescriptor desc = descriptors[i];
-
-                handler.set(i, res, row.value(desc.fieldIndex()));
-            }
-        } else {
-            for (int i = 0, j = requiredColumns.nextSetBit(0); j != -1; j = requiredColumns.nextSetBit(j + 1), i++) {
-                ColumnDescriptor desc = descriptors[j];
-
-                handler.set(i, res, row.value(desc.fieldIndex()));
-            }
-        }
-
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public <RowT> Tuple toTuple(
-            ExecutionContext<RowT> ectx,
-            RowT row,
-            TableModify.Operation op,
-            Object arg
-    ) {
-        switch (op) {
-            case INSERT:
-                return insertTuple(row, ectx);
-            case DELETE:
-                return deleteTuple(row, ectx);
-            case UPDATE:
-                return updateTuple(row, (List<String>) arg, ectx);
-            case MERGE:
-                throw new UnsupportedOperationException();
-            default:
-                throw new AssertionError();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override
     public boolean isUpdateAllowed(RelOptTable tbl, int colIdx) {
         return !descriptors[colIdx].key();
     }
@@ -227,70 +149,13 @@ public class TableDescriptorImpl extends NullInitializerExpressionFactory implem
 
     /** {@inheritDoc} */
     @Override
-    public ColocationGroup colocationGroup(PlanningContext ctx) {
-        return partitionedGroup();
+    public ColumnDescriptor columnDescriptor(int idx) {
+        return idx < 0 || idx >= descriptors.length ? null : descriptors[idx];
     }
 
-    private ColocationGroup partitionedGroup() {
-        List<List<String>> assignments = table.internalTable().assignments().stream()
-                .map(Collections::singletonList)
-                .collect(Collectors.toList());
-
-        return ColocationGroup.forAssignments(assignments);
-    }
-
-    private <RowT> Tuple insertTuple(RowT row, ExecutionContext<RowT> ectx) {
-        Tuple tuple = Tuple.create(descriptors.length);
-
-        RowHandler<RowT> hnd = ectx.rowHandler();
-
-        for (int i = 0; i < descriptors.length; i++) {
-            tuple.set(descriptors[i].name(), hnd.get(i, row));
-        }
-
-        return tuple;
-    }
-
-    private <RowT> Tuple updateTuple(RowT row, List<String> updateColList, ExecutionContext<RowT> ectx) {
-        RowHandler<RowT> hnd = ectx.rowHandler();
-        int offset = descriptorsMap.size();
-        Tuple tuple = Tuple.create(descriptors.length);
-        Set<String> colsToSkip = new HashSet<>(updateColList);
-
-        for (int i = 0; i < descriptors.length; i++) {
-            String colName = descriptors[i].name();
-
-            if (!colsToSkip.contains(colName)) {
-                tuple.set(colName, hnd.get(i, row));
-            }
-        }
-
-        for (int i = 0; i < updateColList.size(); i++) {
-            final ColumnDescriptor desc = Objects.requireNonNull(descriptorsMap.get(updateColList.get(i)));
-
-            assert !desc.key();
-
-            Object fieldVal = hnd.get(i + offset, row);
-
-            tuple.set(desc.name(), fieldVal);
-        }
-
-        return tuple;
-    }
-
-    private <RowT> Tuple deleteTuple(RowT row, ExecutionContext<RowT> ectx) {
-        RowHandler<RowT> hnd = ectx.rowHandler();
-        Tuple tuple = Tuple.create(keyFields.cardinality());
-
-        int idx = 0;
-        for (int i = 0; i < descriptors.length; i++) {
-            ColumnDescriptor desc = descriptors[i];
-
-            if (desc.key()) {
-                tuple.set(desc.name(), hnd.get(idx++, row));
-            }
-        }
-
-        return tuple;
+    /** {@inheritDoc} */
+    @Override
+    public int columnsCount() {
+        return descriptors.length;
     }
 }
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
index e49b61c..52752cb 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
@@ -56,7 +56,7 @@ import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelReferentialConstraint;
 import org.apache.calcite.rel.RelVisitor;
-import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rel.core.TableModify.Operation;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelDataTypeField;
@@ -74,7 +74,7 @@ import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql2rel.InitializerContext;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
-import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
 import org.apache.ignite.internal.processors.query.calcite.externalize.RelJsonReader;
 import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
 import org.apache.ignite.internal.processors.query.calcite.prepare.Cloner;
@@ -725,6 +725,25 @@ public abstract class AbstractPlannerTest extends IgniteAbstractTest {
         public String name() {
             return name;
         }
+
+        /** {@inheritDoc} */
+        @Override
+        public TableImpl table() {
+            throw new AssertionError();
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public <RowT> RowT toRow(ExecutionContext<RowT> ectx, Tuple row, RowFactory<RowT> factory,
+                @Nullable ImmutableBitSet requiredColumns) {
+            throw new AssertionError();
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public <RowT> Tuple toTuple(ExecutionContext<RowT> ectx, RowT row, Operation op, @Nullable Object arg) {
+            throw new AssertionError();
+        }
     }
 
     /**
@@ -753,12 +772,6 @@ public abstract class AbstractPlannerTest extends IgniteAbstractTest {
 
         /** {@inheritDoc} */
         @Override
-        public TableImpl table() {
-            throw new AssertionError();
-        }
-
-        /** {@inheritDoc} */
-        @Override
         public RelDataType rowType(IgniteTypeFactory factory, ImmutableBitSet usedColumns) {
             return rowType;
         }
@@ -771,34 +784,28 @@ public abstract class AbstractPlannerTest extends IgniteAbstractTest {
 
         /** {@inheritDoc} */
         @Override
-        public <RowT> RowT toRow(ExecutionContext<RowT> ectx, Tuple row, RowHandler.RowFactory<RowT> factory,
-                @Nullable ImmutableBitSet requiredColumns) {
-            throw new AssertionError();
+        public ColumnDescriptor columnDescriptor(String fieldName) {
+            RelDataTypeField field = rowType.getField(fieldName, false, false);
+            return new TestColumnDescriptor(field.getIndex(), fieldName);
         }
 
         /** {@inheritDoc} */
         @Override
-        public <RowT> Tuple toTuple(ExecutionContext<RowT> ectx, RowT row, TableModify.Operation op,
-                @Nullable Object arg) {
-            throw new AssertionError();
-        }
+        public ColumnDescriptor columnDescriptor(int idx) {
+            RelDataTypeField field = rowType.getFieldList().get(idx);
 
-        /** {@inheritDoc} */
-        @Override
-        public ColumnDescriptor columnDescriptor(String fieldName) {
-            RelDataTypeField field = rowType.getField(fieldName, false, false);
-            return new TestColumnDescriptor(field.getIndex(), fieldName);
+            return new TestColumnDescriptor(field.getIndex(), field.getName());
         }
 
         /** {@inheritDoc} */
         @Override
-        public boolean isGeneratedAlways(RelOptTable table, int idxColumn) {
-            throw new AssertionError();
+        public int columnsCount() {
+            return rowType.getFieldCount();
         }
 
         /** {@inheritDoc} */
         @Override
-        public ColocationGroup colocationGroup(PlanningContext ctx) {
+        public boolean isGeneratedAlways(RelOptTable table, int idxColumn) {
             throw new AssertionError();
         }