You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/03/17 18:33:50 UTC

ignite git commit: IGNITE-1232: add more tests (big query distributedJoin and IgniteCrossCachesDistributedJoinQueryTest)

Repository: ignite
Updated Branches:
  refs/heads/ignite-1232 74484d786 -> 7f99f2a7c


IGNITE-1232: add more tests (big query distributedJoin and IgniteCrossCachesDistributedJoinQueryTest)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7f99f2a7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7f99f2a7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7f99f2a7

Branch: refs/heads/ignite-1232
Commit: 7f99f2a7c1975cb639e134b8c53402d66fbc63db
Parents: 74484d7
Author: ashutak <as...@gridgain.com>
Authored: Thu Mar 17 20:33:10 2016 +0300
Committer: ashutak <as...@gridgain.com>
Committed: Thu Mar 17 20:33:10 2016 +0300

----------------------------------------------------------------------
 ...niteCrossCachesDistributedJoinQueryTest.java | 583 ++++++++++++++-----
 .../h2/sql/AbstractH2CompareQueryTest.java      | 106 ++--
 .../query/h2/sql/H2CompareBigQueryTest.java     |  71 ++-
 3 files changed, 556 insertions(+), 204 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7f99f2a7/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCrossCachesDistributedJoinQueryTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCrossCachesDistributedJoinQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCrossCachesDistributedJoinQueryTest.java
index d9b4d05..9461db0 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCrossCachesDistributedJoinQueryTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCrossCachesDistributedJoinQueryTest.java
@@ -18,6 +18,9 @@
 package org.apache.ignite.internal.processors.cache;
 
 import java.io.Serializable;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
@@ -40,23 +43,22 @@ import org.apache.ignite.cache.query.SqlQuery;
 import org.apache.ignite.cache.query.annotations.QuerySqlField;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.query.h2.sql.AbstractH2CompareQueryTest;
 import org.apache.ignite.internal.util.typedef.T4;
 import org.apache.ignite.internal.util.typedef.internal.SB;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.GridTestUtils;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
-import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 
 /**
  *
  */
-@SuppressWarnings({"unchecked", "PackageVisibleField"})
-public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstractTest {
+@SuppressWarnings({"unchecked", "PackageVisibleField", "serial"})
+public class IgniteCrossCachesDistributedJoinQueryTest extends AbstractH2CompareQueryTest {
     /** */
     private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
 
@@ -72,6 +74,15 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
     /** */
     private String dataAsStr;
 
+    /** */
+    private String personCacheName;
+
+    /** */
+    private String orgCacheName;
+
+    /** */
+    private String accCacheName;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -84,40 +95,259 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
     }
 
     /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        super.beforeTestsStarted();
+    @Override protected CacheConfiguration[] cacheConfigurations() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void setIndexedTypes(CacheConfiguration<?, ?> cc, CacheMode mode) {
+        throw new UnsupportedOperationException();
+    }
 
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
         startGridsMultiThreaded(NODES - 1);
 
         client = true;
 
         startGrid(NODES - 1);
+
+        conn = openH2Connection(false);
+
+        initializeH2Schema();
     }
 
     /** {@inheritDoc} */
