You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tl...@apache.org on 2020/12/25 13:03:03 UTC

[ignite] branch sql-calcite updated: IGNITE-12819 Introduce query planner cost system (closes #8590)

This is an automated email from the ASF dual-hosted git repository.

tledkov pushed a commit to branch sql-calcite
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/sql-calcite by this push:
     new 1bdccb8  IGNITE-12819 Introduce query planner cost system (closes #8590)
1bdccb8 is described below

commit 1bdccb85767c5148653d0c60ec0f5332a89b907f
Author: korlov42 <ko...@gridgain.com>
AuthorDate: Fri Dec 25 16:02:46 2020 +0300

    IGNITE-12819 Introduce query planner cost system (closes #8590)
---
 .../query/calcite/CalciteQueryProcessor.java       |   5 +-
 .../query/calcite/metadata/IgniteMdRowCount.java   |  60 +---
 .../query/calcite/metadata/cost/IgniteCost.java    | 228 +++++++++++++++
 .../calcite/metadata/cost/IgniteCostFactory.java   | 101 +++++++
 .../processors/query/calcite/prepare/Fragment.java |   9 +
 .../query/calcite/prepare/QueryPlanCacheImpl.java  |   6 +-
 ...NestedLoopJoin.java => AbstractIgniteJoin.java} |  65 ++---
 .../query/calcite/rel/AbstractIndexScan.java       |  39 ++-
 .../query/calcite/rel/IgniteAggregate.java         |  13 +
 .../rel/IgniteCorrelatedNestedLoopJoin.java        |  28 +-
 .../query/calcite/rel/IgniteExchange.java          |  19 +-
 .../processors/query/calcite/rel/IgniteFilter.java |   8 +-
 .../query/calcite/rel/IgniteIndexScan.java         |   1 +
 .../processors/query/calcite/rel/IgniteLimit.java  |  44 +--
 .../query/calcite/rel/IgniteMapAggregate.java      |  13 +
 .../query/calcite/rel/IgniteMergeJoin.java         | 317 +--------------------
 .../query/calcite/rel/IgniteNestedLoopJoin.java    |  40 +--
 .../query/calcite/rel/IgniteProject.java           |   8 +-
 .../query/calcite/rel/IgniteReduceAggregate.java   |  16 ++
 .../processors/query/calcite/rel/IgniteSort.java   |  18 ++
 .../query/calcite/rel/IgniteTableSpool.java        |  16 +-
 .../query/calcite/rel/IgniteTrimExchange.java      |  15 +-
 .../query/calcite/rel/IgniteUnionAll.java          |  11 +
 .../rel/ProjectableFilterableTableScan.java        |  10 +-
 .../CalciteBasicSecondaryIndexIntegrationTest.java |  11 +-
 .../query/calcite/CalciteQueryProcessorTest.java   |  25 +-
 .../processors/query/calcite/PlannerTest.java      |  22 +-
 .../processors/query/calcite/QueryChecker.java     |  29 +-
 .../calcite/rules/ProjectScanMergeRuleTest.java    |   9 +-
 29 files changed, 664 insertions(+), 522 deletions(-)

diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index 2af998c..1f09f08 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -18,9 +18,9 @@
 package org.apache.ignite.internal.processors.query.calcite;
 
 import java.util.List;
+
 import org.apache.calcite.config.Lex;
 import org.apache.calcite.plan.Contexts;
-import org.apache.calcite.plan.RelOptCostImpl;
 import org.apache.calcite.sql.fun.SqlLibrary;
 import org.apache.calcite.sql.fun.SqlLibraryOperatorTableFactory;
 import org.apache.calcite.sql.parser.SqlParser;
@@ -52,6 +52,7 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.AffinityServ
 import org.apache.ignite.internal.processors.query.calcite.metadata.AffinityServiceImpl;
 import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
 import org.apache.ignite.internal.processors.query.calcite.metadata.MappingServiceImpl;
+import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
 import org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlanCache;
 import org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlanCacheImpl;
 import org.apache.ignite.internal.processors.query.calcite.schema.SchemaHolder;
@@ -88,7 +89,7 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query
         // Context provides a way to store data within the planner session that can be accessed in planner rules.
         .context(Contexts.empty())
         // Custom cost factory to use during optimization
-        .costFactory(RelOptCostImpl.FACTORY)
+        .costFactory(new IgniteCostFactory())
         .typeSystem(IgniteTypeSystem.INSTANCE)
         .build();
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java
index 88b9025..5192d92 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java
@@ -30,7 +30,6 @@ import org.apache.calcite.util.BuiltInMethod;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.calcite.util.Util;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedNestedLoopJoin;
 import org.apache.ignite.internal.util.typedef.F;
 import org.jetbrains.annotations.Nullable;
 
@@ -45,64 +44,7 @@ public class IgniteMdRowCount extends RelMdRowCount {
 
     /** {@inheritDoc} */
     @Override public Double getRowCount(Join rel, RelMetadataQuery mq) {
-        return joinRowCount(mq, rel);
-    }
-
-    /** */
-    public Double getRowCount(IgniteCorrelatedNestedLoopJoin rel, RelMetadataQuery mq) {
-        if (!rel.getJoinType().projectsRight()) {
-            // Create a RexNode representing the selectivity of the
-            // semijoin filter and pass it to getSelectivity
-            RexNode semiJoinSelectivity =
-                RelMdUtil.makeSemiJoinSelectivityRexNode(mq, rel);
-
-            return multiply(mq.getSelectivity(rel.getLeft(), semiJoinSelectivity),
-                mq.getRowCount(rel.getLeft()));
-        }
-
-        // Row count estimates of 0 will be rounded up to 1.
-        // So, use maxRowCount where the product is very small.
-        final Double left = mq.getRowCount(rel.getLeft());
-        final Double right = mq.getRowCount(rel.getRight());
-
-        if (left == null || right == null)
-            return null;
-
-        if (left <= 1D || right <= 1D) {
-            Double max = mq.getMaxRowCount(rel);
-            if (max != null && max <= 1D)
-                return max;
-        }
-
-        JoinInfo joinInfo = rel.analyzeCondition();
-
-        ImmutableIntList leftKeys = joinInfo.leftKeys;
-        ImmutableIntList rightKeys = joinInfo.rightKeys;
-
-        if (F.isEmpty(leftKeys) || F.isEmpty(rightKeys)) {
-            if (F.isEmpty(leftKeys))
-                return left * right;
-            else
-                return left * right * mq.getSelectivity(rel, rel.getCondition());
-        }
-
-        double leftDistinct = Util.first(
-            mq.getDistinctRowCount(rel.getLeft(), ImmutableBitSet.of(leftKeys), null), left);
-
-        double leftCardinality = leftDistinct / left;
-
-        double rowsCount = left * right / leftCardinality;
-
-        JoinRelType type = rel.getJoinType();
-
-        if (type == JoinRelType.LEFT)
-            rowsCount += left;
-        else if (type == JoinRelType.RIGHT)
-            rowsCount += right;
-        else if (type == JoinRelType.FULL)
-            rowsCount += left + right;
-
-        return rowsCount;
+        return rel.estimateRowCount(mq);
     }
 
     /** */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/cost/IgniteCost.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/cost/IgniteCost.java
new file mode 100644
index 0000000..a78ff09
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/cost/IgniteCost.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.metadata.cost;
+
+import java.util.Objects;
+
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Estimated execution cost of a {@link RelNode}. Measured in abstract points.
+ */
+public class IgniteCost implements RelOptCost {
+    /** Cost of passing a single row through an execution node. */
+    public static final double ROW_PASS_THROUGH_COST = 1;
+
+    /** Average size of a single field. */
+    public static final double AVERAGE_FIELD_SIZE = 4; // such accuracy should be enough for an estimate
+
+    /** Cost of a comparison of one row. */
+    public static final double ROW_COMPARISON_COST = 3;
+
+    /**
+     * With broadcast distribution each row will be sent to each destination node,
+     * thus the total amount of bytes will be a multiple of the destination nodes count.
+     * Right now it's just a constant.
+     */
+    public static final double BROADCAST_DISTRIBUTION_PENALTY = 5;
+
+    /** Zero cost: every component is 0. */
+    static final IgniteCost ZERO = new IgniteCost(0, 0, 0, 0, 0);
+
+    /** Tiny cost: every component is 1. */
+    static final IgniteCost TINY = new IgniteCost(1, 1, 1, 1, 1);
+
+    /** Huge cost: every component is {@link Double#MAX_VALUE}. */
+    static final IgniteCost HUGE = new IgniteCost(
+        Double.MAX_VALUE,
+        Double.MAX_VALUE,
+        Double.MAX_VALUE,
+        Double.MAX_VALUE,
+        Double.MAX_VALUE
+    );
+
+    /** Infinite cost: every component is {@link Double#POSITIVE_INFINITY}. */
+    static final IgniteCost INFINITY = new IgniteCost(
+        Double.POSITIVE_INFINITY,
+        Double.POSITIVE_INFINITY,
+        Double.POSITIVE_INFINITY,
+        Double.POSITIVE_INFINITY,
+        Double.POSITIVE_INFINITY
+    );
+
+    /** Count of the processed rows. */
+    private final double rowCount;
+
+    /** Amount of CPU points. */
+    private final double cpu;
+
+    /** Amount of Memory points. */
+    private final double memory;
+
+    /** Amount of IO points. */
+    private final double io;
+
+    /** Amount of Network points. */
+    private final double network;
+
+    /**
+     * @param rowCount Count of the processed rows.
+     * @param cpu Amount of CPU points.
+     * @param memory Amount of Memory points.
+     * @param io Amount of IO points.
+     * @param network Amount of Network points.
+     */
+    IgniteCost(double rowCount, double cpu, double memory, double io, double network) {
+        this.rowCount = rowCount;
+        this.cpu = cpu;
+        this.memory = memory;
+        this.io = io;
+        this.network = network;
+    }
+
+    /** {@inheritDoc} */
+    @Override public double getRows() {
+        return rowCount;
+    }
+
+    /** {@inheritDoc} */
+    @Override public double getCpu() {
+        return cpu;
+    }
+
+    /** {@inheritDoc} */
+    @Override public double getIo() {
+        return io;
+    }
+
+    /**
+     * @return Usage of Memory resources.
+     */
+    public double getMemory() {
+        return memory;
+    }
+
+    /**
+     * @return Usage of Network resources.
+     */
+    public double getNetwork() {
+        return network;
+    }
+
+    /** {@inheritDoc} True for {@link #INFINITY} or when any component is {@link Double#POSITIVE_INFINITY}. */
+    @Override public boolean isInfinite() {
+        return this == INFINITY
+            || rowCount == Double.POSITIVE_INFINITY
+            || cpu == Double.POSITIVE_INFINITY
+            || memory == Double.POSITIVE_INFINITY
+            || io == Double.POSITIVE_INFINITY
+            || network == Double.POSITIVE_INFINITY;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return Objects.hash(rowCount, cpu, io, memory, network);
+    }
+
+    /** {@inheritDoc} NOTE(review): {@code equals(Object)} is not overridden in this class, only {@code hashCode()}. */
+    @SuppressWarnings("FloatingPointEquality")
+    @Override public boolean equals(RelOptCost cost) {
+        return this == cost || (cost instanceof IgniteCost
+            && rowCount == ((IgniteCost)cost).rowCount
+            && cpu == ((IgniteCost)cost).cpu
+            && memory == ((IgniteCost)cost).memory
+            && io == ((IgniteCost)cost).io
+            && network == ((IgniteCost)cost).network
+        );
+    }
+
+    /** {@inheritDoc} Every one of the five components must differ by less than {@link RelOptUtil#EPSILON}. */
+    @Override public boolean isEqWithEpsilon(RelOptCost cost) {
+        return this == cost || (cost instanceof IgniteCost
+            && Math.abs(rowCount - ((IgniteCost)cost).rowCount) < RelOptUtil.EPSILON
+            && Math.abs(cpu - ((IgniteCost)cost).cpu) < RelOptUtil.EPSILON
+            && Math.abs(memory - ((IgniteCost)cost).memory) < RelOptUtil.EPSILON
+            && Math.abs(io - ((IgniteCost)cost).io) < RelOptUtil.EPSILON
+            && Math.abs(network - ((IgniteCost)cost).network) < RelOptUtil.EPSILON
+        );
+    }
+
+    /** {@inheritDoc} Compares the sums of cpu, memory, io and network; rowCount is not taken into account. */
+    @Override public boolean isLe(RelOptCost cost) {
+        IgniteCost other = (IgniteCost)cost;
+
+        return this == cost || (cpu + memory + io + network) <= (other.cpu + other.memory + other.io + other.network);
+    }
+
+    /** {@inheritDoc} Compares the sums of cpu, memory, io and network; rowCount is not taken into account. */
+    @Override public boolean isLt(RelOptCost cost) {
+        IgniteCost other = (IgniteCost)cost;
+
+        return this != cost && (cpu + memory + io + network) < (other.cpu + other.memory + other.io + other.network);
+    }
+
+    /** {@inheritDoc} Component-wise sum; the argument is cast to {@link IgniteCost}. */
+    @Override public RelOptCost plus(RelOptCost cost) {
+        IgniteCost other = (IgniteCost)cost;
+
+        return new IgniteCost(
+            rowCount + other.rowCount,
+            cpu + other.cpu,
+            memory + other.memory,
+            io + other.io,
+            network + other.network
+        );
+    }
+
+    /** {@inheritDoc} Component-wise difference; the argument is cast to {@link IgniteCost}. */
+    @Override public RelOptCost minus(RelOptCost cost) {
+        IgniteCost other = (IgniteCost)cost;
+
+        return new IgniteCost(
+            rowCount - other.rowCount,
+            cpu - other.cpu,
+            memory - other.memory,
+            io - other.io,
+            network - other.network
+        );
+    }
+
+    /** {@inheritDoc} Every component, including rowCount, is multiplied by the factor. */
+    @Override public RelOptCost multiplyBy(double factor) {
+        return new IgniteCost(
+            rowCount * factor,
+            cpu * factor,
+            memory * factor,
+            io * factor,
+            network * factor
+        );
+    }
+
+    /** {@inheritDoc} Not supported: always throws {@link UnsupportedOperationException}. */
+    @Override public double divideBy(RelOptCost cost) {
+        throw new UnsupportedOperationException(IgniteCost.class.getSimpleName() + "#divideBy");
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgniteCost.class, this);
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/cost/IgniteCostFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/cost/IgniteCostFactory.java
new file mode 100644
index 0000000..d1c5643
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/cost/IgniteCostFactory.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.metadata.cost;
+
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptCostFactory;
+
+/**
+ * Ignite cost factory.
+ * Creates a desired cost with regards to the weight of each particular resource.
+ */
+public class IgniteCostFactory implements RelOptCostFactory {
+    /** Cpu weight. */
+    private final double cpuWeight;
+
+    /** Memory weight. */
+    private final double memoryWeight;
+
+    /** Io weight. */
+    private final double ioWeight;
+
+    /** Network weight. */
+    private final double networkWeight;
+
+    /**
+     * Creates a factory with default weights equal to 1.
+     */
+    public IgniteCostFactory() {
+        cpuWeight = 1;
+        memoryWeight = 1;
+        ioWeight = 1;
+        networkWeight = 1;
+    }
+
+    /**
+     * Creates a factory with provided weights. Throws {@link IllegalArgumentException} if any weight is negative.
+     */
+    public IgniteCostFactory(double cpuWeight, double memoryWeight, double ioWeight, double networkWeight) {
+        if (cpuWeight < 0 || memoryWeight < 0 || ioWeight < 0 || networkWeight < 0)
+            throw new IllegalArgumentException("Weight should be non negative: cpu=" + cpuWeight +
+                ", memory=" + memoryWeight + ", io=" + ioWeight + ", network=" + networkWeight);
+
+        this.cpuWeight = cpuWeight;
+        this.memoryWeight = memoryWeight;
+        this.ioWeight = ioWeight;
+        this.networkWeight = networkWeight;
+    }
+
+    /** {@inheritDoc} Memory and network amounts are set to 0. */
+    @Override public RelOptCost makeCost(double rowCount, double cpu, double io) {
+        return makeCost(rowCount, cpu, io, 0, 0);
+    }
+
+    /**
+     * Creates a cost object, multiplying each resource amount by the weight of that resource.
+     *
+     * @param rowCount Count of processed rows (no weight is applied).
+     * @param cpu Amount of consumed CPU.
+     * @param io Amount of consumed Io. Note: io precedes memory here, unlike in the {@link IgniteCost} constructor.
+     * @param memory Amount of consumed Memory.
+     * @param network Amount of consumed Network.
+     */
+    public RelOptCost makeCost(double rowCount, double cpu, double io, double memory, double network) {
+        return new IgniteCost(rowCount, cpu * cpuWeight, memory * memoryWeight, io * ioWeight, network * networkWeight);
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelOptCost makeHugeCost() {
+        return IgniteCost.HUGE;
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelOptCost makeInfiniteCost() {
+        return IgniteCost.INFINITY;
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelOptCost makeTinyCost() {
+        return IgniteCost.TINY;
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelOptCost makeZeroCost() {
+        return IgniteCost.ZERO;
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
index 5285df3..90a5a5f 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.query.calcite.prepare;
 
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Supplier;
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
@@ -161,6 +162,14 @@ public class Fragment {
             if (rootFragment())
                 mapping = FragmentMapping.create(ctx.localNodeId()).colocate(mapping);
 
+            if (single() && mapping.nodeIds().size() > 1) {
+                // This is possible when the fragment contains a scan of a replicated cache, which brings
+                // several nodes (actually all nodes containing the cache) to the colocation group, but this
+                // fragment is supposed to be executed on a single node, so let's pick one of them at random.
+                mapping = FragmentMapping.create(mapping.nodeIds()
+                    .get(ThreadLocalRandom.current().nextInt(mapping.nodeIds().size()))).colocate(mapping);
+            }
+
             return mapping.finalize(nodesSource);
         }
         catch (NodeMappingException e) {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCacheImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCacheImpl.java
index 65d6f0e..d05c653 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCacheImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCacheImpl.java
@@ -90,10 +90,8 @@ public class QueryPlanCacheImpl extends AbstractService implements QueryPlanCach
         }
     }
 
-    /**
-     * Clear cached plans.
-     */
-    public void clear() {
+    /** {@inheritDoc} Drops the cached plans by replacing the cache with a newly created map. */
+    @Override public void clear() {
         cache = new GridBoundedConcurrentLinkedHashMap<>(CACHE_SIZE);
     }
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIgniteNestedLoopJoin.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIgniteJoin.java
similarity index 86%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIgniteNestedLoopJoin.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIgniteJoin.java
index 00041b0..cb54815 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIgniteNestedLoopJoin.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIgniteJoin.java
@@ -25,8 +25,6 @@ import java.util.Set;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptCost;
-import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelCollations;
@@ -37,12 +35,12 @@ import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.calcite.rel.metadata.RelMdUtil;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.SqlExplainLevel;
 import org.apache.calcite.util.Pair;
 import org.apache.calcite.util.Util;
+import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
 import org.apache.ignite.internal.processors.query.calcite.trait.DistributionFunction;
 import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
 import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
@@ -51,9 +49,7 @@ import org.apache.ignite.internal.processors.query.calcite.trait.TraitsAwareIgni
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.util.typedef.F;
 
-import static org.apache.calcite.rel.RelDistribution.Type.BROADCAST_DISTRIBUTED;
 import static org.apache.calcite.rel.RelDistribution.Type.HASH_DISTRIBUTED;
-import static org.apache.calcite.rel.RelDistribution.Type.SINGLETON;
 import static org.apache.calcite.rel.core.JoinRelType.INNER;
 import static org.apache.calcite.rel.core.JoinRelType.LEFT;
 import static org.apache.calcite.rel.core.JoinRelType.RIGHT;
@@ -63,21 +59,14 @@ import static org.apache.ignite.internal.processors.query.calcite.trait.IgniteDi
 import static org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.single;
 
 /** */
-public abstract class AbstractIgniteNestedLoopJoin extends Join implements TraitsAwareIgniteRel {
+public abstract class AbstractIgniteJoin extends Join implements TraitsAwareIgniteRel {
     /** */
-    protected AbstractIgniteNestedLoopJoin(RelOptCluster cluster, RelTraitSet traitSet, RelNode left, RelNode right,
+    protected AbstractIgniteJoin(RelOptCluster cluster, RelTraitSet traitSet, RelNode left, RelNode right,
         RexNode condition, Set<CorrelationId> variablesSet, JoinRelType joinType) {
         super(cluster, traitSet, left, right, condition, variablesSet, joinType);
     }
 
     /** {@inheritDoc} */
-    @Override public abstract Join copy(RelTraitSet traitSet, RexNode condition, RelNode left, RelNode right,
-        JoinRelType joinType, boolean semiJoinDone);
-
-    /** {@inheritDoc} */
-    @Override public abstract <T> T accept(IgniteRelVisitor<T> visitor);
-
-    /** {@inheritDoc} */
     @Override public RelWriter explainTerms(RelWriter pw) {
         return super.explainTerms(pw)
             .itemIf("variablesSet", Commons.transform(variablesSet.asList(), CorrelationId::getId), pw.getDetailLevel() == SqlExplainLevel.ALL_ATTRIBUTES);
@@ -223,6 +212,17 @@ public abstract class AbstractIgniteNestedLoopJoin extends Join implements Trait
     }
 
     /** {@inheritDoc} */
+    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCorrelation(RelTraitSet nodeTraits,
+        List<RelTraitSet> inTraits) {
+        // start with the correlation ids carried by the traits of the left input (index 0)
+        Set<CorrelationId> corrIds = new HashSet<>(TraitUtils.correlation(inTraits.get(0)).correlationIds());
+        // add the correlation ids carried by the traits of the right input (index 1)
+        corrIds.addAll(TraitUtils.correlation(inTraits.get(1)).correlationIds());
+
+        return ImmutableList.of(Pair.of(nodeTraits.replace(CorrelationTrait.correlations(corrIds)), inTraits));
+    }
+
+    /** {@inheritDoc} */
     @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
         // We preserve left collation since it's translated into a nested loop join with an outer loop
         // over a left edge. The code below checks whether a desired collation is possible and requires
@@ -252,7 +252,10 @@ public abstract class AbstractIgniteNestedLoopJoin extends Join implements Trait
     }
 
     /** {@inheritDoc} */
-    @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughDistribution(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
+    @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughDistribution(
+        RelTraitSet nodeTraits,
+        List<RelTraitSet> inputTraits
+    ) {
         // Tere are several rules:
         // 1) any join is possible on broadcast or single distribution
         // 2) hash distributed join is possible when join keys equal to source distribution keys
@@ -304,38 +307,6 @@ public abstract class AbstractIgniteNestedLoopJoin extends Join implements Trait
         return Util.first(joinRowCount(mq, this), 1D);
     }
 
-    /** {@inheritDoc} */
-    @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
-        double rowCount = mq.getRowCount(this);
-
-        // Joins can be flipped, and for many algorithms, both versions are viable
-        // and have the same cost. To make the results stable between versions of
-        // the planner, make one of the versions slightly more expensive.
-        if (joinType == RIGHT)
-            rowCount = RelMdUtil.addEpsilon(rowCount);
-
-        final double rightRowCount = right.estimateRowCount(mq);
-        final double leftRowCount = left.estimateRowCount(mq);
-
-        if (Double.isInfinite(leftRowCount))
-            rowCount = leftRowCount;
-        if (Double.isInfinite(rightRowCount))
-            rowCount = rightRowCount;
-
-        if (!Double.isInfinite(leftRowCount) && !Double.isInfinite(rightRowCount) && leftRowCount > rightRowCount)
-            rowCount = RelMdUtil.addEpsilon(rowCount);
-
-        RelDistribution.Type type = distribution().getType();
-
-        if (type == SINGLETON)
-            rowCount = RelMdUtil.addEpsilon(rowCount);
-
-        if (type == BROADCAST_DISTRIBUTED)
-            rowCount = RelMdUtil.addEpsilon(RelMdUtil.addEpsilon(rowCount));
-
-        return planner.getCostFactory().makeCost(rowCount, 0, 0);
-    }
-
     /** */
     protected boolean projectsLeft(RelCollation collation) {
         int leftFieldCount = getLeft().getRowType().getFieldCount();
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIndexScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIndexScan.java
index d08d94d..5281757 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIndexScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIndexScan.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.query.calcite.rel;
 
 import java.util.List;
+
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
@@ -28,9 +29,12 @@ import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.hint.RelHint;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.mapping.Mappings;
+import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.processors.query.calcite.util.RexUtils;
 import org.apache.ignite.internal.util.typedef.F;
@@ -148,6 +152,39 @@ public abstract class AbstractIndexScan extends ProjectableFilterableTableScan {
 
     /** {@inheritDoc} */
     @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
-        return super.computeSelfCost(planner, mq).plus(planner.getCostFactory().makeTinyCost());
+        double rows = table.getRowCount(); // start from the row count reported by the table
+
+        double cost = rows * IgniteCost.ROW_PASS_THROUGH_COST; // used as is when there is no condition
+
+        if (condition != null) {
+            RexBuilder builder = getCluster().getRexBuilder();
+
+            double selectivity = 1;
+
+            cost = 0; // with a condition the cost is rebuilt below from the bounds and the filtered row count
+
+            if (lowerCond != null) { // NOTE(review): bound expressions are OR-ed (composeDisjunction) - confirm intended
+                double selectivity0 = mq.getSelectivity(this, RexUtil.composeDisjunction(builder, lowerCond));
+
+                selectivity -= 1 - selectivity0; // subtract the fraction of rows rejected by the lower bound
+
+                cost += Math.log(rows); // NOTE(review): negative for rows < 1, -Infinity for rows == 0 - confirm
+            }
+
+            if (upperCond != null) { // NOTE(review): no Math.log(rows) term here, unlike the lower bound - confirm
+                double selectivity0 = mq.getSelectivity(this, RexUtil.composeDisjunction(builder, upperCond));
+
+                selectivity -= 1 - selectivity0; // subtract the fraction of rows rejected by the upper bound
+            }
+
+            rows *= selectivity;
+
+            if (rows <= 0) // the combined selectivity may end up zero or negative
+                rows = 1;
+
+            cost += rows * (IgniteCost.ROW_COMPARISON_COST + IgniteCost.ROW_PASS_THROUGH_COST);
+        }
+
+        return planner.getCostFactory().makeCost(rows, cost, 0); // arguments are (rowCount, cpu, io)
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregate.java
index 64662fe..f3eef52 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregate.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregate.java
@@ -22,6 +22,8 @@ import java.util.List;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTrait;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelCollations;
@@ -30,10 +32,12 @@ import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.calcite.util.Pair;
 import org.apache.calcite.util.mapping.Mappings;
+import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
 import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
 import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
 import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
@@ -258,4 +262,13 @@ public class IgniteAggregate extends Aggregate implements TraitsAwareIgniteRel {
         return new IgniteAggregate(cluster, getTraitSet(), sole(inputs),
             getGroupSet(), getGroupSets(), getAggCallList());
     }
+
+    /** {@inheritDoc} */
+    @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+        double rows = mq.getRowCount(getInput());
+
+        // TODO: fix it when https://issues.apache.org/jira/browse/IGNITE-13543 is resolved.
+        // Currently it's OK to have such a dummy cost because there are no other options.
+        return planner.getCostFactory().makeCost(rows, rows * IgniteCost.ROW_PASS_THROUGH_COST, 0);
+    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteCorrelatedNestedLoopJoin.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteCorrelatedNestedLoopJoin.java
index 386e51c..e270f12 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteCorrelatedNestedLoopJoin.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteCorrelatedNestedLoopJoin.java
@@ -37,6 +37,8 @@ import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
+import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
 import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
 import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
 import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
@@ -50,7 +52,7 @@ import org.apache.ignite.internal.processors.query.calcite.util.Commons;
  * The set of output rows is a subset of the cartesian product of the two
  * inputs; precisely which subset depends on the join condition.
  */
-public class IgniteCorrelatedNestedLoopJoin extends AbstractIgniteNestedLoopJoin {
+public class IgniteCorrelatedNestedLoopJoin extends AbstractIgniteJoin {
     /**
      * Creates a Join.
      *
@@ -138,8 +140,22 @@ public class IgniteCorrelatedNestedLoopJoin extends AbstractIgniteNestedLoopJoin
 
     /** {@inheritDoc} */
     @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
-        // Give it some penalty
-        return super.computeSelfCost(planner, mq).multiplyBy(5);
+        IgniteCostFactory costFactory = (IgniteCostFactory)planner.getCostFactory();
+
+        double leftCount = mq.getRowCount(getLeft());
+
+        if (Double.isInfinite(leftCount))
+            return costFactory.makeInfiniteCost();
+
+        double rightCount = mq.getRowCount(getRight());
+
+        if (Double.isInfinite(rightCount))
+            return costFactory.makeInfiniteCost();
+
+        double rows = leftCount * rightCount;
+
+        return costFactory.makeCost(rows,
+            rows * (IgniteCost.ROW_COMPARISON_COST + IgniteCost.ROW_PASS_THROUGH_COST), 0);
     }
 
     /** {@inheritDoc} */
@@ -185,4 +201,10 @@ public class IgniteCorrelatedNestedLoopJoin extends AbstractIgniteNestedLoopJoin
     @Override public RelWriter explainTerms(RelWriter pw) {
         return super.explainTerms(pw).item("correlationVariables", getVariablesSet());
     }
+
+    /** {@inheritDoc} */
+    @Override public double estimateRowCount(RelMetadataQuery mq) {
+        // condition selectivity already counted within the external filter
+        return super.estimateRowCount(mq) / mq.getSelectivity(this, getCondition());
+    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java
index 9035416..7e61a18 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java
@@ -18,18 +18,20 @@
 package org.apache.ignite.internal.processors.query.calcite.rel;
 
 import java.util.List;
+
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelDistributions;
 import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Exchange;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
+import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
 import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
-import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
-import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
 
 import static org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits;
 
@@ -72,9 +74,16 @@ public class IgniteExchange extends Exchange implements IgniteRel {
 
     /** {@inheritDoc} */
     @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
-        double rowCount = mq.getRowCount(this);
-        double bytesPerRow = getRowType().getFieldCount() * 4 * (TraitUtils.distribution(this) == IgniteDistributions.broadcast() ? 10 : 1);
-        return planner.getCostFactory().makeCost(rowCount * bytesPerRow, rowCount, 0);
+        double rowCount = mq.getRowCount(getInput());
+        double totalBytes = rowCount * (getRowType().getFieldCount() * IgniteCost.AVERAGE_FIELD_SIZE);
+
+        // Compare by type: equals() may not match across different RelDistribution implementations.
+        if (RelDistributions.BROADCAST_DISTRIBUTED.getType() == distribution.getType())
+            totalBytes *= IgniteCost.BROADCAST_DISTRIBUTION_PENALTY;
+
+        IgniteCostFactory costFactory = (IgniteCostFactory)planner.getCostFactory();
+
+        return costFactory.makeCost(rowCount, rowCount * IgniteCost.ROW_PASS_THROUGH_COST, 0, 0, totalBytes);
     }
 
     /** {@inheritDoc} */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
index 4dc5e2d..cdf08d5 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.query.calcite.rel;
 
 import java.util.List;
 import java.util.Set;
+
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
@@ -28,10 +29,10 @@ import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.metadata.RelMdUtil;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
 import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
 import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
 import org.apache.ignite.internal.processors.query.calcite.trait.TraitsAwareIgniteRel;
@@ -126,8 +127,9 @@ public class IgniteFilter extends Filter implements TraitsAwareIgniteRel {
     /** {@inheritDoc} */
     @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
         double rowCount = mq.getRowCount(getInput());
-        rowCount = RelMdUtil.addEpsilon(rowCount); // to differ from rel nodes with integrated filter
-        return planner.getCostFactory().makeCost(rowCount, 0, 0);
+
+        return planner.getCostFactory().makeCost(rowCount,
+            rowCount * (IgniteCost.ROW_COMPARISON_COST + IgniteCost.ROW_PASS_THROUGH_COST), 0);
     }
 
     /** {@inheritDoc} */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteIndexScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteIndexScan.java
index 42e99fe..d5e24c4 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteIndexScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteIndexScan.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.query.calcite.rel;
 
 import java.util.List;
+
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptTable;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteLimit.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteLimit.java
index 2e25e23..534719d 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteLimit.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteLimit.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.query.calcite.rel;
 
 import java.util.List;
+
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
@@ -31,9 +32,16 @@ import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
 
 /** */
 public class IgniteLimit extends SingleRel implements IgniteRel {
+    /** In case the fetch value is a DYNAMIC_PARAM. */
+    private static final double FETCH_IS_PARAM_FACTOR = 0.01;
+
+    /** In case the offset value is a DYNAMIC_PARAM. */
+    private static final double OFFSET_IS_PARAM_FACTOR = 0.5;
+
     /** Offset. */
     private final RexNode offset;
 
@@ -99,44 +107,40 @@ public class IgniteLimit extends SingleRel implements IgniteRel {
 
     /** {@inheritDoc} */
     @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
-        double rows = estimateRowCount(mq);
+        double inputRowCount = mq.getRowCount(getInput());
+
+        double lim = fetch != null ? doubleFromRex(fetch, inputRowCount * FETCH_IS_PARAM_FACTOR) : inputRowCount;
+        double off = offset != null ? doubleFromRex(offset, inputRowCount * OFFSET_IS_PARAM_FACTOR) : 0;
+
+        double rows = Math.min(lim + off, inputRowCount);
 
-        return planner.getCostFactory().makeCost(rows, 0, 0);
+        return planner.getCostFactory().makeCost(rows, rows * IgniteCost.ROW_PASS_THROUGH_COST, 0);
     }
 
     /** {@inheritDoc} */
     @Override public double estimateRowCount(RelMetadataQuery mq) {
-        Integer lim = intFromRex(fetch);
-        Integer off = intFromRex(offset);
+        double inputRowCount = mq.getRowCount(getInput());
 
-        if (lim == null) {
-            // If estimated rowcount is less than offset return 0
-            if (off == null)
-                return getInput().estimateRowCount(mq) * 0.01;
+        double lim = fetch != null ? doubleFromRex(fetch, inputRowCount * FETCH_IS_PARAM_FACTOR) : inputRowCount;
+        double off = offset != null ? doubleFromRex(offset, inputRowCount * OFFSET_IS_PARAM_FACTOR) : 0;
 
-            return Math.max(0, getInput().estimateRowCount(mq) - off);
-        }
-        else {
-            // probably we can process DYNAMIC_PARAM here too.
-            if (off != null)
-                return lim + off;
-            else
-                return lim / 2.;
-        }
+        return Math.max(0, Math.min(lim, inputRowCount - off)); // offset may exceed the input size
     }
 
     /**
      * @return Integer value of the literal expression.
      */
-    private Integer intFromRex(RexNode n) {
+    private double doubleFromRex(RexNode n, double def) {
         try {
             if (n.isA(SqlKind.LITERAL))
                 return ((RexLiteral)n).getValueAs(Integer.class);
             else
-                return null;
+                return def;
         }
         catch (Exception e) {
-            return null;
+            assert false : "Unable to extract value: " + e.getMessage();
+
+            return def;
         }
     }
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMapAggregate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMapAggregate.java
index d8490bc..808301c 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMapAggregate.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMapAggregate.java
@@ -19,16 +19,20 @@ package org.apache.ignite.internal.processors.query.calcite.rel;
 
 import java.util.List;
 import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.Accumulator;
 import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
+import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
 import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
 import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
@@ -86,6 +90,15 @@ public class IgniteMapAggregate extends Aggregate implements IgniteRel {
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+        double rows = mq.getRowCount(getInput());
+
+        // TODO: fix it when https://issues.apache.org/jira/browse/IGNITE-13543 is resolved.
+        // Currently it's OK to have such a dummy cost because there are no other options.
+        return planner.getCostFactory().makeCost(rows, rows * IgniteCost.ROW_PASS_THROUGH_COST, 0);
+    }
+
     /** */
     public static RelDataType rowType(RelDataTypeFactory typeFactory) {
         assert typeFactory instanceof IgniteTypeFactory;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMergeJoin.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMergeJoin.java
index ad9bc2c..11a7216 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMergeJoin.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMergeJoin.java
@@ -22,7 +22,6 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -34,51 +33,26 @@ import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelCollations;
-import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelNodes;
-import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.core.Join;
-import org.apache.calcite.rel.core.JoinInfo;
 import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.calcite.rel.metadata.RelMdUtil;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.sql.SqlExplainLevel;
-import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.calcite.util.ImmutableIntList;
 import org.apache.calcite.util.Pair;
-import org.apache.calcite.util.Util;
-import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
-import org.apache.ignite.internal.processors.query.calcite.trait.DistributionFunction;
-import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
-import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
+import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
+import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
 import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
-import org.apache.ignite.internal.processors.query.calcite.trait.TraitsAwareIgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
-import org.apache.ignite.internal.util.typedef.F;
-import org.jetbrains.annotations.Nullable;
-
-import static org.apache.calcite.rel.RelDistribution.Type.BROADCAST_DISTRIBUTED;
-import static org.apache.calcite.rel.RelDistribution.Type.HASH_DISTRIBUTED;
-import static org.apache.calcite.rel.RelDistribution.Type.SINGLETON;
-import static org.apache.calcite.rel.core.JoinRelType.INNER;
-import static org.apache.calcite.rel.core.JoinRelType.LEFT;
-import static org.apache.calcite.rel.core.JoinRelType.RIGHT;
-import static org.apache.calcite.util.NumberUtil.multiply;
-import static org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.broadcast;
-import static org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.hash;
-import static org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.single;
 
 /** */
-public class IgniteMergeJoin extends Join implements TraitsAwareIgniteRel {
+public class IgniteMergeJoin extends AbstractIgniteJoin {
     /** */
     public IgniteMergeJoin(RelOptCluster cluster, RelTraitSet traitSet, RelNode left, RelNode right,
         RexNode condition, Set<CorrelationId> variablesSet, JoinRelType joinType) {
-        super(cluster, traitSet, ImmutableList.of(), left, right, condition, variablesSet, joinType);
+        super(cluster, traitSet, left, right, condition, variablesSet, joinType);
     }
 
     /** */
@@ -109,16 +83,6 @@ public class IgniteMergeJoin extends Join implements TraitsAwareIgniteRel {
     }
 
     /** {@inheritDoc} */
-    @Override public RelWriter explainTerms(RelWriter pw) {
-        return super.explainTerms(pw)
-            .itemIf(
-                "variablesSet",
-                Commons.transform(variablesSet.asList(), CorrelationId::getId),
-                pw.getDetailLevel() == SqlExplainLevel.ALL_ATTRIBUTES
-            );
-    }
-
-    /** {@inheritDoc} */
     @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCollation(
         RelTraitSet nodeTraits,
         List<RelTraitSet> inputTraits
@@ -166,133 +130,6 @@ public class IgniteMergeJoin extends Join implements TraitsAwareIgniteRel {
     }
 
     /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
-        RelTraitSet nodeTraits,
-        List<RelTraitSet> inputTraits
-    ) {
-        // The node is rewindable only if both sources are rewindable.
-
-        RelTraitSet left = inputTraits.get(0), right = inputTraits.get(1);
-
-        RewindabilityTrait leftRewindability = TraitUtils.rewindability(left);
-        RewindabilityTrait rightRewindability = TraitUtils.rewindability(right);
-
-        RelTraitSet outTraits, leftTraits, rightTraits;
-
-        if (leftRewindability.rewindable() && rightRewindability.rewindable()) {
-            outTraits = nodeTraits.replace(RewindabilityTrait.REWINDABLE);
-            leftTraits = left.replace(RewindabilityTrait.REWINDABLE);
-            rightTraits = right.replace(RewindabilityTrait.REWINDABLE);
-        }
-        else {
-            outTraits = nodeTraits.replace(RewindabilityTrait.ONE_WAY);
-            leftTraits = left.replace(RewindabilityTrait.ONE_WAY);
-            rightTraits = right.replace(RewindabilityTrait.ONE_WAY);
-        }
-
-        return ImmutableList.of(Pair.of(outTraits, ImmutableList.of(leftTraits, rightTraits)));
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(
-        RelTraitSet nodeTraits,
-        List<RelTraitSet> inputTraits
-    ) {
-        // Tere are several rules:
-        // 1) any join is possible on broadcast or single distribution
-        // 2) hash distributed join is possible when join keys equal to source distribution keys
-        // 3) hash and broadcast distributed tables can be joined when join keys equal to hash
-        //    distributed table distribution keys and:
-        //      3.1) it's a left join and a hash distributed table is at left
-        //      3.2) it's a right join and a hash distributed table is at right
-        //      3.3) it's an inner join, this case a hash distributed table may be at any side
-
-        RelTraitSet left = inputTraits.get(0), right = inputTraits.get(1);
-
-        List<Pair<RelTraitSet, List<RelTraitSet>>> res = new ArrayList<>();
-
-        IgniteDistribution leftDistr = TraitUtils.distribution(left);
-        IgniteDistribution rightDistr = TraitUtils.distribution(right);
-
-        RelTraitSet outTraits, leftTraits, rightTraits;
-
-        if (leftDistr == broadcast() || rightDistr == broadcast()) {
-            outTraits = nodeTraits.replace(broadcast());
-            leftTraits = left.replace(broadcast());
-            rightTraits = right.replace(broadcast());
-
-            res.add(Pair.of(outTraits, ImmutableList.of(leftTraits, rightTraits)));
-        }
-
-        if (leftDistr == single() || rightDistr == single()) {
-            outTraits = nodeTraits.replace(single());
-            leftTraits = left.replace(single());
-            rightTraits = right.replace(single());
-
-            res.add(Pair.of(outTraits, ImmutableList.of(leftTraits, rightTraits)));
-        }
-
-        if (!F.isEmpty(joinInfo.pairs())) {
-            Set<DistributionFunction> functions = new HashSet<>();
-
-            if (leftDistr.getType() == HASH_DISTRIBUTED
-                && Objects.equals(joinInfo.leftKeys, leftDistr.getKeys()))
-                functions.add(leftDistr.function());
-
-            if (rightDistr.getType() == HASH_DISTRIBUTED
-                && Objects.equals(joinInfo.rightKeys, rightDistr.getKeys()))
-                functions.add(rightDistr.function());
-
-            functions.add(DistributionFunction.hash());
-
-            for (DistributionFunction function : functions) {
-                leftTraits = left.replace(hash(joinInfo.leftKeys, function));
-                rightTraits = right.replace(hash(joinInfo.rightKeys, function));
-
-                // TODO distribution multitrait support
-                outTraits = nodeTraits.replace(hash(joinInfo.leftKeys, function));
-                res.add(Pair.of(outTraits, ImmutableList.of(leftTraits, rightTraits)));
-
-                outTraits = nodeTraits.replace(hash(joinInfo.rightKeys, function));
-                res.add(Pair.of(outTraits, ImmutableList.of(leftTraits, rightTraits)));
-
-                if (joinType == INNER || joinType == LEFT) {
-                    outTraits = nodeTraits.replace(hash(joinInfo.leftKeys, function));
-                    leftTraits = left.replace(hash(joinInfo.leftKeys, function));
-                    rightTraits = right.replace(broadcast());
-
-                    res.add(Pair.of(outTraits, ImmutableList.of(leftTraits, rightTraits)));
-                }
-
-                if (joinType == INNER || joinType == RIGHT) {
-                    outTraits = nodeTraits.replace(hash(joinInfo.rightKeys, function));
-                    leftTraits = left.replace(broadcast());
-                    rightTraits = right.replace(hash(joinInfo.rightKeys, function));
-
-                    res.add(Pair.of(outTraits, ImmutableList.of(leftTraits, rightTraits)));
-                }
-            }
-        }
-
-        if (!res.isEmpty())
-            return ImmutableList.copyOf(res);
-
-        return ImmutableList.of(Pair.of(nodeTraits.replace(single()),
-            ImmutableList.of(left.replace(single()), right.replace(single()))));
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCorrelation(RelTraitSet nodeTraits,
-        List<RelTraitSet> inTraits) {
-        // left correlations
-        Set<CorrelationId> corrIds = new HashSet<>(TraitUtils.correlation(inTraits.get(0)).correlationIds());
-        // right correlations
-        corrIds.addAll(TraitUtils.correlation(inTraits.get(1)).correlationIds());
-
-        return ImmutableList.of(Pair.of(nodeTraits.replace(CorrelationTrait.correlations(corrIds)), inTraits));
-    }
-
-    /** {@inheritDoc} */
     @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughCollation(
         RelTraitSet nodeTraits,
         List<RelTraitSet> inputTraits
@@ -371,147 +208,23 @@ public class IgniteMergeJoin extends Join implements TraitsAwareIgniteRel {
     }
 
     /** {@inheritDoc} */
-    @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughDistribution(
-        RelTraitSet nodeTraits,
-        List<RelTraitSet> inputTraits
-    ) {
-        // Tere are several rules:
-        // 1) any join is possible on broadcast or single distribution
-        // 2) hash distributed join is possible when join keys equal to source distribution keys
-        // 3) hash and broadcast distributed tables can be joined when join keys equal to hash
-        //    distributed table distribution keys and:
-        //      3.1) it's a left join and a hash distributed table is at left
-        //      3.2) it's a right join and a hash distributed table is at right
-        //      3.3) it's an inner join, this case a hash distributed table may be at any side
-
-        RelTraitSet left = inputTraits.get(0), right = inputTraits.get(1);
-
-        IgniteDistribution distribution = TraitUtils.distribution(nodeTraits);
-
-        RelDistribution.Type distrType = distribution.getType();
-        switch (distrType) {
-            case BROADCAST_DISTRIBUTED:
-            case SINGLETON:
-                return Pair.of(nodeTraits, Commons.transform(inputTraits, t -> t.replace(distribution)));
-
-            case HASH_DISTRIBUTED:
-            case RANDOM_DISTRIBUTED:
-                // Such join may be replaced as a cross join with a filter uppon it.
-                // It's impossible to get random or hash distribution from a cross join.
-                if (F.isEmpty(joinInfo.pairs()))
-                    break;
-
-                // We cannot provide random distribution without unique constrain on join keys,
-                // so, we require hash distribution (wich satisfies random distribution) instead.
-                DistributionFunction function = distrType == HASH_DISTRIBUTED
-                    ? distribution.function()
-                    : DistributionFunction.hash();
-
-                IgniteDistribution outDistr = hash(joinInfo.leftKeys, function);
-
-                if (distrType != HASH_DISTRIBUTED || outDistr.satisfies(distribution)) {
-                    return Pair.of(nodeTraits.replace(outDistr),
-                        ImmutableList.of(left.replace(outDistr), right.replace(hash(joinInfo.rightKeys, function))));
-                }
-
-            default:
-                // NO-OP
-        }
-
-        return Pair.of(nodeTraits.replace(single()), Commons.transform(inputTraits, t -> t.replace(single())));
-    }
-
-    /** {@inheritDoc} */
-    @Override public double estimateRowCount(RelMetadataQuery mq) {
-        @Nullable Double result = null;
-        boolean finished = false;
-        if (!getJoinType().projectsRight()) {
-          // Create a RexNode representing the selectivity of the
-          // semijoin filter and pass it to getSelectivity
-          RexNode semiJoinSelectivity =
-              RelMdUtil.makeSemiJoinSelectivityRexNode(mq, this);
-
-            result = multiply(mq.getSelectivity(getLeft(), semiJoinSelectivity),
-                mq.getRowCount(getLeft()));
-        }
-        else { // Row count estimates of 0 will be rounded up to 1.
-               // So, use maxRowCount where the product is very small.
-            final Double left1 = mq.getRowCount(getLeft());
-            final Double right1 = mq.getRowCount(getRight());
-            if (left1 != null && right1 != null) {
-                if (left1 <= 1D || right1 <= 1D) {
-                    Double max = mq.getMaxRowCount(this);
-                    if (max != null && max <= 1D) {
-                        result = max;
-                        finished = true;
-                    }
-                }
-                if (!finished) {
-                    JoinInfo joinInfo1 = analyzeCondition();
-                    ImmutableIntList leftKeys = joinInfo1.leftKeys;
-                    ImmutableIntList rightKeys = joinInfo1.rightKeys;
-                    double selectivity = mq.getSelectivity(this, getCondition());
-                    if (F.isEmpty(leftKeys) || F.isEmpty(rightKeys)) {
-                        result = left1 * right1 * selectivity;
-                    }
-                    else {
-                        double leftDistinct = Util.first(
-                            mq.getDistinctRowCount(getLeft(), ImmutableBitSet.of(leftKeys), null), left1);
-                        double rightDistinct = Util.first(
-                            mq.getDistinctRowCount(getRight(), ImmutableBitSet.of(rightKeys), null), right1);
-                        double leftCardinality = leftDistinct / left1;
-                        double rightCardinality = rightDistinct / right1;
-                        double rowsCount = (Math.min(left1, right1) / (leftCardinality * rightCardinality)) * selectivity;
-                        JoinRelType type = getJoinType();
-                        if (type == LEFT)
-                            rowsCount += left1;
-                        else if (type == RIGHT)
-                            rowsCount += right1;
-                        else if (type == JoinRelType.FULL)
-                            rowsCount += left1 + right1;
-                        result = rowsCount;
-                    }
-                }
-            }
-        }
-
-        return Util.first(result, 1D);
-    }
-
-    /** {@inheritDoc} */
     @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
-        double rowCount = mq.getRowCount(this);
-
-        // Joins can be flipped, and for many algorithms, both versions are viable
-        // and have the same cost. To make the results stable between versions of
-        // the planner, make one of the versions slightly more expensive.
-        switch (joinType) {
-            case SEMI:
-            case ANTI:
-                // SEMI and ANTI join cannot be flipped
-                break;
-            case RIGHT:
-                rowCount = RelMdUtil.addEpsilon(rowCount);
-                break;
-            default:
-                if (RelNodes.COMPARATOR.compare(left, right) > 0)
-                    rowCount = RelMdUtil.addEpsilon(rowCount);
-        }
+        IgniteCostFactory costFactory = (IgniteCostFactory)planner.getCostFactory();
+
+        double leftCount = mq.getRowCount(getLeft());
 
-        final double rightRowCount = right.estimateRowCount(mq);
-        final double leftRowCount = left.estimateRowCount(mq);
+        if (Double.isInfinite(leftCount))
+            return costFactory.makeInfiniteCost();
 
-        if (Double.isInfinite(leftRowCount))
-            rowCount = leftRowCount;
-        if (Double.isInfinite(rightRowCount))
-            rowCount = rightRowCount;
+        double rightCount = mq.getRowCount(getRight());
 
-        RelDistribution.Type type = distribution().getType();
+        if (Double.isInfinite(rightCount))
+            return costFactory.makeInfiniteCost();
 
-        if (type == BROADCAST_DISTRIBUTED || type == SINGLETON)
-            rowCount = RelMdUtil.addEpsilon(rowCount);
+        double rows = leftCount + rightCount;
 
-        return planner.getCostFactory().makeCost(rowCount, 0, 0);
+        return costFactory.makeCost(rows,
+            rows * (IgniteCost.ROW_COMPARISON_COST + IgniteCost.ROW_PASS_THROUGH_COST), 0);
     }
 
     /**
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteNestedLoopJoin.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteNestedLoopJoin.java
index fb204f8..8614ece 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteNestedLoopJoin.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteNestedLoopJoin.java
@@ -17,11 +17,9 @@
 
 package org.apache.ignite.internal.processors.query.calcite.rel;
 
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
@@ -34,9 +32,8 @@ import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.util.Pair;
-import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
-import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
+import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 
 /**
@@ -47,7 +44,7 @@ import org.apache.ignite.internal.processors.query.calcite.util.Commons;
  * The set of output rows is a subset of the cartesian product of the two
  * inputs; precisely which subset depends on the join condition.
  */
-public class IgniteNestedLoopJoin extends AbstractIgniteNestedLoopJoin {
+public class IgniteNestedLoopJoin extends AbstractIgniteJoin {
     /**
      * Creates a Join.
      *
@@ -78,7 +75,24 @@ public class IgniteNestedLoopJoin extends AbstractIgniteNestedLoopJoin {
 
     /** {@inheritDoc} */
     @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
-        return super.computeSelfCost(planner, mq).multiplyBy(10);
+        IgniteCostFactory costFactory = (IgniteCostFactory)planner.getCostFactory();
+
+        double leftCount = mq.getRowCount(getLeft());
+
+        if (Double.isInfinite(leftCount))
+            return costFactory.makeInfiniteCost();
+
+        double rightCount = mq.getRowCount(getRight());
+
+        if (Double.isInfinite(rightCount))
+            return costFactory.makeInfiniteCost();
+
+        double rows = leftCount * rightCount;
+
+        double rightSize = rightCount * getRight().getRowType().getFieldCount() * IgniteCost.AVERAGE_FIELD_SIZE;
+
+        return costFactory.makeCost(rows,
+            rows * (IgniteCost.ROW_COMPARISON_COST + IgniteCost.ROW_PASS_THROUGH_COST), 0, rightSize, 0);
     }
 
     /** {@inheritDoc} */
@@ -92,18 +106,6 @@ public class IgniteNestedLoopJoin extends AbstractIgniteNestedLoopJoin {
     }
 
     /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCorrelation(RelTraitSet nodeTraits,
-        List<RelTraitSet> inTraits) {
-        // left correlations
-        Set<CorrelationId> corrIds = new HashSet<>(TraitUtils.correlation(inTraits.get(0)).correlationIds());
-        // right correlations
-        corrIds.addAll(TraitUtils.correlation(inTraits.get(1)).correlationIds());
-
-        return ImmutableList.of(Pair.of(nodeTraits.replace(CorrelationTrait.correlations(corrIds)),
-            inTraits));
-    }
-
-    /** {@inheritDoc} */
     @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
         return new IgniteNestedLoopJoin(cluster, getTraitSet(), inputs.get(0), inputs.get(1), getCondition(), getVariablesSet(), getJoinType());
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
index b28048d..e0aa485 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
@@ -36,7 +36,6 @@ import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.metadata.RelMdUtil;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexInputRef;
@@ -44,6 +43,7 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.calcite.util.Pair;
 import org.apache.calcite.util.mapping.Mappings;
+import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
 import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
 import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
 import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
@@ -210,9 +210,9 @@ public class IgniteProject extends Project implements TraitsAwareIgniteRel {
 
     /** {@inheritDoc} */
     @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
-        double rowCount = mq.getRowCount(getInput()) * exps.size();
-        rowCount = RelMdUtil.addEpsilon(rowCount); // to differ from rel nodes with integrated projection
-        return planner.getCostFactory().makeCost(rowCount, 0, 0);
+        double rowCount = mq.getRowCount(getInput());
+
+        return planner.getCostFactory().makeCost(rowCount, rowCount * IgniteCost.ROW_PASS_THROUGH_COST, 0);
     }
 
     /** {@inheritDoc} */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReduceAggregate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReduceAggregate.java
index 356c3df..1e4f88b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReduceAggregate.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReduceAggregate.java
@@ -21,6 +21,8 @@ import java.util.List;
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.linq4j.Ord;
 import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelInput;
@@ -29,10 +31,12 @@ import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.SingleRel;
 import org.apache.calcite.rel.core.Aggregate.Group;
 import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.sql.SqlExplainLevel;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.Util;
+import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 
 /**
@@ -106,14 +110,26 @@ public class IgniteReduceAggregate extends SingleRel implements IgniteRel {
         return pw;
     }
 
+    /** {@inheritDoc} */
+    @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+        double rows = mq.getRowCount(getInput());
+
+        // TODO: fix it when https://issues.apache.org/jira/browse/IGNITE-13543 is resolved;
+        // currently it's OK to have such a dummy cost because there are no other options
+        return planner.getCostFactory().makeCost(rows, rows * IgniteCost.ROW_PASS_THROUGH_COST, 0);
+    }
+
+    /** */
     public ImmutableBitSet groupSet() {
         return groupSet;
     }
 
+    /** */
     public List<ImmutableBitSet> groupSets() {
         return groupSets;
     }
 
+    /** */
     public List<AggregateCall> aggregateCalls() {
         return aggCalls;
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSort.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSort.java
index 838574d..964204b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSort.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSort.java
@@ -19,14 +19,20 @@ package org.apache.ignite.internal.processors.query.calcite.rel;
 import java.util.List;
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.Util;
+import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
+import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
 import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
 
 import static org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits;
@@ -100,6 +106,18 @@ public class IgniteSort extends Sort implements IgniteRel {
     }
 
     /** {@inheritDoc} */
+    @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+        double rows = mq.getRowCount(getInput());
+
+        double cost = rows * IgniteCost.ROW_PASS_THROUGH_COST + Util.nLogN(rows) * IgniteCost.ROW_COMPARISON_COST;
+        double memory = rows * getRowType().getFieldCount() * IgniteCost.AVERAGE_FIELD_SIZE;
+
+        IgniteCostFactory costFactory = (IgniteCostFactory)planner.getCostFactory();
+
+        return costFactory.makeCost(rows, cost, 0, memory, 0);
+    }
+
+    /** {@inheritDoc} */
     @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
         return new IgniteSort(cluster, getTraitSet(), sole(inputs), collation);
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableSpool.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableSpool.java
index e3a3b4d..da8c441 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableSpool.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableSpool.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.query.calcite.rel;
 
 import java.util.List;
+
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
@@ -25,8 +26,9 @@ import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Spool;
-import org.apache.calcite.rel.metadata.RelMdUtil;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
+import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
 
 import static org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits;
 
@@ -78,9 +80,13 @@ public class IgniteTableSpool extends Spool implements IgniteRel {
 
     /** {@inheritDoc} */
     @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
-        // TODO: add memory usage to cost
-        double rowCount = mq.getRowCount(this);
-        rowCount = RelMdUtil.addEpsilon(rowCount);
-        return planner.getCostFactory().makeCost(rowCount, 0, 0);
+        double rowCount = mq.getRowCount(getInput());
+        double bytesPerRow = getRowType().getFieldCount() * IgniteCost.AVERAGE_FIELD_SIZE;
+        double totalBytes = rowCount * bytesPerRow;
+
+        IgniteCostFactory costFactory = (IgniteCostFactory)planner.getCostFactory();
+
+        return costFactory.makeCost(rowCount,
+            rowCount * IgniteCost.ROW_PASS_THROUGH_COST, 0, totalBytes, 0);
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTrimExchange.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTrimExchange.java
index 4bd3a17..be16129 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTrimExchange.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTrimExchange.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.query.calcite.rel;
 
 import java.util.List;
+
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
@@ -28,6 +29,7 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.core.Exchange;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
 import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
 
@@ -61,7 +63,12 @@ public class IgniteTrimExchange extends Exchange implements SourceAwareIgniteRel
     /** */
     public IgniteTrimExchange(RelInput input) {
         super(changeTraits(input, IgniteConvention.INSTANCE));
-        sourceId = ((Number)input.get("sourceId")).longValue();
+
+        Object srcIdObj = input.get("sourceId");
+        if (srcIdObj != null)
+            sourceId = ((Number)srcIdObj).longValue();
+        else
+            sourceId = -1;
     }
 
     /** {@inheritDoc} */
@@ -87,8 +94,10 @@ public class IgniteTrimExchange extends Exchange implements SourceAwareIgniteRel
 
     /** {@inheritDoc} */
     @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
-        double rowCount = mq.getRowCount(this);
-        return planner.getCostFactory().makeCost(rowCount, rowCount, 0);
+        double rowCount = mq.getRowCount(getInput());
+
+        return planner.getCostFactory().makeCost(rowCount,
+            rowCount * (IgniteCost.ROW_COMPARISON_COST + IgniteCost.ROW_PASS_THROUGH_COST), 0);
     }
 
     /** {@inheritDoc} */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteUnionAll.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteUnionAll.java
index e05f59b..36e5b2d 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteUnionAll.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteUnionAll.java
@@ -23,6 +23,8 @@ import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.RelInput;
@@ -30,7 +32,9 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.core.SetOp;
 import org.apache.calcite.rel.core.Union;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
 import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
 import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
 import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
@@ -129,6 +133,13 @@ public class IgniteUnionAll extends Union implements TraitsAwareIgniteRel {
     }
 
     /** {@inheritDoc} */
+    @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+        double rows = mq.getRowCount(this);
+
+        return planner.getCostFactory().makeCost(rows, rows * IgniteCost.ROW_PASS_THROUGH_COST, 0);
+    }
+
+    /** {@inheritDoc} */
     @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
         return new IgniteUnionAll(cluster, getTraitSet(), Commons.cast(inputs));
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/ProjectableFilterableTableScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/ProjectableFilterableTableScan.java
index abc96e9..f484558 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/ProjectableFilterableTableScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/ProjectableFilterableTableScan.java
@@ -40,6 +40,7 @@ import org.apache.calcite.rex.RexUtil;
 import org.apache.calcite.util.ControlFlowException;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.mapping.Mappings;
+import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
 import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
@@ -122,12 +123,13 @@ public abstract class ProjectableFilterableTableScan extends TableScan {
 
     /** {@inheritDoc} */
     @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
-        double estimated = estimateRowCount(mq);
+        double rows = table.getRowCount();
+        double cost = rows * IgniteCost.ROW_PASS_THROUGH_COST;
 
-        if (projects != null)
-            estimated += estimated * projects.size();
+        if (condition != null)
+            cost += rows * IgniteCost.ROW_COMPARISON_COST;
 
-        return planner.getCostFactory().makeCost(estimated, 0, 0);
+        return planner.getCostFactory().makeCost(rows, cost, 0);
     }
 
     /** {@inheritDoc} */
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteBasicSecondaryIndexIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteBasicSecondaryIndexIntegrationTest.java
index ea9c789..35e7d33 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteBasicSecondaryIndexIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteBasicSecondaryIndexIntegrationTest.java
@@ -32,6 +32,7 @@ import org.junit.Test;
 
 import static java.util.Arrays.asList;
 import static java.util.Collections.singletonList;
+import static org.apache.ignite.internal.processors.query.calcite.QueryChecker.containsAnyProject;
 import static org.apache.ignite.internal.processors.query.calcite.QueryChecker.containsAnyScan;
 import static org.apache.ignite.internal.processors.query.calcite.QueryChecker.containsIndexScan;
 import static org.apache.ignite.internal.processors.query.calcite.QueryChecker.containsSubPlan;
@@ -120,10 +121,11 @@ public class CalciteBasicSecondaryIndexIntegrationTest extends GridCommonAbstrac
             .check();
     }
 
+    /** */
     @Test
     public void testMergeJoin() {
         assertQuery("" +
-            "SELECT d1.name, d2.name FROM Developer d1, Developer d2 WHERE d1.depId = d2.depId")
+            "SELECT /*+ DISABLE_RULE('CorrelatedNestedLoopJoin') */ d1.name, d2.name FROM Developer d1, Developer d2 WHERE d1.depId = d2.depId")
             .matches(containsSubPlan("IgniteMergeJoin"))
             .returns("Bach", "Bach")
             .returns("Beethoven", "Beethoven")
@@ -530,7 +532,7 @@ public class CalciteBasicSecondaryIndexIntegrationTest extends GridCommonAbstrac
                 containsIndexScan("PUBLIC", "DEVELOPER", NAME_CITY_IDX),
                 containsIndexScan("PUBLIC", "DEVELOPER", NAME_DEPID_CITY_IDX))
             )
-            .matches(containsTableScan("PUBLIC", "DEVELOPER"))
+            .matches(containsAnyScan("PUBLIC", "DEVELOPER"))
             .returns(1, "Mozart", 3, "Vienna", 33)
             .returns(3, "Bach", 1, "Leipzig", 55)
             .check();
@@ -619,7 +621,8 @@ public class CalciteBasicSecondaryIndexIntegrationTest extends GridCommonAbstrac
     @Test
     public void testOrderByNameCityAsc() {
         assertQuery("SELECT * FROM Developer ORDER BY name, city")
-            .matches(containsTableScan("PUBLIC", "DEVELOPER"))
+            .matches(containsAnyScan("PUBLIC", "DEVELOPER"))
+            .matches(containsAnyScan("PUBLIC", "DEVELOPER"))
             .matches(containsSubPlan("IgniteSort"))
             .returns(3, "Bach", 1, "Leipzig", 55)
             .returns(2, "Beethoven", 2, "Vienna", 44)
@@ -647,7 +650,7 @@ public class CalciteBasicSecondaryIndexIntegrationTest extends GridCommonAbstrac
     @Test
     public void testOrderByNoIndexedColumn() {
         assertQuery("SELECT * FROM Developer ORDER BY age DESC")
-            .matches(containsTableScan("PUBLIC", "DEVELOPER"))
+            .matches(containsAnyProject("PUBLIC", "DEVELOPER"))
             .matches(containsSubPlan("IgniteSort"))
             .returns(4, "Strauss", 2, "Munich", 66)
             .returns(3, "Bach", 1, "Leipzig", 55)
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index 29753c0..3f7e01a 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -21,7 +21,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ThreadLocalRandom;
+
 import com.google.common.collect.ImmutableMap;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
@@ -99,26 +99,28 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
         Map<Integer, RISK> mRisk = new HashMap<>(65000);
 
         for (int i = 0; i < 65000; i++)
-            mRisk.put(1, new RISK(i));
+            mRisk.put(i, new RISK(i));
 
         RISK.putAll(mRisk);
 
         Map<Integer, TRADE> mTrade = new HashMap<>(200);
 
         for (int i = 0; i < 200; i++)
-            mTrade.put(1, new TRADE(i));
+            mTrade.put(i, new TRADE(i));
 
         TRADE.putAll(mTrade);
 
         for (int i = 0; i < 80; i++)
-            BATCH.put(1, new BATCH(i));
+            BATCH.put(i, new BATCH(i));
 
         awaitPartitionMapExchange(true, true, null);
 
-        QueryEngine engine = Commons.lookupComponent(grid(1).context(), QueryEngine.class);
+        QueryEngine engine = Commons.lookupComponent(ignite.context(), QueryEngine.class);
 
+        // TODO: https://issues.apache.org/jira/browse/IGNITE-13849
+        // MergeJoin is disabled by the hint below due to a problem with its serialization/deserialization
         List<FieldsQueryCursor<List<?>>> query = engine.query(null, "PUBLIC",
-            "SELECT count(*)" +
+            "SELECT /*+ DISABLE_RULE('MergeJoinConverter') */ count(*)" +
                 " FROM RISK R," +
                 " TRADE T," +
                 " BATCH B " +
@@ -128,7 +130,11 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
                 "AND T.BOOK = 'BOOK' " +
                 "AND B.IS = TRUE");
 
-        System.err.println(query.get(0).getAll());
+        List<List<?>> res = query.get(0).getAll();
+
+        assertEquals(1, res.size());
+        assertEquals(1, res.get(0).size());
+        assertEquals(40L, res.get(0).get(0));
     }
 
     /** */
@@ -171,7 +177,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
         public TRADE(Integer in) {
             TRADEID = in;
             TRADEVER = in;
-            BOOK = ThreadLocalRandom.current().nextBoolean() ? "BOOK" : "";
+            BOOK = (in & 1) != 0 ? "BOOK" : "";
         }
     }
 
@@ -188,7 +194,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
         /** */
         public BATCH(Integer in) {
             BATCHKEY = in;
-            IS = ThreadLocalRandom.current().nextBoolean();
+            IS = (in & 1) != 0;
         }
     }
 
@@ -539,6 +545,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
     }
 
     /** */
+    @Ignore("https://issues.apache.org/jira/browse/IGNITE-13849")
     @Test
     public void query() throws Exception {
         IgniteCache<Integer, Developer> developer = grid(1).createCache(new CacheConfiguration<Integer, Developer>()
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java
index d95c9bb..7f8a4b8 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java
@@ -81,6 +81,7 @@ import org.apache.ignite.internal.processors.query.calcite.message.MessageServic
 import org.apache.ignite.internal.processors.query.calcite.message.TestIoManager;
 import org.apache.ignite.internal.processors.query.calcite.metadata.CollocationGroup;
 import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
+import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
 import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
 import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
 import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlan;
@@ -2604,6 +2605,7 @@ public class PlannerTest extends GridCommonAbstractTest {
             .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
                 .defaultSchema(schema)
                 .traitDefs(traitDefs)
+                .costFactory(new IgniteCostFactory(1, 100, 1, 1))
                 .build())
             .logger(log)
             .query(sql)
@@ -2721,7 +2723,7 @@ public class PlannerTest extends GridCommonAbstractTest {
                 .add("ID", f.createJavaType(Integer.class))
                 .add("NAME", f.createJavaType(String.class))
                 .add("DEPTNO", f.createJavaType(Integer.class))
-                .build()) {
+                .build(), RewindabilityTrait.REWINDABLE, 1000) {
 
             @Override public IgniteDistribution distribution() {
                 return IgniteDistributions.broadcast();
@@ -2734,7 +2736,7 @@ public class PlannerTest extends GridCommonAbstractTest {
             new RelDataTypeFactory.Builder(f)
                 .add("DEPTNO", f.createJavaType(Integer.class))
                 .add("NAME", f.createJavaType(String.class))
-                .build()) {
+                .build(), RewindabilityTrait.REWINDABLE, 100) {
 
             @Override public IgniteDistribution distribution() {
                 return IgniteDistributions.broadcast();
@@ -2748,19 +2750,17 @@ public class PlannerTest extends GridCommonAbstractTest {
         publicSchema.addTable("EMP", emp);
         publicSchema.addTable("DEPT", dept);
 
-        SchemaPlus schema = createRootSchema(false)
-            .add("PUBLIC", publicSchema);
-
         String sql = "select * from dept d join emp e on d.deptno = e.deptno and e.name >= d.name order by e.name, d.deptno";
 
-        RelNode phys = physicalPlan(sql, publicSchema);
+        RelNode phys = physicalPlan(sql, publicSchema, "CorrelatedNestedLoopJoin");
 
         assertNotNull(phys);
         assertEquals("" +
-                "IgniteSort(sort0=[$3], sort1=[$0], dir0=[ASC], dir1=[ASC])\n" +
-                "  IgniteNestedLoopJoin(condition=[AND(=($0, $4), >=($3, $1))], joinType=[inner])\n" +
-                "    IgniteTableScan(table=[[PUBLIC, DEPT]])\n" +
-                "    IgniteTableScan(table=[[PUBLIC, EMP]])\n",
+                "IgniteProject(DEPTNO=[$3], NAME=[$4], ID=[$0], NAME0=[$1], DEPTNO0=[$2])\n" +
+                "  IgniteSort(sort0=[$1], sort1=[$3], dir0=[ASC], dir1=[ASC])\n" +
+                "    IgniteNestedLoopJoin(condition=[AND(=($3, $2), >=($1, $4))], joinType=[inner])\n" +
+                "      IgniteIndexScan(table=[[PUBLIC, EMP]], index=[emp_idx])\n" +
+                "      IgniteIndexScan(table=[[PUBLIC, DEPT]], index=[dep_idx])\n",
             RelOptUtil.toString(phys));
     }
 
@@ -3106,8 +3106,6 @@ public class PlannerTest extends GridCommonAbstractTest {
             protoType = RelDataTypeImpl.proto(type);
             this.rewindable = rewindable;
             this.rowCnt = rowCnt;
-
-            addIndex(new IgniteIndex(RelCollations.of(), "PK", null, this));
         }
 
         /** {@inheritDoc} */
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/QueryChecker.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/QueryChecker.java
index d049949..e687054 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/QueryChecker.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/QueryChecker.java
@@ -77,17 +77,6 @@ public abstract class QueryChecker {
     }
 
     /**
-     * Ignite table|index scan with projects matcher.
-     *
-     * @param schema  Schema name.
-     * @param tblName Table name.
-     * @return Matcher.
-     */
-    public static Matcher<String> containsAnyProject(String schema, String tblName) {
-        return containsSubPlan("Scan(table=[[" + schema + ", " + tblName + "]], " + "requiredColunms=");
-    }
-
-    /**
      * Ignite table|index scan with projects unmatcher.
      *
      * @param schema  Schema name.
@@ -140,6 +129,18 @@ public abstract class QueryChecker {
     }
 
     /**
+     * Matcher for an Ignite table|index scan that has any projection (requiredColumns).
+     *
+     * @param schema  Schema name.
+     * @param tblName Table name.
+     * @return Matcher.
+     */
+    public static Matcher<String> containsAnyProject(String schema, String tblName) {
+        return matchesOnce(".*Ignite(Table|Index)Scan\\(table=\\[\\[" + schema + ", " +
+            tblName + "\\]\\], .* requiredColumns=\\[\\{(\\d|\\W|,)+\\}\\].*");
+    }
+
+    /**
      * Sub plan matcher.
      *
      * @param subPlan  Subplan.
@@ -203,8 +204,12 @@ public abstract class QueryChecker {
      * @return Matcher.
      */
     public static Matcher<String> containsAnyScan(final String schema, final String tblName, String... idxNames) {
+        if (F.isEmpty(idxNames))
+            return matchesOnce(".*Ignite(Table|Index)Scan\\(table=\\[\\[" + schema + ", " + tblName + "\\]\\].*");
+
         return CoreMatchers.anyOf(
-            Arrays.stream(idxNames).map(idx -> containsIndexScan(schema, tblName, idx)).collect(Collectors.toList()));
+            Arrays.stream(idxNames).map(idx -> containsIndexScan(schema, tblName, idx)).collect(Collectors.toList())
+        );
     }
 
     /** */
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/rules/ProjectScanMergeRuleTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/rules/ProjectScanMergeRuleTest.java
index 783d23e..cf08e6a 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/rules/ProjectScanMergeRuleTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/rules/ProjectScanMergeRuleTest.java
@@ -32,6 +32,7 @@ import org.junit.Test;
 
 import static java.util.Arrays.asList;
 import static java.util.Collections.singletonList;
+import static org.apache.ignite.internal.processors.query.calcite.QueryChecker.containsAnyProject;
 import static org.apache.ignite.internal.processors.query.calcite.QueryChecker.containsIndexScan;
 import static org.apache.ignite.internal.processors.query.calcite.QueryChecker.containsOneProject;
 import static org.apache.ignite.internal.processors.query.calcite.QueryChecker.containsProject;
@@ -134,26 +135,26 @@ public class ProjectScanMergeRuleTest extends GridCommonAbstractTest {
     @Test
     public void testNestedProjects() {
         checkQuery("SELECT NAME FROM products WHERE CAT_ID IN (SELECT CAT_ID FROM products WHERE CAT_ID > 1) and ID > 2;")
-            .matches(containsIndexScan("PUBLIC", "PRODUCTS"))
+            .matches(containsAnyProject("PUBLIC", "PRODUCTS"))
             .returns("noname3")
             .returns("noname4")
             .check();
 
         checkQuery("SELECT NAME FROM products WHERE CAT_ID IN (SELECT DISTINCT CAT_ID FROM products WHERE CAT_ID > 1)")
-            .matches(containsIndexScan("PUBLIC", "PRODUCTS"))
+            .matches(containsAnyProject("PUBLIC", "PRODUCTS"))
             .returns("noname2")
             .returns("noname3")
             .returns("noname4")
             .check();
 
         checkQuery("SELECT NAME FROM products WHERE CAT_ID IN (SELECT DISTINCT CAT_ID FROM products WHERE SUBCAT_ID > 11)")
-            .matches(containsTableScan("PUBLIC", "PRODUCTS"))
+            .matches(containsAnyProject("PUBLIC", "PRODUCTS"))
             .returns("noname3")
             .returns("noname4")
             .check();
 
         checkQuery("SELECT NAME FROM products WHERE CAT_ID = (SELECT CAT_ID FROM products WHERE SUBCAT_ID = 13)")
-            .matches(containsTableScan("PUBLIC", "PRODUCTS"))
+            .matches(containsAnyProject("PUBLIC", "PRODUCTS"))
             .returns("noname4")
             .check();
     }