You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/02/15 08:51:46 UTC

[02/50] [abbrv] ignite git commit: ignite-split2 - refactor

ignite-split2 - refactor


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f5c63701
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f5c63701
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f5c63701

Branch: refs/heads/ignite-1232
Commit: f5c63701da73d2b3405d1d24ebffc2e6eabfca12
Parents: 4e16d0d
Author: S.Vladykin <sv...@gridgain.com>
Authored: Fri Dec 4 07:53:25 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Fri Dec 4 07:53:25 2015 +0300

----------------------------------------------------------------------
 .../query/h2/opt/GridH2Collocation.java         | 736 +++++++++++++++++++
 .../query/h2/opt/GridH2IndexBase.java           | 371 +---------
 .../query/h2/opt/GridH2QueryContext.java        |  20 +-
 .../processors/query/h2/opt/GridH2Table.java    |   9 +-
 .../h2/opt/GridH2TableFilterCollocation.java    |  64 --
 .../query/h2/opt/GridH2TreeIndex.java           |   2 +-
 .../query/h2/sql/GridSqlQuerySplitter.java      |   2 +-
 7 files changed, 767 insertions(+), 437 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f5c63701/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Collocation.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Collocation.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Collocation.java
new file mode 100644
index 0000000..fd72b2b
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Collocation.java
@@ -0,0 +1,736 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.h2.command.dml.Query;
+import org.h2.command.dml.Select;
+import org.h2.command.dml.SelectUnion;
+import org.h2.expression.Expression;
+import org.h2.expression.ExpressionColumn;
+import org.h2.index.IndexCondition;
+import org.h2.index.ViewIndex;
+import org.h2.table.Column;
+import org.h2.table.SubQueryInfo;
+import org.h2.table.Table;
+import org.h2.table.TableFilter;
+
+/**
+ * Collocation model for a query.
+ */
+public class GridH2Collocation {
+    /** */
+    public static final int MULTIPLIER_COLLOCATED = 1;
+
+    /** */
+    private static final int MULTIPLIER_UNICAST = 20;
+
+    /** */
+    private static final int MULTIPLIER_BROADCAST = 80;
+
+    /** */
+    private final GridH2Collocation upper;
+
+    /** */
+    private final int filter;
+
+    /** */
+    private int multiplier;
+
+    /** */
+    private Type type;
+
+    /** */
+    private GridH2Collocation[] children;
+
+    /** */
+    private TableFilter[] childFilters;
+
+    /** */
+    private boolean childsOrderFinalized;
+
+    /** */
+    private List<GridH2Collocation> unions;
+
+    /** */
+    private Select select;
+
+    /**
+     * @param upper Upper.
+     * @param filter Filter.
+     */
+    public GridH2Collocation(GridH2Collocation upper, int filter) {
+        this.upper = upper;
+        this.filter = filter;
+    }
+
+    /**
+     * @param select Select.
+     */
+    public void select(Select select) {
+        this.select = select;
+    }
+
+    /**
+     * @return Select.
+     */
+    public Select select() {
+        return select;
+    }
+
+    /**
+     * @return List of unions.
+     */
+    public List<GridH2Collocation> unions() {
+        return unions;
+    }
+
+    /**
+     * @param unions Unions.
+     */
+    public void unions(List<GridH2Collocation> unions) {
+        this.unions = unions;
+    }
+
+    /**
+     * @param childFilters New child filters.
+     * @return {@code true} If child filters were updated.
+     */
+    public boolean childFilters(TableFilter[] childFilters) {
+        assert childFilters != null;
+        assert select == childFilters[0].getSelect();
+
+        if (Arrays.equals(this.childFilters, childFilters))
+            return false;
+
+        childsOrderFinalized = false;
+
+        if (this.childFilters == null) {
+            // We have to clone because H2 reuses array and reorders elements.
+            this.childFilters = childFilters.clone();
+
+            children = new GridH2Collocation[childFilters.length];
+        }
+        else {
+            assert this.childFilters.length == childFilters.length;
+
+            // We have to copy because H2 reuses array and reorders elements.
+            System.arraycopy(childFilters, 0, this.childFilters, 0, childFilters.length);
+
+            Arrays.fill(children, null);
+        }
+
+        reset();
+
+        return true;
+    }
+
+    /**
+     * Reset the cached type and multiplier of the current collocation model, but do not touch unions.
+     */
+    private void reset() {
+        type = null;
+        multiplier = 0;
+    }
+
+    /**
+     * @param i Index.
+     * @param f Table filter.
+     * @return {@code true} If the child is not a table or view.
+     */
+    private boolean isNotTableOrViewChild(int i, TableFilter f) {
+        if (f == null)
+            f = childFilters[i];
+
+        Table t = f.getTable();
+
+        return !t.isView() && !(t instanceof GridH2Table);
+    }
+
+    /**
+     * Do the needed calculations.
+     */
+    private void calculate() {
+        if (type != null)
+            return;
+
+        if (childFilters != null) {
+            // We are at sub-query.
+            boolean collocated = true;
+            boolean partitioned = false;
+            int maxMultiplier = 0;
+
+            for (int i = 0; i < childFilters.length; i++) {
+                GridH2Collocation c = child(i);
+
+                if (c == null) {
+                    assert isNotTableOrViewChild(i, null);
+
+                    continue;
+                }
+
+                Type t = c.type(true);
+
+                if (!t.isCollocated()) {
+                    collocated = false;
+
+                    int m = c.multiplier(true);
+
+                    if (m > maxMultiplier)
+                        maxMultiplier = m;
+                }
+
+                if (t.isPartitioned())
+                    partitioned = true;
+            }
+
+            type = Type.of(partitioned, collocated);
+            multiplier = type.isCollocated() ? MULTIPLIER_COLLOCATED : maxMultiplier;
+        }
+        else {
+            assert upper != null;
+
+            // We are at table instance.
+            GridH2Table tbl = (GridH2Table)upper.childFilters[filter].getTable();
+
+            // Only partitioned tables will do distributed joins.
+            if (!tbl.isPartitioned()) {
+                type = Type.REPLICATED;
+                multiplier = MULTIPLIER_COLLOCATED;
+
+                return;
+            }
+
+            // If we are the first partitioned table in a join, then we are the "base" for all the other partitioned
+            // tables, which will need to get remote results (if there is no affinity condition). Since this query is
+            // broadcast to all the affinity nodes, the "base" does not need to get remote results.
+            if (!upper.findPartitionedTableBefore(filter)) {
+                type = Type.PARTITIONED_COLLOCATED;
+                multiplier = MULTIPLIER_COLLOCATED;
+
+                return;
+            }
+
+            // It is enough to make sure that our previous join by affinity key is collocated, then we are
+            // collocated too. If we at least have an affinity key condition, we do unicast, which is cheaper.
+            switch (upper.joinedWithCollocated(filter)) {
+                case JOINED_WITH_COLLOCATED:
+                    type = Type.PARTITIONED_COLLOCATED;
+                    multiplier = MULTIPLIER_COLLOCATED;
+
+                    break;
+
+                case HAS_AFFINITY_CONDITION:
+                    type = Type.PARTITIONED_NOT_COLLOCATED;
+                    multiplier = MULTIPLIER_UNICAST;
+
+                    break;
+
+                case NONE:
+                    type = Type.PARTITIONED_NOT_COLLOCATED;
+                    multiplier = MULTIPLIER_BROADCAST;
+
+                    break;
+            }
+        }
+    }
+
+    /**
+     * @param f Current filter.
+     * @return {@code true} If partitioned table was found.
+     */
+    private boolean findPartitionedTableBefore(int f) {
+        for (int i = 0; i < f; i++) {
+            GridH2Collocation c = child(i);
+
+            assert c != null || isNotTableOrViewChild(i, null);
+
+            // The `c` can be null if it is neither a GridH2Table nor a sub-query:
+            // it is some kind of function table or anything else that is considered replicated.
+            if (c != null && c.type(true).isPartitioned())
+                return true;
+        }
+
+        // We have to search globally in upper queries as well.
+        return upper != null && upper.findPartitionedTableBefore(filter);
+    }
+
+    /**
+     * @param f Filter.
+     * @return Affinity join type.
+     */
+    private Affinity joinedWithCollocated(int f) {
+        TableFilter tf = childFilters[f];
+
+        ArrayList<IndexCondition> idxConditions = tf.getIndexConditions();
+
+        int affColId = ((GridH2Table)tf.getTable()).getAffinityKeyColumnId();
+
+        boolean affKeyConditionFound = false;
+
+        for (int i = 0; i < idxConditions.size(); i++) {
+            IndexCondition c = idxConditions.get(i);
+
+            if (c.getCompareType() == IndexCondition.EQUALITY &&
+                c.getColumn().getColumnId() == affColId && c.isEvaluatable()) {
+                affKeyConditionFound = true;
+
+                Expression exp = c.getExpression();
+
+                exp = exp.getNonAliasExpression();
+
+                if (exp instanceof ExpressionColumn) {
+                    ExpressionColumn expCol = (ExpressionColumn)exp;
+
+                    // This is one of our previous joins.
+                    TableFilter prevJoin = expCol.getTableFilter();
+
+                    if (prevJoin != null) {
+                        GridH2Collocation co = children[indexOf(prevJoin)];
+
+                        assert co != null || isNotTableOrViewChild(-1, prevJoin);
+
+                        if (co != null) {
+                            Type t = co.type(true);
+
+                            if (t.isPartitioned() && t.isCollocated() && isAffinityColumn(prevJoin, expCol))
+                                return Affinity.JOINED_WITH_COLLOCATED;
+                        }
+                    }
+                }
+            }
+        }
+
+        return affKeyConditionFound ? Affinity.HAS_AFFINITY_CONDITION : Affinity.NONE;
+    }
+
+    /**
+     * @param f Table filter.
+     * @return Index.
+     */
+    private int indexOf(TableFilter f) {
+        for (int i = 0; i < childFilters.length; i++) {
+            if (childFilters[i] == f)
+                return i;
+        }
+
+        throw new IllegalStateException();
+    }
+
+    /**
+     * @param f Table filter.
+     * @param expCol Expression column.
+     * @return {@code true} If it is an affinity column.
+     */
+    private static boolean isAffinityColumn(TableFilter f, ExpressionColumn expCol) {
+        Column col = expCol.getColumn();
+
+        if (col == null)
+            return false;
+
+        Table t = col.getTable();
+
+        if (t.isView()) {
+            Query qry = ((ViewIndex)f.getIndex()).getQuery();
+
+            return isAffinityColumn(qry, expCol);
+        }
+
+        return t instanceof GridH2Table &&
+            col.getColumnId() == ((GridH2Table)t).getAffinityKeyColumnId();
+    }
+
+    /**
+     * @param qry Query.
+     * @param expCol Expression column.
+     * @return {@code true} If it is an affinity column.
+     */
+    private static boolean isAffinityColumn(Query qry, ExpressionColumn expCol) {
+        if (qry.isUnion()) {
+            SelectUnion union = (SelectUnion)qry;
+
+            return isAffinityColumn(union.getLeft(), expCol) && isAffinityColumn(union.getRight(), expCol);
+        }
+
+        Expression exp = qry.getExpressions().get(expCol.getColumn().getColumnId()).getNonAliasExpression();
+
+        if (exp instanceof ExpressionColumn) {
+            expCol = (ExpressionColumn)exp;
+
+            return isAffinityColumn(expCol.getTableFilter(), expCol);
+        }
+
+        return false;
+    }
+
+    /**
+     * Sets table filters to the final state of query.
+     *
+     * @return {@code false} if nothing was actually done here.
+     */
+    private boolean finalizeChildFiltersOrder() {
+        if (childFilters == null || childsOrderFinalized)
+            return false;
+
+        int i = 0;
+
+        // Collect table filters in final order after optimization.
+        for (TableFilter f = select.getTopTableFilter(); f != null; f = f.getJoin()) {
+            childFilters[i] = f;
+
+            if (f.getTable().isView())
+                children[i].finalizeChildFiltersOrder();
+
+            i++;
+        }
+
+        assert i == childFilters.length;
+
+        reset();
+
+        childsOrderFinalized = true;
+
+        return true;
+    }
+
+    /**
+     * @return Multiplier.
+     */
+    public int calculateMultiplier() {
+        if (childFilters != null && !childsOrderFinalized) {
+            // We have to set all sub-queries structure to the final one we will have in query.
+            boolean needReset = false;
+
+            for (int i = 0; i < childFilters.length; i++) {
+                if (childFilters[i].getTable().isView() && children[i].finalizeChildFiltersOrder())
+                    needReset = true;
+            }
+
+            if (needReset)
+                reset();
+
+            childsOrderFinalized = true;
+        }
+
+        // We don't need multiplier for union here because it will be summarized in H2.
+        return multiplier(false);
+    }
+
+    /**
+     * @param withUnion With respect to union.
+     * @return Multiplier.
+     */
+    private int multiplier(boolean withUnion) {
+        calculate();
+
+        assert multiplier != 0;
+
+        if (withUnion && unions != null) {
+            int maxMultiplier = unions.get(0).multiplier(false);
+
+            for (int i = 1; i < unions.size(); i++) {
+                int m = unions.get(i).multiplier(false);
+
+                if (m > maxMultiplier)
+                    maxMultiplier = m;
+            }
+
+            return maxMultiplier;
+        }
+
+        return multiplier;
+    }
+
+    /**
+     * @param withUnion With respect to union.
+     * @return Type.
+     */
+    private Type type(boolean withUnion) {
+        calculate();
+
+        assert type != null;
+
+        if (withUnion && unions != null) {
+            Type left = unions.get(0).type(false);
+
+            for (int i = 1; i < unions.size(); i++) {
+                Type right = unions.get(i).type(false);
+
+                if (!left.isCollocated() || !right.isCollocated()) {
+                    left = Type.PARTITIONED_NOT_COLLOCATED;
+
+                    break;
+                }
+                else if (!left.isPartitioned() && !right.isPartitioned())
+                    left = Type.REPLICATED;
+                else
+                    left = Type.PARTITIONED_COLLOCATED;
+            }
+
+            return left;
+        }
+
+        return type;
+    }
+
+    /**
+     * @param idx Index.
+     * @param child Child collocation.
+     */
+    public void child(int idx, GridH2Collocation child) {
+        assert child(idx) == null;
+
+        children[idx] = child;
+    }
+
+    /**
+     * @param idx Index.
+     * @return Child collocation.
+     */
+    public GridH2Collocation child(int idx) {
+        return children[idx];
+    }
+
+    /**
+     * @return Upper collocation.
+     */
+    public GridH2Collocation upper() {
+        return upper;
+    }
+
+    /**
+     * @return Filter.
+     */
+    public int filter() {
+        return filter;
+    }
+
+    /**
+     * @param qctx Query context.
+     * @param info Sub-query info.
+     * @param filters Filters.
+     * @param filter Filter.
+     * @return Collocation.
+     */
+    public static GridH2Collocation buildCollocationModel(GridH2QueryContext qctx, SubQueryInfo info,
+        TableFilter[] filters, int filter) {
+        GridH2Collocation c;
+
+        if (info != null) {
+            // Go up until we reach the root query.
+            c = buildCollocationModel(qctx, info.getUpper(), info.getFilters(), info.getFilter());
+        }
+        else {
+            // We are at the root query.
+            c = qctx.queryCollocation();
+
+            if (c == null) {
+                c = new GridH2Collocation(null, -1);
+
+                c.select(filters[0].getSelect());
+
+                qctx.queryCollocation(c);
+            }
+        }
+
+        // Handle union. We have to rely on fact that select will be the same on uppermost select.
+        // For sub-queries we will drop collocation models, so that they will be recalculated anyways.
+        Select select = filters[0].getSelect();
+
+        if (c.select() != select) {
+            List<GridH2Collocation> unions = c.unions();
+
+            int i = 1;
+
+            if (unions == null) {
+                unions = new ArrayList<>();
+
+                unions.add(c);
+                c.unions(unions);
+            }
+            else {
+                for (; i < unions.size(); i++) {
+                    GridH2Collocation u = unions.get(i);
+
+                    if (u.select() == select) {
+                        c = u;
+
+                        break;
+                    }
+                }
+            }
+
+            if (i == unions.size()) {
+                c = new GridH2Collocation(c.upper(), c.filter());
+
+                unions.add(c);
+
+                c.select(select);
+                c.unions(unions);
+            }
+        }
+
+        c.childFilters(filters);
+
+        GridH2Collocation child = c.child(filter);
+
+        if (child == null) {
+            child = new GridH2Collocation(c, filter);
+
+            c.child(filter, child);
+        }
+
+        return child;
+    }
+
+    /**
+     * @param qry Query.
+     * @return {@code true} If the query is collocated.
+     */
+    public static boolean isCollocated(Query qry) {
+        return buildCollocationModel(null, -1, qry, null).type(true).isCollocated();
+    }
+
+    /**
+     * Builds the model for {@code qry}, recursing into union parts and view sub-queries.
+     * @param upper Upper model or {@code null} for the root query.
+     * @param filter Index of this query among the upper model's filters ({@code -1} for the root).
+     * @param qry Query.
+     * @param unions Shared list of union members or {@code null} when not inside a union.
+     * @return Built model for the root query; {@code null} otherwise (the model is set into upper).
+     */
+    private static GridH2Collocation buildCollocationModel(GridH2Collocation upper, int filter, Query qry,
+        List<GridH2Collocation> unions) {
+        if (qry.isUnion()) {
+            if (unions == null)
+                unions = new ArrayList<>();
+
+            SelectUnion union = (SelectUnion)qry;
+            GridH2Collocation a = buildCollocationModel(upper, filter, union.getLeft(), unions);
+            GridH2Collocation b = buildCollocationModel(upper, filter, union.getRight(), unions);
+
+            return a == null ? b : a;
+        }
+
+        Select select = (Select)qry;
+        List<TableFilter> list = new ArrayList<>();
+
+        for (TableFilter f = select.getTopTableFilter(); f != null; f = f.getJoin())
+            list.add(f);
+
+        TableFilter[] filters = list.toArray(new TableFilter[list.size()]);
+        GridH2Collocation c = new GridH2Collocation(upper, filter);
+
+        if (unions != null) {
+            unions.add(c);
+            c.unions(unions);
+        }
+
+        // Select must be set first: childFilters() asserts that it matches the filters' select.
+        c.select(select);
+        c.childFilters(filters);
+
+        // For a union only the first member is registered, the rest are reachable through unions.
+        if (upper != null && upper.child(filter) == null)
+            upper.child(filter, c);
+
+        for (int i = 0; i < filters.length; i++) {
+            TableFilter f = filters[i];
+            // The recursive call registers the sub-query model in c itself and returns null.
+            if (f.getTable().isView())
+                buildCollocationModel(c, i, ((ViewIndex)f.getIndex()).getQuery(), null);
+            else if (f.getTable() instanceof GridH2Table)
+                c.child(i, new GridH2Collocation(c, i));
+        }
+
+        return upper == null ? c : null;
+    }
+
+    /**
+     * Collocation type.
+     */
+    private enum Type {
+        /** */
+        PARTITIONED_COLLOCATED(true, true),
+
+        /** */
+        PARTITIONED_NOT_COLLOCATED(true, false),
+
+        /** */
+        REPLICATED(false, true);
+
+        /** */
+        private final boolean partitioned;
+
+        /** */
+        private final boolean collocated;
+
+        /**
+         * @param partitioned Partitioned.
+         * @param collocated Collocated.
+         */
+        Type(boolean partitioned, boolean collocated) {
+            this.partitioned = partitioned;
+            this.collocated = collocated;
+        }
+
+        /**
+         * @return {@code true} If partitioned.
+         */
+        public boolean isPartitioned() {
+            return partitioned;
+        }
+
+        /**
+         * @return {@code true} If collocated.
+         */
+        public boolean isCollocated() {
+            return collocated;
+        }
+
+        /**
+         * @param partitioned Partitioned.
+         * @param collocated Collocated.
+         * @return Type.
+         */
+        static Type of(boolean partitioned, boolean collocated) {
+            if (collocated)
+                return partitioned ? Type.PARTITIONED_COLLOCATED : Type.REPLICATED;
+
+            assert partitioned;
+
+            return Type.PARTITIONED_NOT_COLLOCATED;
+        }
+    }
+
+    /**
+     * Affinity of a table relative to previous joined tables.
+     */
+    private enum Affinity {
+        /** */
+        NONE,
+
+        /** */
+        HAS_AFFINITY_CONDITION,
+
+        /** */
+        JOINED_WITH_COLLOCATED
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f5c63701/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
index c1666fd..b063528 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
@@ -17,8 +17,6 @@
 
 package org.apache.ignite.internal.processors.query.h2.opt;
 
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
@@ -27,45 +25,25 @@ import org.apache.ignite.internal.util.lang.GridFilteredIterator;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
-import org.h2.command.dml.Query;
-import org.h2.command.dml.Select;
-import org.h2.command.dml.SelectUnion;
 import org.h2.engine.Session;
-import org.h2.expression.Expression;
-import org.h2.expression.ExpressionColumn;
 import org.h2.index.BaseIndex;
-import org.h2.index.IndexCondition;
 import org.h2.index.ViewIndex;
 import org.h2.message.DbException;
 import org.h2.result.Row;
 import org.h2.result.SearchRow;
-import org.h2.table.Column;
-import org.h2.table.Table;
 import org.h2.table.TableFilter;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.KEY_COL;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.VAL_COL;
+import static org.apache.ignite.internal.processors.query.h2.opt.GridH2Collocation.buildCollocationModel;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.PREPARE;
-import static org.apache.ignite.internal.processors.query.h2.opt.GridH2TableFilterCollocation.PARTITIONED_COLLOCATED;
-import static org.apache.ignite.internal.processors.query.h2.opt.GridH2TableFilterCollocation.PARTITIONED_FIRST;
-import static org.apache.ignite.internal.processors.query.h2.opt.GridH2TableFilterCollocation.PARTITIONED_NOT_COLLOCATED;
-import static org.apache.ignite.internal.processors.query.h2.opt.GridH2TableFilterCollocation.REPLICATED;
 
 /**
  * Index base.
  */
 public abstract class GridH2IndexBase extends BaseIndex {
     /** */
-    private static final int MULTIPLIER_COLLOCATED = 1;
-
-    /** */
-    private static final int MULTIPLIER_UNICAST = 20;
-
-    /** */
-    private static final int MULTIPLIER_BROADCAST = 80;
-
-    /** */
     private static final AtomicLong idxIdGen = new AtomicLong();
 
     /** */
@@ -139,44 +117,6 @@ public abstract class GridH2IndexBase extends BaseIndex {
     }
 
     /**
-     * @param qry Query.
-     * @return {@code true} If it was proved that the query is fully collocated.
-     */
-    public static boolean isCollocated(Query qry) {
-        if (qry.isUnion()) {
-            SelectUnion union = (SelectUnion)qry;
-
-            return isCollocated(union.getLeft()) && isCollocated(union.getRight());
-        }
-
-        Select select = (Select)qry;
-
-        ArrayList<TableFilter> list = new ArrayList<>();
-
-        TableFilter f = select.getTopTableFilter();
-
-        assert f != null;
-
-        do {
-            list.add(f);
-
-            f = f.getJoin();
-        }
-        while (f != null);
-
-        TableFilter[] filters = list.toArray(new TableFilter[list.size()]);
-
-        Map<TableFilter,GridH2TableFilterCollocation> states = new HashMap<>();
-
-        for (int i = 0; i < filters.length; i++) {
-            if (getDistributedMultiplier0(filters[i].getMasks(), filters, i, states) != MULTIPLIER_COLLOCATED)
-                return false;
-        }
-
-        return true;
-    }
-
-    /**
      * @param ses Session.
      */
     private static void clearViewIndexCache(Session ses) {
@@ -188,310 +128,30 @@ public abstract class GridH2IndexBase extends BaseIndex {
 
     /**
      * @param ses Session.
-     * @param masks Masks.
      * @param filters All joined table filters.
      * @param filter Current filter.
      * @return Multiplier.
      */
-    public int getDistributedMultiplier(Session ses, int[] masks, TableFilter[] filters, int filter) {
+    public int getDistributedMultiplier(Session ses, TableFilter[] filters, int filter) {
         GridH2QueryContext qctx = GridH2QueryContext.get();
 
         // We do complex optimizations with respect to distributed joins only on prepare stage
         // because on run stage reordering of joined tables by Optimizer is explicitly disabled
         // and thus multiplier will be always the same, so it will not affect choice of index.
-        if (qctx == null || qctx.type() != PREPARE || !qctx.distributedJoins())
-            return MULTIPLIER_COLLOCATED;
+        // Query expressions cannot be distributed either.
+        if (qctx == null || qctx.type() != PREPARE || !qctx.distributedJoins() || ses.isPreparingQueryExpression())
+            return GridH2Collocation.MULTIPLIER_COLLOCATED;
 
-        // We have to clear this cache because normally sub-query plan cost does not depend on things
-        // other than index condition masks and sort order, but in our case it is.
+        // We have to clear this cache because normally a sub-query plan cost does not depend on anything
+        // other than index condition masks and sort order, but in our case it can also depend on the order
+        // of the previous table filters.
         clearViewIndexCache(ses);
 
-        Map<TableFilter,GridH2TableFilterCollocation> states = qctx.tableFilterStateCache();
+        assert filters != null;
 
-        // Need to do this clean up because subquery states can be outdated here.
-        clearPreviousSubQueryStates(filters, filter, states);
+        GridH2Collocation c = buildCollocationModel(qctx, ses.getSubQueryInfo(), filters, filter);
 
-        return getDistributedMultiplier0(masks, filters, filter, states);
-    }
-
-    /**
-     * @param masks Masks.
-     * @param filters All joined table filters.
-     * @param filter Current filter.
-     * @param states States map.
-     * @return Multiplier.
-     */
-    private static int getDistributedMultiplier0(int[] masks, TableFilter[] filters, int filter,
-        Map<TableFilter,GridH2TableFilterCollocation> states) {
-        assert states != null;
-
-        final TableFilter f = filters[filter];
-
-        if (!(f.getTable() instanceof GridH2Table)) {
-            GridH2TableFilterCollocation state = getStateForNonTable(f, states);
-
-            return state.isCollocated() ? MULTIPLIER_COLLOCATED : MULTIPLIER_BROADCAST;
-        }
-
-        GridH2Table tbl = (GridH2Table)f.getTable();
-
-        // Only partitioned tables will do distributed joins.
-        if (!tbl.isPartitioned()) {
-            states.put(f, REPLICATED);
-
-            return MULTIPLIER_COLLOCATED;
-        }
-
-        // If we are the first partitioned table in a join, then we are "base" for all the rest partitioned tables
-        // which will need to get remote result (if there is no affinity condition). Since this query is broadcasted
-        // to all the affinity nodes the "base" does not need to get remote results.
-        if (!findPartitionedTableBefore(filters, filter, states)) {
-            states.put(f, PARTITIONED_FIRST);
-
-            return MULTIPLIER_COLLOCATED;
-        }
-
-        // If we don't have affinity equality conditions then most probably we will have to broadcast.
-        if (!hasEqualityCondition(masks, affinityColumn(tbl))) {
-            states.put(f, PARTITIONED_NOT_COLLOCATED);
-
-            return MULTIPLIER_BROADCAST;
-        }
-
-        // If we have an affinity condition then we have to check if the whole join chain is collocated so far.
-        if (joinedWithCollocated(f, states)) {
-            states.put(f, PARTITIONED_COLLOCATED);
-
-            return MULTIPLIER_COLLOCATED;
-        }
-
-        // We are not collocated but at least we are going to unicast.
-        states.put(f, PARTITIONED_NOT_COLLOCATED);
-
-        return MULTIPLIER_UNICAST;
-    }
-
-    /**
-     * @param f Table filter.
-     * @param states States map.
-     * @return {@code true} If the given filter is joined with previous partitioned table filter which is
-     *      also collocated. Thus the whole join chain will be collocated.
-     */
-    private static boolean joinedWithCollocated(TableFilter f, Map<TableFilter,GridH2TableFilterCollocation> states) {
-        ArrayList<IndexCondition> idxConditions = f.getIndexConditions();
-
-        int affColId = affinityColumn((GridH2Table)f.getTable());
-
-        for (int i = 0; i < idxConditions.size(); i++) {
-            IndexCondition c = idxConditions.get(i);
-
-            if (c.getCompareType() == IndexCondition.EQUALITY &&
-                c.getColumn().getColumnId() == affColId && c.isEvaluatable()) {
-                Expression exp = c.getExpression();
-
-                exp = exp.getNonAliasExpression();
-
-                if (exp instanceof ExpressionColumn) {
-                    ExpressionColumn expCol = (ExpressionColumn)exp;
-
-                    // This is one of our previous joins.
-                    TableFilter prevJoin = expCol.getTableFilter();
-
-                    if (prevJoin != null) {
-                        GridH2TableFilterCollocation state = states.get(prevJoin);
-
-                        if (state == null)
-                            state = getStateForNonTable(prevJoin, states);
-
-                        if (state.isPartitioned() && state.isCollocated() && isAffinityColumn(prevJoin, expCol))
-                            return true;
-                    }
-                }
-            }
-        }
-
-        return false;
-    }
-
-    /**
-     * @param f Table filter.
-     * @param expCol Expression column.
-     * @return {@code true} It it is an affinity column.
-     */
-    private static boolean isAffinityColumn(TableFilter f, ExpressionColumn expCol) {
-        Column col = expCol.getColumn();
-
-        if (col == null)
-            return false;
-
-        Table t = col.getTable();
-
-        if (t.isView()) {
-            Query qry = ((ViewIndex)f.getIndex()).getQuery();
-
-            return isAffinityColumn(qry, expCol);
-        }
-
-        return t instanceof GridH2Table &&
-            col.getColumnId() == ((GridH2Table)t).getAffinityKeyColumn().column.getColumnId();
-    }
-
-    /**
-     * @param qry Query.
-     * @param expCol Expression column.
-     * @return {@code true} It it is an affinity column.
-     */
-    private static boolean isAffinityColumn(Query qry, ExpressionColumn expCol) {
-        if (qry.isUnion()) {
-            SelectUnion union = (SelectUnion)qry;
-
-            return isAffinityColumn(union.getLeft(), expCol) && isAffinityColumn(union.getRight(), expCol);
-        }
-
-        Expression exp = qry.getExpressions().get(expCol.getColumn().getColumnId()).getNonAliasExpression();
-
-        if (exp instanceof ExpressionColumn) {
-            expCol = (ExpressionColumn)exp;
-
-            return isAffinityColumn(expCol.getTableFilter(), expCol);
-        }
-
-        return false;
-    }
-
-    /**
-     * @param filters All joined table filters.
-     * @param filter Current filter.
-     * @param states States map.
-     */
-    private static void clearPreviousSubQueryStates(TableFilter[] filters, int filter,
-        Map<TableFilter,GridH2TableFilterCollocation> states) {
-        // We have to go back and clean up state for all the previous subqueries.
-        for (int i = filter - 1; i >= 0; i--) {
-            TableFilter f = filters[i];
-
-            if (f.getTable().isView())
-                states.put(f, null);
-            else
-                break;
-        }
-    }
-
-    /**
-     * @param filters All joined table filters.
-     * @param filter Current filter.
-     * @param states States map.
-     * @return {@code true} If there are partitioned table before.
-     */
-    private static boolean findPartitionedTableBefore(TableFilter[] filters, int filter,
-        Map<TableFilter,GridH2TableFilterCollocation> states) {
-        for (int i = 0; i < filter; i++) {
-            TableFilter prevFilter = filters[i];
-
-            GridH2TableFilterCollocation state = states.get(prevFilter);
-
-            if (state == null) // This can happen if previous filter is a subquery or function.
-                state = getStateForNonTable(prevFilter, states);
-
-            if (state.isPartitioned())
-                return true;
-        }
-
-        return false;
-    }
-
-    /**
-     * @param f Filter.
-     * @param states States map.
-     * @return Filter collocation.
-     */
-    private static GridH2TableFilterCollocation getStateForNonTable(TableFilter f,
-        Map<TableFilter,GridH2TableFilterCollocation> states) {
-        Table tbl = f.getTable();
-
-        GridH2TableFilterCollocation res;
-
-        if (tbl.isView()) {
-            Query qry = ((ViewIndex)f.getIndex()).getQuery();
-
-            res = getStateForSubQuery(qry, states);
-        }
-        else if (tbl instanceof GridH2Table)
-            throw new IllegalStateException("Table found: " + ((GridH2Table)tbl).identifier());
-        else {
-            // It is a some kind of function or system table.
-            res = REPLICATED;
-        }
-
-        assert res != null;
-
-        states.put(f, res);
-
-        return res;
-    }
-
-    /**
-     * @param qry Query.
-     * @param states States.
-     * @return Collocation.
-     */
-    private static GridH2TableFilterCollocation getStateForSubQuery(Query qry,
-        Map<TableFilter,GridH2TableFilterCollocation> states) {
-        if (!qry.isUnion())
-            return getStateForSubSelect((Select)qry, states);
-
-        SelectUnion union = (SelectUnion)qry;
-
-        GridH2TableFilterCollocation left = getStateForSubQuery(union.getLeft(), states);
-        GridH2TableFilterCollocation right = getStateForSubQuery(union.getRight(), states);
-
-        if (!left.isCollocated() || !right.isCollocated())
-            return PARTITIONED_NOT_COLLOCATED;
-
-        if (!left.isPartitioned() && !right.isPartitioned())
-            return REPLICATED;
-
-        if (left == PARTITIONED_FIRST && right == PARTITIONED_FIRST)
-            return PARTITIONED_FIRST;
-
-        return PARTITIONED_COLLOCATED;
-    }
-
-    /**
-     * @param select Select.
-     * @param states States.
-     * @return Collocation.
-     */
-    private static GridH2TableFilterCollocation getStateForSubSelect(Select select,
-        Map<TableFilter,GridH2TableFilterCollocation> states) {
-        int partitioned = -1;
-        int i = 0;
-
-        for (TableFilter f = select.getTopTableFilter(); f != null; f = f.getJoin()) {
-            GridH2TableFilterCollocation state = states.get(f);
-
-            if (state == null)
-                state = getStateForNonTable(f, states);
-
-            if (!state.isCollocated())
-                return PARTITIONED_NOT_COLLOCATED;
-
-            if (state.isPartitioned())
-                partitioned = i;
-
-            i++;
-        }
-
-        switch (partitioned) {
-            case -1:
-                return REPLICATED;
-
-            case 0:
-                return PARTITIONED_FIRST;
-
-            default:
-                return PARTITIONED_COLLOCATED;
-        }
+        return c.calculateMultiplier();
     }
 
     /**
@@ -502,15 +162,6 @@ public abstract class GridH2IndexBase extends BaseIndex {
         return tbl.getAffinityKeyColumn().column.getColumnId();
     }
 
-    /**
-     * @param masks Masks.
-     * @param colId Column ID.
-     * @return {@code true} If set of index conditions contains equality condition for the given column.
-     */
-    private static boolean hasEqualityCondition(int[] masks, int colId) {
-        return masks != null && (masks[colId] & IndexCondition.EQUALITY) == IndexCondition.EQUALITY;
-    }
-
     /** {@inheritDoc} */
     @Override public GridH2Table getTable() {
         return (GridH2Table)super.getTable();

http://git-wip-us.apache.org/repos/asf/ignite/blob/f5c63701/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
index b5243a7..29bbbaf 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
@@ -28,13 +28,11 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
-import org.h2.table.TableFilter;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.LOCAL;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
-import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.PREPARE;
 
 /**
  * Thread local SQL query context which is intended to be accessible from everywhere.
@@ -81,7 +79,7 @@ public class GridH2QueryContext {
     private int pageSize;
 
     /** */
-    private Map<TableFilter, GridH2TableFilterCollocation>  tableFilterStateCache;
+    private GridH2Collocation qryCollocation;
 
     /**
      * @param locNodeId Local node ID.
@@ -115,15 +113,17 @@ public class GridH2QueryContext {
     }
 
     /**
-     * @return Cache for table filter collocation states.
+     * @return Query collocation model.
      */
-    public Map<TableFilter, GridH2TableFilterCollocation> tableFilterStateCache() {
-        assert type() == PREPARE : type();
-
-        if (tableFilterStateCache == null)
-            tableFilterStateCache = new HashMap<>();
+    public GridH2Collocation queryCollocation() {
+        return qryCollocation;
+    }
 
-        return tableFilterStateCache;
+    /**
+     * @param qryCollocation Query collocation model.
+     */
+    public void queryCollocation(GridH2Collocation qryCollocation) {
+        this.qryCollocation = qryCollocation;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/f5c63701/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
index 774e110..17385e6 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
@@ -152,6 +152,13 @@ public class GridH2Table extends TableBase {
         return affKeyCol;
     }
 
+    /**
+     * @return Affinity key column ID.
+     */
+    public int getAffinityKeyColumnId() {
+        return affKeyCol.column.getColumnId();
+    }
+
     /** {@inheritDoc} */
     @Override public long getDiskSpaceUsed() {
         return 0;
@@ -907,7 +914,7 @@ public class GridH2Table extends TableBase {
         @Override public double getCost(Session ses, int[] masks, TableFilter[] filters, int filter,
             SortOrder sortOrder) {
             long rows = getRowCountApproximation();
-            int mul = delegate.getDistributedMultiplier(ses, masks, filters, filter);
+            int mul = delegate.getDistributedMultiplier(ses, filters, filter);
 
             return  mul * (rows + Constants.COST_ROW_OFFSET);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f5c63701/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TableFilterCollocation.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TableFilterCollocation.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TableFilterCollocation.java
deleted file mode 100644
index 6b11352..0000000
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TableFilterCollocation.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2.opt;
-
-/**
- * Collocation of H2 table filter.
- */
-public enum GridH2TableFilterCollocation {
-    /** */
-    PARTITIONED_FIRST(true, true),
-
-    /** */
-    PARTITIONED_COLLOCATED(true, true),
-
-    /** */
-    PARTITIONED_NOT_COLLOCATED(true, false),
-
-    /** */
-    REPLICATED(false, true);
-
-    /** */
-    private final boolean partitioned;
-
-    /** */
-    private final boolean collocated;
-
-    /**
-     * @param partitioned Partitioned.
-     * @param collocated Collocated.
-     */
-    GridH2TableFilterCollocation(boolean partitioned, boolean collocated) {
-        this.partitioned = partitioned;
-        this.collocated = collocated;
-    }
-
-    /**
-     * @return {@code true} If partitioned.
-     */
-    public boolean isPartitioned() {
-        return partitioned;
-    }
-
-    /**
-     * @return {@code true} If collocated.
-     */
-    public boolean isCollocated() {
-        return collocated;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f5c63701/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
index ed00e48..633cdf4 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
@@ -407,7 +407,7 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
     @Override public double getCost(Session ses, int[] masks, TableFilter[] filters, int filter, SortOrder sortOrder) {
         long rowCnt = getRowCountApproximation();
         double baseCost = getCostRangeIndex(masks, rowCnt, filters, filter, sortOrder);
-        int mul = getDistributedMultiplier(ses, masks, filters, filter);
+        int mul = getDistributedMultiplier(ses, filters, filter);
 
         return mul * baseCost;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f5c63701/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index 9225967..034eebe 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -37,7 +37,7 @@ import org.h2.table.IndexColumn;
 import org.h2.util.IntArray;
 import org.jetbrains.annotations.Nullable;
 
-import static org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase.isCollocated;
+import static org.apache.ignite.internal.processors.query.h2.opt.GridH2Collocation.isCollocated;
 import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlFunctionType.AVG;
 import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlFunctionType.CAST;
 import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlFunctionType.COUNT;