-    @Override protected void afterTestsStopped() throws Exception {
-        stopAllGrids();
+    @Override protected void initCacheAndDbData() throws SQLException {
+        Statement st = conn.createStatement();
+
+        st.execute("create table \"" + accCacheName + "\".Account" +
+            "  (" +
+            "  _key int not null," +
+            "  _val other not null," +
+            "  id int unique," +
+            "  personId int," +
+            "  personDateId TIMESTAMP," +
+            "  personStrId varchar(255)" +
+            "  )");
+
+        st.execute("create table \"" + personCacheName + "\".Person" +
+            "  (" +
+            "  _key int not null," +
+            "  _val other not null," +
+            "  id int unique," +
+            "  strId varchar(255) ," +
+            "  dateId TIMESTAMP ," +
+            "  orgId int," +
+            "  orgDateId TIMESTAMP," +
+            "  orgStrId varchar(255), " +
+            "  name varchar(255), " +
+            "  salary int" +
+            "  )");
+
+        st.execute("create table \"" + orgCacheName + "\".Organization" +
+            "  (" +
+            "  _key int not null," +
+            "  _val other not null," +
+            "  id int unique," +
+            "  strId varchar(255) ," +
+            "  dateId TIMESTAMP ," +
+            "  name varchar(255) " +
+            "  )");
+
+        conn.commit();
+
+        st.close();
+
+        for (Account account : data.accounts) {
+            ignite(0).cache(accCacheName).put(account.id, account);
+
+            insertInDb(account);
+        }
+
+        for (Person person : data.persons) {
+            ignite(0).cache(personCacheName).put(person.id, person);
+
+            insertInDb(person);
+        }
 
-        super.afterTestsStopped();
+        for (Organization org : data.orgs) {
+            ignite(0).cache(orgCacheName).put(org.id, org);
+
+            insertInDb(org);
+        }
+    }
+
+    /**
+     * @param acc Account.
+     * @throws SQLException If failed.
+     */
+    private void insertInDb(Account acc) throws SQLException {
+        try (PreparedStatement st = conn.prepareStatement(
+            "insert into \"" + accCacheName + "\".Account (_key, _val, id, personId, personDateId, personStrId) " +
+                "values(?, ?, ?, ?, ?, ?)")) {
+            int i = 0;
+
+            st.setObject(++i, acc.id);
+            st.setObject(++i, acc);
+            st.setObject(++i, acc.id);
+            st.setObject(++i, acc.personId);
+            st.setObject(++i, acc.personDateId);
+            st.setObject(++i, acc.personStrId);
+
+            st.executeUpdate();
+        }
+    }
+
+    /**
+     * @param p Person.
+     * @throws SQLException If failed.
+     */
+    private void insertInDb(Person p) throws SQLException {
+        try (PreparedStatement st = conn.prepareStatement(
+            "insert into \"" + personCacheName + "\".Person (_key, _val, id, strId, dateId, name, orgId, orgDateId, " +
+                "orgStrId, salary) " +
+                "values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")) {
+            int i = 0;
+
+            st.setObject(++i, p.id);
+            st.setObject(++i, p);
+            st.setObject(++i, p.id);
+            st.setObject(++i, p.strId);
+            st.setObject(++i, p.dateId);
+            st.setObject(++i, p.name);
+            st.setObject(++i, p.orgId);
+            st.setObject(++i, p.orgDateId);
+            st.setObject(++i, p.orgStrId);
+            st.setObject(++i, p.salary);
+
+            st.executeUpdate();
+        }
+    }
+
+    /**
+     * @param o Organization.
+     * @throws SQLException If failed.
+     */
+    private void insertInDb(Organization o) throws SQLException {
+        try (PreparedStatement st = conn.prepareStatement(
+            "insert into \"" + orgCacheName + "\".Organization (_key, _val, id, strId, dateId, name) " +
+                "values(?, ?, ?, ?, ?, ?)")) {
+            int i = 0;
+
+            st.setObject(++i, o.id);
+            st.setObject(++i, o);
+            st.setObject(++i, o.id);
+            st.setObject(++i, o.strId);
+            st.setObject(++i, o.dateId);
+            st.setObject(++i, o.name);
+
+            st.executeUpdate();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void checkAllDataEquals() throws Exception {
+        compareQueryRes0(ignite(0).cache(accCacheName), "select _key, _val, id, personId, personDateId, personStrId " +
+            "from \"" + accCacheName + "\".Account");
+
+        compareQueryRes0(ignite(0).cache(personCacheName), "select _key, _val, id, strId, dateId, name, orgId, " +
+            "orgDateId, orgStrId, salary from \"" + personCacheName + "\".Person");
+
+        compareQueryRes0(ignite(0).cache(orgCacheName), "select _key, _val, id, strId, dateId, name " +
+            "from \"" + orgCacheName + "\".Organization");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Statement initializeH2Schema() throws SQLException {
+        Statement st = conn.createStatement();
+
+        st.execute("CREATE SCHEMA \"" + TestCacheType.REPLICATED_1.cacheName + "\"");
+        st.execute("CREATE SCHEMA \"" + TestCacheType.REPLICATED_2.cacheName + "\"");
+        st.execute("CREATE SCHEMA \"" + TestCacheType.REPLICATED_3.cacheName + "\"");
+
+        st.execute("CREATE SCHEMA \"" + TestCacheType.PARTITIONED_b0_1.cacheName + "\"");
+        st.execute("CREATE SCHEMA \"" + TestCacheType.PARTITIONED_b0_2.cacheName + "\"");
+        st.execute("CREATE SCHEMA \"" + TestCacheType.PARTITIONED_b0_3.cacheName + "\"");
+
+        st.execute("CREATE SCHEMA \"" + TestCacheType.PARTITIONED_b1_1.cacheName + "\"");
+        st.execute("CREATE SCHEMA \"" + TestCacheType.PARTITIONED_b1_2.cacheName + "\"");
+        st.execute("CREATE SCHEMA \"" + TestCacheType.PARTITIONED_b1_3.cacheName + "\"");
+
+        return st;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 30 * 60 * 1000;
     }
 
     /**
      * @throws Exception If failed.
      */
-    public void testCrossCacheDistributedJoin() throws Exception {
-        Map<T4<Integer, TestCacheType, TestCacheType, TestCacheType>, Throwable> errors = new LinkedHashMap<>();
-        List<T4<Integer, TestCacheType, TestCacheType, TestCacheType>> success = new ArrayList<>();
+    public void testCrossCacheDistributedJoin1() throws Exception {
+        final Set<TestCacheType> personCacheTypes = new LinkedHashSet<TestCacheType>() {{
+            add(TestCacheType.REPLICATED_1);
+        }};
 
-        int cfgIdx = 0;
+        final Set<TestCacheType> accCacheTypes = new LinkedHashSet<TestCacheType>() {{
+            add(TestCacheType.REPLICATED_1);
+            add(TestCacheType.PARTITIONED_b0_1);
+            add(TestCacheType.PARTITIONED_b1_1);
+            add(TestCacheType.REPLICATED_2);
+            add(TestCacheType.PARTITIONED_b0_2);
+            add(TestCacheType.PARTITIONED_b1_2);
+        }};
+
+        Set<TestCacheType> orgCacheTypes = new LinkedHashSet<TestCacheType>() {{
+            addAll(accCacheTypes);
+            add(TestCacheType.REPLICATED_3);
+            add(TestCacheType.PARTITIONED_b0_3);
+            add(TestCacheType.PARTITIONED_b1_3);
+        }};
+
+        checkCrossCacheDistributedJoin(accCacheTypes, personCacheTypes, orgCacheTypes);
+    }
 
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCrossCacheDistributedJoin2() throws Exception {
         final Set<TestCacheType> personCacheTypes = new LinkedHashSet<TestCacheType>() {{
+            add(TestCacheType.PARTITIONED_b0_1);
+        }};
+
+        final Set<TestCacheType> accCacheTypes = new LinkedHashSet<TestCacheType>() {{
             add(TestCacheType.REPLICATED_1);
             add(TestCacheType.PARTITIONED_b0_1);
             add(TestCacheType.PARTITIONED_b1_1);
+            add(TestCacheType.REPLICATED_2);
+            add(TestCacheType.PARTITIONED_b0_2);
+            add(TestCacheType.PARTITIONED_b1_2);
+        }};
+
+        Set<TestCacheType> orgCacheTypes = new LinkedHashSet<TestCacheType>() {{
+            addAll(accCacheTypes);
+            add(TestCacheType.REPLICATED_3);
+            add(TestCacheType.PARTITIONED_b0_3);
+            add(TestCacheType.PARTITIONED_b1_3);
+        }};
+
+        checkCrossCacheDistributedJoin(accCacheTypes, personCacheTypes, orgCacheTypes);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCrossCacheDistributedJoin3() throws Exception {
+        final Set<TestCacheType> personCacheTypes = new LinkedHashSet<TestCacheType>() {{
+            add(TestCacheType.PARTITIONED_b0_1);
         }};
 
         final Set<TestCacheType> accCacheTypes = new LinkedHashSet<TestCacheType>() {{
-            addAll(personCacheTypes);
+            add(TestCacheType.REPLICATED_1);
+            add(TestCacheType.PARTITIONED_b0_1);
+            add(TestCacheType.PARTITIONED_b1_1);
             add(TestCacheType.REPLICATED_2);
             add(TestCacheType.PARTITIONED_b0_2);
             add(TestCacheType.PARTITIONED_b1_2);
@@ -130,6 +360,23 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
             add(TestCacheType.PARTITIONED_b1_3);
         }};
 
+        checkCrossCacheDistributedJoin(accCacheTypes, personCacheTypes, orgCacheTypes);
+    }
+
+    /**
+     * @throws Exception If failed.
+     * @param accCacheTypes Account cache types.
+     * @param personCacheTypes Person cache types.
+     * @param orgCacheTypes Organization cache types.
+     */
+    private void checkCrossCacheDistributedJoin(
+        Set<TestCacheType> accCacheTypes, Set<TestCacheType> personCacheTypes,
+        Set<TestCacheType> orgCacheTypes) throws Exception {
+        Map<T4<Integer, TestCacheType, TestCacheType, TestCacheType>, Throwable> errors = new LinkedHashMap<>();
+        List<T4<Integer, TestCacheType, TestCacheType, TestCacheType>> success = new ArrayList<>();
+
+        int cfgIdx = 0;
+
         for (TestCacheType personCacheType : personCacheTypes) {
             for (TestCacheType accCacheType : accCacheTypes) {
                 for (TestCacheType orgCacheType : orgCacheTypes) {
@@ -152,7 +399,7 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
         if (!errors.isEmpty()) {
             int total = personCacheTypes.size() * accCacheTypes.size() * orgCacheTypes.size();
 
-            SB sb = new SB("Test failed for the following " + errors.size() + " combination(s) ("+total+" total):\n");
+            SB sb = new SB("Test failed for the following " + errors.size() + " combination(s) (" + total + " total):\n");
 
             for (Map.Entry<T4<Integer, TestCacheType, TestCacheType, TestCacheType>, Throwable> e : errors.entrySet()) {
                 T4<Integer, TestCacheType, TestCacheType, TestCacheType> t = e.getKey();
@@ -168,6 +415,8 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
                     + ", orgCache=" + t.get4() + "]").a("\n");
             }
 
+            sb.a("The following data has beed used for test:\n " + dataAsString());
+
             fail(sb.toString());
         }
     }
@@ -211,20 +460,13 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
             dataAsStr = null;
             data = prepareData();
 
-            final IgniteCache accCache = ignite(0).cache(accCacheType.cacheName);
-
-            for (Account account : data.accounts)
-                accCache.put(account.id, account);
+            personCacheName = personCacheType.cacheName;
+            accCacheName = accCacheType.cacheName;
+            orgCacheName = orgCacheType.cacheName;
 
-            final IgniteCache personCache = ignite(0).cache(personCacheType.cacheName);
+            initCacheAndDbData();
 
-            for (Person person : data.persons)
-                personCache.put(person.id, person);
-
-            IgniteCache orgCache = ignite(0).cache(orgCacheType.cacheName);
-
-            for (Organization org : data.orgs)
-                orgCache.put(org.id, org);
+            checkAllDataEquals();
 
             List<String> cacheNames = new ArrayList<>();
 
@@ -240,72 +482,47 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
 
                     log.info("Use cache: " + cache.getName());
 
-                    CacheConfiguration cc = (CacheConfiguration)cache.getConfiguration(CacheConfiguration.class);
-
-                    if (cc.getCacheMode() == REPLICATED && !ignite(i).configuration().isClientMode()) {
-                        GridTestUtils.assertThrows(log, new Callable<Object>() {
-                            @Override public Object call() throws Exception {
-                                cache.query(new SqlFieldsQuery("select p.name from " +
-                                    "\"" + personCache.getName() + "\".Person p, " +
-                                    "\"" + accCache.getName() + "\".Account a " +
-                                    "where p._key = a.personId").setDistributedJoins(true));
-
-                                return null;
-                            }
-                        }, CacheException.class, "Queries using distributed JOINs have to be run on partitioned cache");
-
-                        GridTestUtils.assertThrows(log, new Callable<Object>() {
-                            @Override public Object call() throws Exception {
-                                cache.query(new SqlQuery(Person.class,
-                                    "from \"" + personCache.getName() + "\".Person , " +
-                                        "\"" + accCache.getName() + "\".Account  " +
-                                        "where Person._key = Account.personId")
-                                    .setDistributedJoins(true));
-
-                                return null;
-                            }
-                        }, CacheException.class, "Queries using distributed JOINs have to be run on partitioned cache");
-                    }
+                    if (((IgniteCacheProxy)cache).context().isReplicated() && !ignite(i).configuration().isClientMode())
+                        assertProperException(cache);
                     else {
-                        if (!cache.getName().equals(orgCacheType.cacheName))
-                            checkPersonAccountsJoin(cache,
-                                data.accountsPerPerson,
-                                accCache.getName(),
-                                personCache.getName());
-
-                        if (!cache.getName().equals(accCacheType.cacheName))
-                            checkOrganizationPersonsJoin(cache,
-                                data.personsPerOrg,
-                                orgCacheType.cacheName,
-                                personCacheType.cacheName);
-
-                        checkOrganizationPersonAccountJoin(cache,
-                            data.accountsPerOrg,
-                            orgCacheType.cacheName,
-                            personCacheType.cacheName,
-                            accCacheType.cacheName);
-
-                        checkUninon(cache,
-                            data,
-                            orgCacheType.cacheName,
-                            personCacheType.cacheName,
-                            accCacheType.cacheName);
-
-                        if (!cache.getName().equals(orgCacheType.cacheName))
-                            checkPersonAccountCrossJoin(cache,
-                                data,
-                                personCacheType.cacheName,
-                                accCacheType.cacheName);
-
-                        if (!cache.getName().equals(accCacheType.cacheName))
-                            checkPersonOrganizationGroupBy(cache,
-                                personCacheType.cacheName,
-                                orgCacheType.cacheName);
-
-                        if (!cache.getName().equals(orgCacheType.cacheName))
-                            checkPersonAccountGroupBy(cache,
-                                personCacheType.cacheName,
-                                accCacheType.cacheName);
+                        boolean isClientNodeAndCacheIsReplicated = ((IgniteCacheProxy)cache).context().isReplicated()
+                            && ignite(i).configuration().isClientMode();
+
+                        boolean all3CachesAreReplicated =
+                            ((IgniteCacheProxy)ignite(0).cache(accCacheName)).context().isReplicated()
+                                && ((IgniteCacheProxy)ignite(0).cache(personCacheName)).context().isReplicated()
+                                && ((IgniteCacheProxy)ignite(0).cache(orgCacheName)).context().isReplicated();
+
+                        // Queries running on replicated cache should not contain JOINs with partitioned tables.
+                        if (!isClientNodeAndCacheIsReplicated || all3CachesAreReplicated) {
+                            if (cache.getName().equals(orgCacheType.cacheName))
+                                compareQueryRes0(cache, "select name from (select name from \"" + orgCacheName + "\".Organization)", true,
+                                    new Object[0], Ordering.RANDOM);
+
+                            if (!cache.getName().equals(orgCacheType.cacheName))
+                                checkPersonAccountsJoin(cache, data.accountsPerPerson);
+
+                            if (!cache.getName().equals(accCacheType.cacheName))
+                                checkOrganizationPersonsJoin(cache, data.personsPerOrg);
+
+                            checkOrganizationPersonAccountJoin(cache, data.accountsPerOrg);
+
+                            checkUninon(cache);
+
+                            if (!cache.getName().equals(orgCacheType.cacheName))
+                                checkPersonAccountCrossJoin(cache);
+
+                            if (!cache.getName().equals(accCacheType.cacheName))
+                                checkPersonOrganizationGroupBy(cache);
+
+                            if (!cache.getName().equals(orgCacheType.cacheName))
+                                checkPersonAccountGroupBy(cache);
+
+                            if (!cache.getName().equals(accCacheType.cacheName))
+                                checkPersonOrganizationJoinInsideSubquery(cache);
+
+                            checkJoinInsideSubquery(cache);
+                        }
                     }
                 }
             }
@@ -314,13 +531,52 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
             ignite(0).destroyCache(accCacheType.cacheName);
             ignite(0).destroyCache(personCacheType.cacheName);
             ignite(0).destroyCache(orgCacheType.cacheName);
+
+            Statement st = conn.createStatement();
+
+            st.execute("drop table \"" + accCacheName + "\".Account");
+            st.execute("drop table \"" + personCacheName + "\".Person");
+            st.execute("drop table \"" + orgCacheName + "\".Organization");
+
+            conn.commit();
+
+            st.close();
         }
     }
 
     /**
-     * Organization ids: [0, 9].
-     * Person ids: randoms at [10, 9999]
-     * Accounts ids: randoms at [10000, 999_999]
+     * @param cache Cache.
+     */
+    private void assertProperException(final IgniteCache cache) {
+        final IgniteCache accCache = ignite(0).cache(accCacheName);
+        final IgniteCache personCache = ignite(0).cache(personCacheName);
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                cache.query(new SqlFieldsQuery("select p.name from " +
+                    "\"" + personCache.getName() + "\".Person p, " +
+                    "\"" + accCache.getName() + "\".Account a " +
+                    "where p._key = a.personId").setDistributedJoins(true));
+
+                return null;
+            }
+        }, CacheException.class, "Queries using distributed JOINs have to be run on partitioned cache");
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                cache.query(new SqlQuery(Person.class,
+                    "from \"" + personCache.getName() + "\".Person , " +
+                        "\"" + accCache.getName() + "\".Account  " +
+                        "where Person._key = Account.personId")
+                    .setDistributedJoins(true));
+
+                return null;
+            }
+        }, CacheException.class, "Queries using distributed JOINs have to be run on partitioned cache");
+    }
+
+    /**
+     * Organization ids: [0, 9]. Person ids: randoms at [10, 9999]. Accounts ids: randoms at [10000, 999_999]
      *
      * @return Data.
      */
