You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/13 09:46:49 UTC

[35/50] [abbrv] ignite git commit: ignite-1.9 - SQL related fixes and improvements: - Sorted MERGE index - EXPLAIN fixes - Replicated subqueries fixes

http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 4cae6ac..8d3ece9 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -67,6 +67,7 @@ import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlSortColumn;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlType;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse;
@@ -86,6 +87,7 @@ import org.apache.ignite.plugin.extensions.communication.Message;
 import org.h2.command.ddl.CreateTableData;
 import org.h2.engine.Session;
 import org.h2.index.Cursor;
+import org.h2.index.Index;
 import org.h2.jdbc.JdbcConnection;
 import org.h2.jdbc.JdbcResultSet;
 import org.h2.jdbc.JdbcStatement;
@@ -97,6 +99,7 @@ import org.h2.value.Value;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
+import static java.util.Collections.singletonList;
 import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
 import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
 import static org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.setupConnection;
@@ -115,6 +118,12 @@ public class GridReduceQueryExecutor {
     private static final IgniteProductVersion DISTRIBUTED_JOIN_SINCE = IgniteProductVersion.fromString("1.7.0");
 
     /** */
+    private static final String MERGE_INDEX_UNSORTED = "merge_scan";
+
+    /** */
+    private static final String MERGE_INDEX_SORTED = "merge_sorted";
+
+    /** */
     private GridKernalContext ctx;
 
     /** */
@@ -354,7 +363,7 @@ public class GridReduceQueryExecutor {
 
         if (msg.retry() != null)
             retry(r, msg.retry(), node.id());
-        else if (msg.allRows() != -1) // Only the first page contains row count.
+        else if (msg.page() == 0) // Count down once for each first page received.
             r.latch.countDown();
     }
 
@@ -518,7 +527,7 @@ public class GridReduceQueryExecutor {
             Map<ClusterNode, IntArray> partsMap = null;
 
             if (qry.isLocal())
-                nodes = Collections.singleton(ctx.discovery().localNode());
+                nodes = singletonList(ctx.discovery().localNode());
             else {
                 if (isPreloadingActive(cctx, extraSpaces)) {
                     if (cctx.isReplicated())
@@ -542,17 +551,17 @@ public class GridReduceQueryExecutor {
                         "We must be on a client node.";
 
                     // Select random data node to run query on a replicated data or get EXPLAIN PLAN from a single node.
-                    nodes = Collections.singleton(F.rand(nodes));
+                    nodes = singletonList(F.rand(nodes));
                 }
             }
 
-            final Collection<ClusterNode> finalNodes = nodes;
-
             int tblIdx = 0;
 
             final boolean skipMergeTbl = !qry.explain() && qry.skipMergeTable();
 
-            final int segmentsPerIndex = cctx.config().getQueryParallelism();
+            final int segmentsPerIndex = qry.explain() ? 1 : cctx.config().getQueryParallelism();
+
+            int replicatedQrysCnt = 0;
 
             for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
                 GridMergeIndex idx;
@@ -567,19 +576,33 @@ public class GridReduceQueryExecutor {
                         throw new IgniteException(e);
                     }
 
-                    idx = tbl.getScanIndex(null);
+                    idx = tbl.getMergeIndex();
 
                     fakeTable(r.conn, tblIdx++).innerTable(tbl);
                 }
                 else
                     idx = GridMergeIndexUnsorted.createDummy(ctx);
 
-                idx.setSources(nodes, segmentsPerIndex);
+                // If the query has only replicated tables, we have to run it on a single node only.
+                if (!mapQry.isPartitioned()) {
+                    ClusterNode node = F.rand(nodes);
+
+                    mapQry.node(node.id());
+
+                    replicatedQrysCnt++;
+
+                    idx.setSources(singletonList(node), 1); // Replicated tables can have only 1 segment.
+                }
+                else
+                    idx.setSources(nodes, segmentsPerIndex);
+
+                idx.setPageSize(r.pageSize);
 
                 r.idxs.add(idx);
             }
 
-            r.latch = new CountDownLatch(r.idxs.size() * nodes.size() * segmentsPerIndex);
+            r.latch = new CountDownLatch(
+                (r.idxs.size() - replicatedQrysCnt) * nodes.size() * segmentsPerIndex + replicatedQrysCnt);
 
             runs.put(qryReqId, r);
 
@@ -607,6 +630,8 @@ public class GridReduceQueryExecutor {
                 final boolean oldStyle = minNodeVer.compareToIgnoreTimestamp(DISTRIBUTED_JOIN_SINCE) < 0;
                 final boolean distributedJoins = qry.distributedJoins();
 
+                final Collection<ClusterNode> finalNodes = nodes;
+
                 cancel.set(new Runnable() {
                     @Override public void run() {
                         send(finalNodes, new GridQueryCancelRequest(qryReqId), null, false);
@@ -627,6 +652,9 @@ public class GridReduceQueryExecutor {
                 if (qry.isLocal())
                     flags |= GridH2QueryRequest.FLAG_IS_LOCAL;
 
+                if (qry.explain())
+                    flags |= GridH2QueryRequest.FLAG_EXPLAIN;
+
                 if (send(nodes,
                     oldStyle ?
                         new GridQueryRequest(qryReqId,
@@ -724,7 +752,7 @@ public class GridReduceQueryExecutor {
                                 r.conn,
                                 rdc.query(),
                                 F.asList(rdc.parameters()),
-                                false,
+                                false, // The statement will cache some extra thread local objects.
                                 timeoutMillis,
                                 cancel);
 
@@ -783,19 +811,6 @@ public class GridReduceQueryExecutor {
     }
 
     /**
-     * @param idxs Merge indexes.
-     * @return {@code true} If all remote data was fetched.
-     */
-    private static boolean allIndexesFetched(List<GridMergeIndex> idxs) {
-        for (int i = 0; i <  idxs.size(); i++) {
-            if (!idxs.get(i).fetchedAll())
-                return false;
-        }
-
-        return true;
-    }
-
-    /**
      * Returns true if the exception is triggered by query cancel.
      *
      * @param e Exception.
@@ -1256,6 +1271,7 @@ public class GridReduceQueryExecutor {
      * @return Table.
      * @throws IgniteCheckedException If failed.
      */
+    @SuppressWarnings("unchecked")
     private GridMergeTable createMergeTable(JdbcConnection conn, GridCacheSqlQuery qry, boolean explain)
         throws IgniteCheckedException {
         try {
@@ -1290,7 +1306,31 @@ public class GridReduceQueryExecutor {
             else
                 data.columns = planColumns();
 
-            return new GridMergeTable(data, ctx);
+            boolean sortedIndex = !F.isEmpty(qry.sortColumns());
+
+            GridMergeTable tbl = new GridMergeTable(data);
+
+            ArrayList<Index> idxs = new ArrayList<>(2);
+
+            if (explain) {
+                idxs.add(new GridMergeIndexUnsorted(ctx, tbl,
+                    sortedIndex ? MERGE_INDEX_SORTED : MERGE_INDEX_UNSORTED));
+            }
+            else if (sortedIndex) {
+                List<GridSqlSortColumn> sortCols = (List<GridSqlSortColumn>)qry.sortColumns();
+
+                GridMergeIndexSorted sortedMergeIdx = new GridMergeIndexSorted(ctx, tbl, MERGE_INDEX_SORTED,
+                    GridSqlSortColumn.toIndexColumns(tbl, sortCols));
+
+                idxs.add(GridMergeTable.createScanIndex(sortedMergeIdx));
+                idxs.add(sortedMergeIdx);
+            }
+            else
+                idxs.add(new GridMergeIndexUnsorted(ctx, tbl, MERGE_INDEX_UNSORTED));
+
+            tbl.indexes(idxs);
+
+            return tbl;
         }
         catch (Exception e) {
             U.closeQuiet(conn);
@@ -1407,22 +1447,17 @@ public class GridReduceQueryExecutor {
             while (latch.getCount() != 0) // We don't need to wait for all nodes to reply.
                 latch.countDown();
 
+            CacheException e = o instanceof CacheException ? (CacheException) o : null;
+
             for (GridMergeIndex idx : idxs) // Fail all merge indexes.
-                idx.fail(nodeId, o instanceof CacheException ? (CacheException) o : null);
+                idx.fail(nodeId, e);
         }
 
         /**
          * @param e Error.
          */
         void disconnected(CacheException e) {
-            if (!state.compareAndSet(null, e))
-                return;
-
-            while (latch.getCount() != 0) // We don't need to wait for all nodes to reply.
-                latch.countDown();
-
-            for (GridMergeIndex idx : idxs) // Fail all merge indexes.
-                idx.fail(e);
+            state(e, null);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
index 2057330..103084e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
@@ -40,7 +40,7 @@ public class GridResultPage {
     private final UUID src;
 
     /** */
-    protected final GridQueryNextPageResponse res;
+    private final GridQueryNextPageResponse res;
 
     /** */
     private final int rowsInPage;
@@ -48,6 +48,9 @@ public class GridResultPage {
     /** */
     private Iterator<Value[]> rows;
 
+    /** */
+    private boolean last;
+
     /**
      * @param ctx Kernal context.
      * @param src Source.
@@ -119,10 +122,28 @@ public class GridResultPage {
     }
 
     /**
-     * @return {@code true} If this is a dummy last page for all the sources.
+     * @return {@code true} If this is either a real last page for a source or
+     *      a dummy terminating page with no rows.
      */
     public boolean isLast() {
-        return false;
+        return last;
+    }
+
+    /**
+     * @return {@code true} If it is a dummy last page.
+     */
+    public boolean isDummyLast() {
+        return last && res == null;
+    }
+
+    /**
+     * @param last Last page for a source.
+     * @return {@code this}.
+     */
+    public GridResultPage setLast(boolean last) {
+        this.last = last;
+
+        return this;
     }
 
     /**
@@ -153,6 +174,13 @@ public class GridResultPage {
     }
 
     /**
+     * @return Segment ID.
+     */
+    public int segmentId() {
+        return res.segmentId();
+    }
+
+    /**
      * @return Response.
      */
     public GridQueryNextPageResponse response() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
index 0ad534c..0aa788b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
@@ -48,17 +48,22 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
      * Map query will not destroy context until explicit query cancel request will be received because distributed join
      * requests can be received.
      */
-    public static int FLAG_DISTRIBUTED_JOINS = 1;
+    public static final int FLAG_DISTRIBUTED_JOINS = 1;
 
     /**
      * Remote map query executor will enforce join order for the received map queries.
      */
-    public static int FLAG_ENFORCE_JOIN_ORDER = 1 << 1;
+    public static final int FLAG_ENFORCE_JOIN_ORDER = 1 << 1;
 
     /**
      * Restrict distributed joins range-requests to local index segments. Range requests to other nodes will not be sent.
      */
-    public static int FLAG_IS_LOCAL = 1 << 2;
+    public static final int FLAG_IS_LOCAL = 1 << 2;
+
+    /**
+     * Set if the request is an EXPLAIN command.
+     */
+    public static final int FLAG_EXPLAIN = 1 << 3;
 
     /** */
     private long reqId;

http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
index 81c28a3..842b947 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
@@ -395,10 +395,7 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
 
         Collection<List<?>> res = qry.getAll();
 
-        if (cacheMode() == REPLICATED)
-            assertEquals(1, res.size());
-        else
-            assertEquals(gridCount(), res.size());
+        assertEquals(1, res.size());
 
         List<?> row = res.iterator().next();
 
@@ -410,10 +407,7 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
 
         res = qry.getAll();
 
-        if (cacheMode() == REPLICATED)
-            assertEquals(1, res.size());
-        else
-            assertEquals(gridCount(), res.size());
+        assertEquals(1, res.size());
 
         row = res.iterator().next();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
index 69f66a5..8eae549 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
@@ -36,13 +36,12 @@ import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cache.affinity.AffinityKeyMapped;
 import org.apache.ignite.cache.query.QueryCursor;
-import org.apache.ignite.cache.affinity.AffinityKeyMapped;
-import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.annotations.QuerySqlField;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.query.h2.twostep.GridMergeIndex;
 import org.apache.ignite.internal.util.GridRandom;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
@@ -156,6 +155,70 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testSortedMergeIndex() throws Exception {
+        IgniteCache<Integer,Value> c = ignite(0).getOrCreateCache(cacheConfig("v", true,
+            Integer.class, Value.class));
+
+        try {
+            GridTestUtils.setFieldValue(null, GridMergeIndex.class, "PREFETCH_SIZE", 8);
+
+            Random rnd = new GridRandom();
+
+            int cnt = 1000;
+
+            for (int i = 0; i < cnt; i++) {
+                c.put(i, new Value(
+                    rnd.nextInt(5) == 0 ? null: rnd.nextInt(100),
+                    rnd.nextInt(8) == 0 ? null: rnd.nextInt(2000)));
+            }
+
+            List<List<?>> plan = c.query(new SqlFieldsQuery(
+                "explain select snd from Value order by fst desc")).getAll();
+            String rdcPlan = (String)plan.get(1).get(0);
+
+            assertTrue(rdcPlan.contains("merge_sorted"));
+            assertTrue(rdcPlan.contains("/* index sorted */"));
+
+            plan = c.query(new SqlFieldsQuery(
+                "explain select snd from Value")).getAll();
+            rdcPlan = (String)plan.get(1).get(0);
+
+            assertTrue(rdcPlan.contains("merge_scan"));
+            assertFalse(rdcPlan.contains("/* index sorted */"));
+
+            for (int i = 0; i < 10; i++) {
+                X.println(" --> " + i);
+
+                List<List<?>> res = c.query(new SqlFieldsQuery(
+                    "select fst from Value order by fst").setPageSize(5)
+                ).getAll();
+
+                assertEquals(cnt, res.size());
+
+                Integer p = null;
+
+                for (List<?> row : res) {
+                    Integer x = (Integer)row.get(0);
+
+                    if (x != null) {
+                        if (p != null)
+                            assertTrue(x + " >= " + p,  x >= p);
+
+                        p = x;
+                    }
+                }
+            }
+        }
+        finally {
+            GridTestUtils.setFieldValue(null, GridMergeIndex.class, "PREFETCH_SIZE", 1024);
+
+            c.destroy();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testGroupIndexOperations() throws Exception {
         IgniteCache<Integer, GroupIndexTestValue> c = ignite(0).getOrCreateCache(cacheConfig("grp", false,
             Integer.class, GroupIndexTestValue.class));
@@ -848,13 +911,44 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
 
             String select0 = "select o.name n1, p.name n2 from \"pers\".Person2 p, \"org\".Organization o where p.orgId = o._key";
 
-            final SqlFieldsQuery qry = new SqlFieldsQuery(select0);
+            SqlFieldsQuery qry = new SqlFieldsQuery(select0);
 
             qry.setDistributedJoins(true);
 
             List<List<?>> results = c1.query(qry).getAll();
 
             assertEquals(2, results.size());
+
+            select0 += " order by n2 desc";
+
+            qry = new SqlFieldsQuery(select0);
+
+            qry.setDistributedJoins(true);
+
+            results = c1.query(qry).getAll();
+
+            assertEquals(2, results.size());
+
+            assertEquals("p2", results.get(0).get(1));
+            assertEquals("p1", results.get(1).get(1));
+
+            // Test for replicated subquery with aggregate.
+            select0 = "select p.name " +
+                "from \"pers\".Person2 p, " +
+                "(select max(_key) orgId from \"org\".Organization) o " +
+                "where p.orgId = o.orgId";
+
+            X.println("Plan: \n" +
+                c1.query(new SqlFieldsQuery("explain " + select0).setDistributedJoins(true)).getAll());
+
+            qry = new SqlFieldsQuery(select0);
+
+            qry.setDistributedJoins(true);
+
+            results = c1.query(qry).getAll();
+
+            assertEquals(1, results.size());
+            assertEquals("p2", results.get(0).get(0));
         }
         finally {
             c1.destroy();

http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/H2CompareBigQueryTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/H2CompareBigQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/H2CompareBigQueryTest.java
index f8526a8..30dd80d 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/H2CompareBigQueryTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/H2CompareBigQueryTest.java
@@ -249,11 +249,13 @@ public class H2CompareBigQueryTest extends AbstractH2CompareQueryTest {
         X.println(bigQry);
         X.println();
 
-        X.println("Plan: " + pCache.query(new SqlFieldsQuery("EXPLAIN " + bigQry)
+        X.println("   Plan: \n" + pCache.query(new SqlFieldsQuery("EXPLAIN " + bigQry)
             .setDistributedJoins(distributedJoins())).getAll());
 
         List<List<?>> res = compareQueryRes0(pCache, bigQry, distributedJoins(), new Object[0], Ordering.RANDOM);
 
+        X.println("   Result size: " + res.size());
+
         assertTrue(!res.isEmpty()); // Ensure we set good testing data at database.
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index b417b0a..0c74f12 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -106,6 +106,7 @@ import org.apache.ignite.internal.processors.query.h2.GridH2IndexingOffheapSelfT
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2TableSelfTest;
 import org.apache.ignite.internal.processors.query.h2.sql.BaseH2CompareQueryTest;
 import org.apache.ignite.internal.processors.query.h2.sql.GridQueryParsingTest;
+import org.apache.ignite.internal.processors.query.h2.sql.H2CompareBigQueryDistributedJoinsTest;
 import org.apache.ignite.internal.processors.query.h2.sql.H2CompareBigQueryTest;
 import org.apache.ignite.spi.communication.tcp.GridOrderedMessageCancelSelfTest;
 import org.apache.ignite.testframework.IgniteTestSuite;
@@ -214,6 +215,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         // Ignite cache and H2 comparison.
         suite.addTestSuite(BaseH2CompareQueryTest.class);
         suite.addTestSuite(H2CompareBigQueryTest.class);
+        suite.addTestSuite(H2CompareBigQueryDistributedJoinsTest.class);
 
         // Cache query metrics.
         suite.addTestSuite(CacheLocalQueryMetricsSelfTest.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/test/resources/org/apache/ignite/internal/processors/query/h2/sql/bigQuery.sql
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/resources/org/apache/ignite/internal/processors/query/h2/sql/bigQuery.sql b/modules/indexing/src/test/resources/org/apache/ignite/internal/processors/query/h2/sql/bigQuery.sql
index 8d42d44..64b7e2a 100644
--- a/modules/indexing/src/test/resources/org/apache/ignite/internal/processors/query/h2/sql/bigQuery.sql
+++ b/modules/indexing/src/test/resources/org/apache/ignite/internal/processors/query/h2/sql/bigQuery.sql
@@ -26,21 +26,25 @@ from (
 	
 	select  date, orderId, rootOrderId, refOrderId as origOrderId, archSeq, alias
 	from "part".ReplaceOrder where alias='CUSTOM'
-  ) co, "part".OrderParams op
-  where co.date = op.date and co.orderId = op.orderId and co.archSeq = -- TODO: replace with 'dateToLong(co.date)+archSeq'.
-	(
-	  select max(archSeq) -- TODO: replace with 'dateToLong(co.date)+archSeq'.
-	  from (
-		  select  date, orderId, rootOrderId, origOrderId, archSeq, alias 
-		  from "part".CustOrder where alias='CUSTOM'
-		  
-		  union all
-		  
-		  select  date, orderId, rootOrderId, refOrderId as origOrderId, archSeq, alias
-		  from "part".ReplaceOrder where alias='CUSTOM'
-	  ) 
-	  where origOrderId = co.origOrderId and date = co.date
-	) and co.alias='CUSTOM'
+  ) co,
+  "part".OrderParams op,
+    (
+      select origOrderId, date, max(archSeq) maxArchSeq
+      from (
+          select  date, orderId, rootOrderId, origOrderId, archSeq, alias
+          from "part".CustOrder where alias='CUSTOM'
+
+          union all
+
+          select  date, orderId, rootOrderId, refOrderId as origOrderId, archSeq, alias
+          from "part".ReplaceOrder where alias='CUSTOM'
+      )
+      group by origOrderId, date
+   ) h
+  where co.date = op.date and co.orderId = op.orderId
+    and h.origOrderId = co.origOrderId and h.date = co.date
+    and co.archSeq = h.maxArchSeq
+	and co.alias='CUSTOM'
 ) cop 
 inner join (
   select e.date, e.rootOrderId as eRootOrderId, e.rootOrderId, sum(e.execShares) as execShares,