@@ -371,7 +627,7 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
                     while (!accountIds.add(accountId))
                         accountId = ThreadLocalRandom.current().nextInt(10_000, 1000_000);
 
-                    accounts.add(new Account(accountId, personId));
+                    accounts.add(new Account(accountId, personId, orgId));
                 }
 
                 accountsPerPerson.put(personId, accountsCnt);
@@ -463,13 +719,8 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
     /**
      * @param cache Cache.
      * @param cnts Organizations per person counts.
-     * @param orgCacheName Organization cache name.
-     * @param personCacheName Person cache name.
      */
-    private void checkOrganizationPersonsJoin(IgniteCache cache,
-        Map<Integer, Integer> cnts,
-        String orgCacheName,
-        String personCacheName) {
+    private void checkOrganizationPersonsJoin(IgniteCache cache, Map<Integer, Integer> cnts) {
         SqlFieldsQuery qry = new SqlFieldsQuery("select o.name, p.name " +
             "from \"" + orgCacheName + "\".Organization o, \"" + personCacheName + "\".Person p " +
             "where p.orgId = o._key and o._key=?");
@@ -487,7 +738,6 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
             qry2.setDistributedJoins(true);
         }
 
-
         long total = 0;
 
         for (int i = 0; i < cnts.size(); i++) {
@@ -523,13 +773,8 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
     /**
      * @param cache Cache.
      * @param cnts Accounts per person counts.
-     * @param accCacheName Account cache name.
-     * @param personCacheName Person cache name.
      */
-    private void checkPersonAccountsJoin(IgniteCache cache,
-        Map<Integer, Integer> cnts,
-        String accCacheName,
-        String personCacheName) {
+    private void checkPersonAccountsJoin(IgniteCache cache, Map<Integer, Integer> cnts) {
         List<Query> qrys = new ArrayList<>();
 
         qrys.add(new SqlFieldsQuery("select p.name from " +
@@ -580,7 +825,6 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
             );
         }
 
-
         long total = 0;
 
         for (Map.Entry<Integer, Integer> e : cnts.entrySet()) {
@@ -637,12 +881,8 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
     /**
      * @param cache Cache.
      * @param cnts Accounts per organization count
-     * @param orgCacheName Organization cache name.
-     * @param personCacheName Person cache name.
-     * @param accCacheName Account cache name.
      */
-    private void checkOrganizationPersonAccountJoin(IgniteCache cache, Map<Integer, Integer> cnts, String orgCacheName,
-        String personCacheName, String accCacheName) {
+    private void checkOrganizationPersonAccountJoin(IgniteCache cache, Map<Integer, Integer> cnts) {
         List<Query> queries = new ArrayList<>();
 
         queries.add(new SqlFieldsQuery("select o.name, p.name, a._key " +
@@ -717,13 +957,8 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
 
     /**
      * @param cache Cache.
-     * @param data Data.
-     * @param orgCacheName Organization cache name.
-     * @param personCacheName Person cache name.
-     * @param accCacheName Account cache name.
      */
-    private void checkUninon(IgniteCache cache, Data data, String orgCacheName,
-        String personCacheName, String accCacheName) {
+    private void checkUninon(IgniteCache cache) {
         List<Query> queries = new ArrayList<>();
 
         queries.add(new SqlFieldsQuery(
@@ -764,12 +999,8 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
 
     /**
      * @param cache Cache.
-     * @param data Data.
-     * @param personCacheName Person cache name.
-     * @param accCacheName Account cache name.
      */
-    private void checkPersonAccountCrossJoin(IgniteCache cache, Data data,
-        String personCacheName, String accCacheName) {
+    private void checkPersonAccountCrossJoin(IgniteCache cache) {
         SqlFieldsQuery q = new SqlFieldsQuery("select p.name " +
             "from \"" + personCacheName + "\".Person p " +
             "cross join \"" + accCacheName + "\".Account a");
@@ -783,14 +1014,11 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
 
     /**
      * @param cache Cache.
-     * @param personCacheName Person cache name.
-     * @param orgCacheName Organization cache name.
      */
-    private void checkPersonOrganizationGroupBy(IgniteCache cache,
-        String personCacheName, String orgCacheName) {
+    private void checkPersonOrganizationGroupBy(IgniteCache cache) {
         // Max salary per organization.
         SqlFieldsQuery q = new SqlFieldsQuery("select max(p.salary) " +
-            "from \"" + personCacheName + "\".Person p join \""+orgCacheName+"\".Organization o " +
+            "from \"" + personCacheName + "\".Person p join \"" + orgCacheName + "\".Organization o " +
             "on p.orgId = o.id " +
             "group by o.name " +
             "having o.id = ?");
@@ -820,14 +1048,11 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
 
     /**
      * @param cache Cache.
-     * @param personCacheName Person cache name.
-     * @param accCacheName Account cache name.
      */
-    private void checkPersonAccountGroupBy(IgniteCache cache,
-        String personCacheName, String accCacheName) {
+    private void checkPersonAccountGroupBy(IgniteCache cache) {
         // Count accounts per person.
         SqlFieldsQuery q = new SqlFieldsQuery("select count(a.id) " +
-            "from \"" + personCacheName + "\".Person p join \""+accCacheName+"\".Account a " +
+            "from \"" + personCacheName + "\".Person p join \"" + accCacheName + "\".Account a " +
             "on p.strId = a.personStrId " +
             "group by p.name " +
             "having p.id = ?");
@@ -848,7 +1073,44 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
             if (cnt > 0) {
                 assertEquals(errMsg, 1, res.size());
                 assertEquals(errMsg, 1, res.get(0).size());
-                assertEquals(errMsg, (long) cnt, res.get(0).get(0));
+                assertEquals(errMsg, (long)cnt, res.get(0).get(0));
+            }
+            else
+                assertEquals(errMsg, 0, res.size());
+        }
+    }
+
+    /**
+     * @param cache Cache.
+     */
+    private void checkPersonAccountOrganizationGroupBy(IgniteCache cache) {
+        // Max count of accounts at org. NOTE(review): FROM list below lacks commas; alias "org" is undefined.
+        SqlFieldsQuery q = new SqlFieldsQuery("select max(count(a.id)) " +
+            "from " +
+            "\"" + personCacheName + "\".Person p " +
+            "\"" + orgCacheName + "\".Organization o " +
+            "\"" + accCacheName + "\".Account a " +
+            "where p.id = a.personId and p.orgStrId = o.strId " +
+            "group by org.id " +
+            "having o.id = ?");
+
+        q.setDistributedJoins(true);
+
+        for (Map.Entry<Integer, Integer> e : data.accountsPerPerson.entrySet()) {
+            Integer personId = e.getKey();
+            Integer cnt = e.getValue();
+
+            q.setArgs(personId);
+
+            List<List<?>> res = cache.query(q).getAll();
+
+            String errMsg = "Expected data [personId=" + personId + ", cnt=" + cnt + ", data=" + dataAsString() + "]";
+
+            // Cnt == 0 means that there are no accounts for the person.
+            if (cnt > 0) {
+                assertEquals(errMsg, 1, res.size());
+                assertEquals(errMsg, 1, res.get(0).size());
+                assertEquals(errMsg, (long)cnt, res.get(0).get(0));
             }
             else
                 assertEquals(errMsg, 0, res.size());
@@ -856,6 +1118,43 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
     }
 
     /**
+     * @param cache Cache.
+     */
+    private void checkPersonOrganizationJoinInsideSubquery(IgniteCache cache) throws SQLException {
+        // Select persons with max salary at organization.
+        String sql = "select p.id " +
+            "from \"" + personCacheName + "\".Person p, " +
+            "\"" + orgCacheName + "\".Organization o " +
+            "where p.orgId = o.id and o.id = ? and p.salary = (" +
+            "   select max(p.salary) from \"" + personCacheName + "\".Person p join " +
+            "   \"" + orgCacheName + "\".Organization o on p.orgDateId = o.dateId group by o.id having o.id = ?)";
+
+        for (Organization org : data.orgs) {
+            Integer orgId = org.id;
+
+            compareQueryRes0(cache, sql, true, new Object[] {orgId, orgId}, Ordering.RANDOM);
+        }
+    }
+
+    /**
+     * @param cache Cache.
+     */
+    private void checkJoinInsideSubquery(IgniteCache cache) throws SQLException {
+        // Select persons with count of accounts of person at organization.
+        String sql = "select p.id, count(a.id) " +
+            "from " +
+            "\"" + personCacheName + "\".Person p, " +
+            "\"" + orgCacheName + "\".Organization o, " +
+            "\"" + accCacheName + "\".Account a " +
+            "where p.id = a.personId and p.orgStrId = o.strId " +
+            "group by p.id " +
+            "having o.id = ?";
+
+        for (Organization org : data.orgs)
+            compareQueryRes0(cache, sql, true, new Object[] {org.id}, Ordering.RANDOM);
+    }
+
+    /**
      * @return Data as string.
      */
     private String dataAsString() {
@@ -894,8 +1193,7 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
         PARTITIONED_b1_2(CacheMode.PARTITIONED, 1),
 
         /** */
-        PARTITIONED_b1_3(CacheMode.PARTITIONED, 1),
-        ;
+        PARTITIONED_b1_3(CacheMode.PARTITIONED, 1);
 
         /** */
         final String cacheName;
@@ -962,6 +1260,7 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
             this.maxSalaryPerOrg = maxSalaryPerOrg;
         }
 
+        /** {@inheritDoc} */
         @Override public String toString() {
             return "Data{" +
                 "orgs=" + orgs +
@@ -995,13 +1294,17 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
         @QuerySqlField
         private String personStrId;
 
+        @QuerySqlField
+        private int orgId;
+
         /**
          * @param id ID.
          * @param personId Person ID.
          */
-        Account(int id, int personId) {
+        Account(int id, int personId, int orgId) {
             this.id = id;
             this.personId = personId;
+            this.orgId = orgId;
             personDateId = new Date(personId);
             personStrId = "personId" + personId;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7f99f2a7/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/AbstractH2CompareQueryTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/AbstractH2CompareQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/AbstractH2CompareQueryTest.java
index b251d63..9d8ca18 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/AbstractH2CompareQueryTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/AbstractH2CompareQueryTest.java
@@ -30,6 +30,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import junit.framework.AssertionFailedError;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheAtomicityMode;
@@ -49,7 +50,7 @@ import org.jetbrains.annotations.Nullable;
 
 /**
  * Abstract test framework to compare query results from h2 database instance and mixed ignite caches (replicated and partitioned)
- * which have the same data models and data content. 
+ * which have the same data models and data content.
  */
 public abstract class AbstractH2CompareQueryTest extends GridCommonAbstractTest {
     /** */
@@ -60,7 +61,7 @@ public abstract class AbstractH2CompareQueryTest extends GridCommonAbstractTest
 
     /** Replicated cache. */
     protected static IgniteCache rCache;
-    
+
     /** H2 db connection. */
     protected static Connection conn;
 
@@ -77,14 +78,21 @@ public abstract class AbstractH2CompareQueryTest extends GridCommonAbstractTest
 
         c.setMarshaller(new OptimizedMarshaller(true));
 
-        c.setCacheConfiguration(createCache("part", CacheMode.PARTITIONED),
-            createCache("repl", CacheMode.REPLICATED)
-        );
+        c.setCacheConfiguration(cacheConfigurations());
 
         return c;
     }
 
     /**
+     * @return Cache configurations.
+     */
+    protected CacheConfiguration[] cacheConfigurations() {
+        return new CacheConfiguration[] {
+            createCache("part", CacheMode.PARTITIONED),
+            createCache("repl", CacheMode.REPLICATED)};
+    }
+
+    /**
      * Creates new cache configuration.
      *
      * @param name Cache name.
@@ -117,11 +125,11 @@ public abstract class AbstractH2CompareQueryTest extends GridCommonAbstractTest
         Ignite ignite = startGrids(4);
 
         pCache = ignite.cache("part");
-        
+
         rCache = ignite.cache("repl");
 
         awaitPartitionMapExchange();
-        
+
         conn = openH2Connection(false);
 
         initializeH2Schema();
@@ -130,17 +138,17 @@ public abstract class AbstractH2CompareQueryTest extends GridCommonAbstractTest
 
         checkAllDataEquals();
     }
-    
+
     /** {@inheritDoc} */
     @Override protected void afterTestsStopped() throws Exception {
         super.afterTestsStopped();
 
         Statement st = conn.createStatement();
-        
+
         st.execute("DROP ALL OBJECTS");
 
         conn.close();
-        
+
         stopAllGrids();
     }
 
@@ -175,11 +183,11 @@ public abstract class AbstractH2CompareQueryTest extends GridCommonAbstractTest
      * @return Pooled connection.
      * @throws SQLException In case of error.
      */
-    private Connection openH2Connection(boolean autocommit) throws SQLException {
+    protected Connection openH2Connection(boolean autocommit) throws SQLException {
         System.setProperty("h2.serializeJavaObject", "false");
-        
+
         String dbName = "test";
-        
+
         Connection conn = DriverManager.getConnection("jdbc:h2:mem:" + dbName + ";DB_CLOSE_DELAY=-1");
 
         conn.setAutoCommit(autocommit);
@@ -193,7 +201,7 @@ public abstract class AbstractH2CompareQueryTest extends GridCommonAbstractTest
      * @param sql SQL query.
      * @param args SQL arguments.
      * then results will compare as ordered queries.
-     * @return Result set after SQL query execution. 
+     * @return Result set after SQL query execution.
      * @throws SQLException If exception.
      */
     protected final List<List<?>> compareQueryRes0(String sql, @Nullable Object... args) throws SQLException {
@@ -201,14 +209,14 @@ public abstract class AbstractH2CompareQueryTest extends GridCommonAbstractTest
     }
 
     /**
-     * Execute given sql query on h2 database and on ignite cache and compare results. 
+     * Execute given sql query on h2 database and on ignite cache and compare results.
      * Expected that results are not ordered.
      *
      * @param cache Ignite cache.
      * @param sql SQL query.
      * @param args SQL arguments.
      * then results will compare as ordered queries.
-     * @return Result set after SQL query execution. 
+     * @return Result set after SQL query execution.
      * @throws SQLException If exception.
      */
     protected final List<List<?>> compareQueryRes0(IgniteCache cache, String sql, @Nullable Object... args) throws SQLException {
@@ -239,20 +247,46 @@ public abstract class AbstractH2CompareQueryTest extends GridCommonAbstractTest
      * then results will compare as ordered queries.
      * @return Result set after SQL query execution.
      * @throws SQLException If exception.
-     */    
+     */
+    @SuppressWarnings("unchecked")
+    protected static List<List<?>> compareQueryRes0(IgniteCache cache, String sql, @Nullable Object[] args,
+        Ordering ordering) throws SQLException {
+        return compareQueryRes0(cache, sql, false, args, ordering);
+    }
+
+    /**
+     * Execute given sql query on h2 database and on ignite cache and compare results.
+     *
+     * @param cache Ignite cache.
+     * @param sql SQL query.
+     * @param distrib Distributed SQL Join flag.
+     * @param args SQL arguments.
+     * @param ordering Expected ordering of SQL results. If {@link Ordering#ORDERED}
+     * then results will compare as ordered queries.
+     * @return Result set after SQL query execution.
+     * @throws SQLException If exception.
+     */
     @SuppressWarnings("unchecked")
-    protected final List<List<?>> compareQueryRes0(IgniteCache cache, String sql, @Nullable Object[] args, Ordering ordering) throws SQLException {
+    protected static List<List<?>> compareQueryRes0(IgniteCache cache, String sql, boolean distrib,
+        @Nullable Object[] args, Ordering ordering) throws SQLException {
         if (args == null)
             args = new Object[] {null};
-        
-        info("Sql query:\n" + sql + "\nargs=" + Arrays.toString(args));
+
+        X.printerrln("Sql query:\n" + sql + "\nargs=" + Arrays.toString(args));
 
         List<List<?>> h2Res = executeH2Query(sql, args);
 
-        List<List<?>> cacheRes = cache.query(new SqlFieldsQuery(sql).setArgs(args)).getAll();
+        List<List<?>> cacheRes = cache.query(new SqlFieldsQuery(sql).setArgs(args).setDistributedJoins(distrib)).getAll();
+
+        try {
+            assertRsEquals(h2Res, cacheRes, ordering);
+        }
+        catch (AssertionFailedError e) {
+            X.println("[h2Res=" + h2Res + ", cacheRes=" + cacheRes + "]");
+
+            throw e;
+        }
 
-        assertRsEquals(h2Res, cacheRes, ordering);
-        
         return h2Res;
     }
 
@@ -264,7 +298,7 @@ public abstract class AbstractH2CompareQueryTest extends GridCommonAbstractTest
      * @return Result of SQL query on h2 database.
      * @throws SQLException If exception.
      */
-    private List<List<?>> executeH2Query(String sql, Object[] args) throws SQLException {
+    private static List<List<?>> executeH2Query(String sql, Object[] args) throws SQLException {
         List<List<?>> res = new ArrayList<>();
         ResultSet rs = null;
 
@@ -285,10 +319,10 @@ public abstract class AbstractH2CompareQueryTest extends GridCommonAbstractTest
 
             while (rs.next()) {
                 List<Object> row = new ArrayList<>(colCnt);
-                
+
                 for (int i = 1; i <= colCnt; i++)
                     row.add(rs.getObject(i));
-                
+
                 res.add(row);
             }
         }
@@ -307,9 +341,9 @@ public abstract class AbstractH2CompareQueryTest extends GridCommonAbstractTest
      * @param ordering Expected ordering of SQL results. If {@link Ordering#ORDERED}
      * then results will compare as ordered queries.
      */
-    private void assertRsEquals(List<List<?>> rs1, List<List<?>> rs2, Ordering ordering) {
+    private static void assertRsEquals(List<List<?>> rs1, List<List<?>> rs2, Ordering ordering) {
         assertEquals("Rows count has to be equal.", rs1.size(), rs2.size());
-        
+
         switch (ordering){
             case ORDERED:
                 for (int rowNum = 0; rowNum < rs1.size(); rowNum++) {
@@ -349,9 +383,9 @@ public abstract class AbstractH2CompareQueryTest extends GridCommonAbstractTest
                     assertEquals(e1.getKey(), e2.getKey());
                     assertEquals(e1.getValue(), e2.getValue());
                 }
-                
+
                 break;
-            default: 
+            default:
                 throw new IllegalStateException();
         }
     }
@@ -360,30 +394,30 @@ public abstract class AbstractH2CompareQueryTest extends GridCommonAbstractTest
      * @param rs Result set.
      * @return Map of unique rows at the result set to number of occuriances at the result set.
      */
-    private TreeMap<String, Integer> extractUniqueRowsWithCounts(Iterable<List<?>> rs) {
+    private static TreeMap<String, Integer> extractUniqueRowsWithCounts(Iterable<List<?>> rs) {
         TreeMap<String, Integer> res = new TreeMap<>();
 
         for (List<?> row : rs) {
             String rowStr = row.toString();
 
             Integer cnt = res.get(rowStr);
-            
+
             if (cnt == null)
                 cnt = 0;
-            
+
             res.put(rowStr, cnt + 1);
         }
 
         return res;
     }
-    
+
     /**
      * Ordering type.
      */
     protected enum Ordering {
         /** Random. */
-        RANDOM, 
+        RANDOM,
         /** Ordered. */
         ORDERED
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7f99f2a7/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/H2CompareBigQueryTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/H2CompareBigQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/H2CompareBigQueryTest.java
index 255ac44..24e4fe3 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/H2CompareBigQueryTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/H2CompareBigQueryTest.java
@@ -35,11 +35,11 @@ import org.apache.ignite.cache.query.annotations.QuerySqlField;
 import org.apache.ignite.configuration.CacheConfiguration;
 
 /**
- * Executes one big query (and subqueries of the big query) to compare query results from h2 database instance and 
+ * Executes one big query (and subqueries of the big query) to compare query results from h2 database instance and
  * mixed ignite caches (replicated and partitioned) which have the same data models and data content.
- * 
- * 
- * <pre> 
+ *
+ *
+ * <pre>
  *
  *  -------------------------------------> rootOrderId (virtual) <--------------------------
  *  |                                                                |                     |
@@ -75,20 +75,20 @@ public class H2CompareBigQueryTest extends AbstractH2CompareQueryTest {
 
     /** Dates count. */
     private static final int DATES_CNT = 5;
-    
+
     /** Full the big query. */
     private String bigQry = getBigQry();
 
     /**
      * Extracts the big query from file.
-     *  
+     *
      * @return Big query.
      */
     private String getBigQry() {
         String res = "";
 
         Reader isr = new InputStreamReader(getClass().getResourceAsStream("bigQuery.sql"));
-        
+
         try(BufferedReader reader = new BufferedReader(isr)) {
             for(String line; (line = reader.readLine()) != null; )
                 if (!line.startsWith("--")) // Skip commented lines.
@@ -96,7 +96,7 @@ public class H2CompareBigQueryTest extends AbstractH2CompareQueryTest {
         }
         catch (Throwable e) {
             e.printStackTrace();
-            
+
             fail();
         }
 
@@ -142,8 +142,8 @@ public class H2CompareBigQueryTest extends AbstractH2CompareQueryTest {
                 // Generate 1 - 5 orders for 1 root order.
                 for (int i = 0; i < rootOrderId % 5; i++) {
                     int orderId = idGen.incrementAndGet();
-                    
-                    CustOrder order = new CustOrder(orderId, rootOrderId, dates.get(orderId % dates.size()) , 
+
+                    CustOrder order = new CustOrder(orderId, rootOrderId, dates.get(orderId % dates.size()) ,
                         orderId % 2 == 0 ? "CUSTOM" : "OTHER", orderId);
 
                     add(order);
@@ -151,13 +151,13 @@ public class H2CompareBigQueryTest extends AbstractH2CompareQueryTest {
                     pCache.put(order.orderId, order);
 
                     insertInDb(order);
-                }                
+                }
             }
         }};
 
         final Collection<OrderParams> params = new ArrayList<OrderParams>(){{
             for (CustOrder o : orders) {
-                OrderParams op = new OrderParams(idGen.incrementAndGet(), o.orderId, o.date, 
+                OrderParams op = new OrderParams(idGen.incrementAndGet(), o.orderId, o.date,
                     o.orderId % 2 == 0 ? "Algo 1" : "Algo 2");
 
                 add(op);
@@ -167,7 +167,7 @@ public class H2CompareBigQueryTest extends AbstractH2CompareQueryTest {
                 insertInDb(op);
             }
         }};
-        
+
         final Collection<ReplaceOrder> replaces = new ArrayList<ReplaceOrder>(){{
             for (CustOrder o : orders) {
                 if (o.orderId % 7 == 0) {
@@ -186,12 +186,12 @@ public class H2CompareBigQueryTest extends AbstractH2CompareQueryTest {
         final Collection<Cancel> cancels = new ArrayList<Cancel>(){{
             for (CustOrder o : orders) {
                 if (o.orderId % 9 == 0) {
-                    Cancel c = new Cancel(idGen.incrementAndGet(), o.orderId, 
+                    Cancel c = new Cancel(idGen.incrementAndGet(), o.orderId,
                         new Date(o.date.getTime() + 12 * 60 * 60 * 1000));// Plus a half of day.
 
                     add(c);
 
-                    pCache.put(c.key(), c); 
+                    pCache.put(c.key(), c);
 
                     insertInDb(c);
                 }
@@ -203,7 +203,7 @@ public class H2CompareBigQueryTest extends AbstractH2CompareQueryTest {
                 int execShares = 10000 + rootOrderId;
                 int price = 1000 + rootOrderId;
                 int latsMkt = 3000 + rootOrderId;
-                
+
                 Exec exec = new Exec(rootOrderId, dates.get(rootOrderId % dates.size()), execShares, price, latsMkt);
 
                 add(exec);
@@ -232,8 +232,23 @@ public class H2CompareBigQueryTest extends AbstractH2CompareQueryTest {
      * @throws Exception If failed.
      */
     public void testBigQuery() throws Exception {
-        List<List<?>> res = compareQueryRes0(bigQry);
-        
+        checkBigQuery(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testBigQueryDistributed() throws Exception {
+        checkBigQuery(true);
+    }
+
+    /**
+     * @param distributedJoins Use distributed joins flag.
+     * @throws Exception If failed.
+     */
+    public void checkBigQuery(boolean distributedJoins) throws Exception {
+        List<List<?>> res = compareQueryRes0(pCache, bigQry, distributedJoins, new Object[0], Ordering.RANDOM);
+
         assertTrue(!res.isEmpty()); // Ensure we set good testing data at database.
     }
 
@@ -295,15 +310,15 @@ public class H2CompareBigQueryTest extends AbstractH2CompareQueryTest {
             "  price int," +
             "  lastMkt int" +
             "  )");
-        
+
         conn.commit();
-        
+
         return st;
     }
 
     /**
      * Insert {@link CustOrder} at h2 database.
-     *  
+     *
      * @param o CustOrder.
      */
     private void insertInDb(CustOrder o) throws SQLException {
@@ -311,7 +326,7 @@ public class H2CompareBigQueryTest extends AbstractH2CompareQueryTest {
             "insert into \"part\".CustOrder (_key, _val, orderId, rootOrderId, date, alias, archSeq, origOrderId) " +
                 "values(?, ?, ?, ?, ?, ?, ?, ?)")) {
             int i = 0;
-            
+
             st.setObject(++i, o.orderId);
             st.setObject(++i, o);
             st.setObject(++i, o.orderId);
@@ -335,7 +350,7 @@ public class H2CompareBigQueryTest extends AbstractH2CompareQueryTest {
             "insert into \"part\".ReplaceOrder (_key, _val, id, orderId, rootOrderId, date, alias, archSeq, refOrderId) " +
                 "values(?, ?, ?, ?, ?, ?, ?, ?, ?)")) {
             int i = 0;
-            
+
             st.setObject(++i, o.key());
             st.setObject(++i, o);
             st.setObject(++i, o.id);
@@ -359,7 +374,7 @@ public class H2CompareBigQueryTest extends AbstractH2CompareQueryTest {
         try(PreparedStatement st = conn.prepareStatement(
             "insert into \"part\".OrderParams (_key, _val, id, date, orderId, parentAlgo) values(?, ?, ?, ?, ?, ?)")) {
             int i = 0;
-            
+
             st.setObject(++i, o.key());
             st.setObject(++i, o);
             st.setObject(++i, o.id);
@@ -380,7 +395,7 @@ public class H2CompareBigQueryTest extends AbstractH2CompareQueryTest {
         try(PreparedStatement st = conn.prepareStatement(
             "insert into \"part\".Cancel (_key, _val, id, date, refOrderId) values(?, ?, ?, ?, ?)")) {
             int i = 0;
-            
+
             st.setObject(++i, o.key());
             st.setObject(++i, o);
             st.setObject(++i, o.id);
@@ -401,7 +416,7 @@ public class H2CompareBigQueryTest extends AbstractH2CompareQueryTest {
             "insert into \"repl\".Exec (_key, _val, date, rootOrderId, execShares, price, lastMkt) " +
                 "values(?, ?, ?, ?, ?, ?, ?)")) {
             int i = 0;
-            
+
             st.setObject(++i, o.rootOrderId);
             st.setObject(++i, o);
             st.setObject(++i, o.date);
@@ -475,7 +490,7 @@ public class H2CompareBigQueryTest extends AbstractH2CompareQueryTest {
         /** Primary key. */
         @QuerySqlField(index = true)
         private int id;
-        
+
         /** Order id. */
         @QuerySqlField(index = true)
         private int orderId;
@@ -680,4 +695,4 @@ public class H2CompareBigQueryTest extends AbstractH2CompareQueryTest {
             return rootOrderId;
         }
     }
-}
\ No newline at end of file
+}