You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/03/17 18:33:50 UTC
ignite git commit: IGNITE-1232: add more tests (big query
dustributedJoin and IgniteCrossCachesDistributedJoinQueryTest)
Repository: ignite
Updated Branches:
refs/heads/ignite-1232 74484d786 -> 7f99f2a7c
IGNITE-1232: add more tests (big query dustributedJoin and IgniteCrossCachesDistributedJoinQueryTest)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7f99f2a7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7f99f2a7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7f99f2a7
Branch: refs/heads/ignite-1232
Commit: 7f99f2a7c1975cb639e134b8c53402d66fbc63db
Parents: 74484d7
Author: ashutak <as...@gridgain.com>
Authored: Thu Mar 17 20:33:10 2016 +0300
Committer: ashutak <as...@gridgain.com>
Committed: Thu Mar 17 20:33:10 2016 +0300
----------------------------------------------------------------------
...niteCrossCachesDistributedJoinQueryTest.java | 583 ++++++++++++++-----
.../h2/sql/AbstractH2CompareQueryTest.java | 106 ++--
.../query/h2/sql/H2CompareBigQueryTest.java | 71 ++-
3 files changed, 556 insertions(+), 204 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/7f99f2a7/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCrossCachesDistributedJoinQueryTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCrossCachesDistributedJoinQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCrossCachesDistributedJoinQueryTest.java
index d9b4d05..9461db0 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCrossCachesDistributedJoinQueryTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCrossCachesDistributedJoinQueryTest.java
@@ -18,6 +18,9 @@
package org.apache.ignite.internal.processors.cache;
import java.io.Serializable;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
@@ -40,23 +43,22 @@ import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.query.h2.sql.AbstractH2CompareQueryTest;
import org.apache.ignite.internal.util.typedef.T4;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
-import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
/**
*
*/
-@SuppressWarnings({"unchecked", "PackageVisibleField"})
-public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstractTest {
+@SuppressWarnings({"unchecked", "PackageVisibleField", "serial"})
+public class IgniteCrossCachesDistributedJoinQueryTest extends AbstractH2CompareQueryTest {
/** */
private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
@@ -72,6 +74,15 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
/** */
private String dataAsStr;
+ /** */
+ private String personCacheName;
+
+ /** */
+ private String orgCacheName;
+
+ /** */
+ private String accCacheName;
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -84,40 +95,259 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
}
/** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- super.beforeTestsStarted();
+ @Override protected CacheConfiguration[] cacheConfigurations() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void setIndexedTypes(CacheConfiguration<?, ?> cc, CacheMode mode) {
+ throw new UnsupportedOperationException();
+ }
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
startGridsMultiThreaded(NODES - 1);
client = true;
startGrid(NODES - 1);
+
+ conn = openH2Connection(false);
+
+ initializeH2Schema();
}
/** {@inheritDoc} */
- @Override protected void afterTestsStopped() throws Exception {
- stopAllGrids();
+ @Override protected void initCacheAndDbData() throws SQLException {
+ Statement st = conn.createStatement();
+
+ st.execute("create table \"" + accCacheName + "\".Account" +
+ " (" +
+ " _key int not null," +
+ " _val other not null," +
+ " id int unique," +
+ " personId int," +
+ " personDateId TIMESTAMP," +
+ " personStrId varchar(255)" +
+ " )");
+
+ st.execute("create table \"" + personCacheName + "\".Person" +
+ " (" +
+ " _key int not null," +
+ " _val other not null," +
+ " id int unique," +
+ " strId varchar(255) ," +
+ " dateId TIMESTAMP ," +
+ " orgId int," +
+ " orgDateId TIMESTAMP," +
+ " orgStrId varchar(255), " +
+ " name varchar(255), " +
+ " salary int" +
+ " )");
+
+ st.execute("create table \"" + orgCacheName + "\".Organization" +
+ " (" +
+ " _key int not null," +
+ " _val other not null," +
+ " id int unique," +
+ " strId varchar(255) ," +
+ " dateId TIMESTAMP ," +
+ " name varchar(255) " +
+ " )");
+
+ conn.commit();
+
+ st.close();
+
+ for (Account account : data.accounts) {
+ ignite(0).cache(accCacheName).put(account.id, account);
+
+ insertInDb(account);
+ }
+
+ for (Person person : data.persons) {
+ ignite(0).cache(personCacheName).put(person.id, person);
+
+ insertInDb(person);
+ }
- super.afterTestsStopped();
+ for (Organization org : data.orgs) {
+ ignite(0).cache(orgCacheName).put(org.id, org);
+
+ insertInDb(org);
+ }
+ }
+
+ /**
+ * @param acc Account.
+ * @throws SQLException If failed.
+ */
+ private void insertInDb(Account acc) throws SQLException {
+ try (PreparedStatement st = conn.prepareStatement(
+ "insert into \"" + accCacheName + "\".Account (_key, _val, id, personId, personDateId, personStrId) " +
+ "values(?, ?, ?, ?, ?, ?)")) {
+ int i = 0;
+
+ st.setObject(++i, acc.id);
+ st.setObject(++i, acc);
+ st.setObject(++i, acc.id);
+ st.setObject(++i, acc.personId);
+ st.setObject(++i, acc.personDateId);
+ st.setObject(++i, acc.personStrId);
+
+ st.executeUpdate();
+ }
+ }
+
+ /**
+ * @param p Person.
+ * @throws SQLException If failed.
+ */
+ private void insertInDb(Person p) throws SQLException {
+ try (PreparedStatement st = conn.prepareStatement(
+ "insert into \"" + personCacheName + "\".Person (_key, _val, id, strId, dateId, name, orgId, orgDateId, " +
+ "orgStrId, salary) " +
+ "values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")) {
+ int i = 0;
+
+ st.setObject(++i, p.id);
+ st.setObject(++i, p);
+ st.setObject(++i, p.id);
+ st.setObject(++i, p.strId);
+ st.setObject(++i, p.dateId);
+ st.setObject(++i, p.name);
+ st.setObject(++i, p.orgId);
+ st.setObject(++i, p.orgDateId);
+ st.setObject(++i, p.orgStrId);
+ st.setObject(++i, p.salary);
+
+ st.executeUpdate();
+ }
+ }
+
+ /**
+ * @param o Organization.
+ * @throws SQLException If failed.
+ */
+ private void insertInDb(Organization o) throws SQLException {
+ try (PreparedStatement st = conn.prepareStatement(
+ "insert into \"" + orgCacheName + "\".Organization (_key, _val, id, strId, dateId, name) " +
+ "values(?, ?, ?, ?, ?, ?)")) {
+ int i = 0;
+
+ st.setObject(++i, o.id);
+ st.setObject(++i, o);
+ st.setObject(++i, o.id);
+ st.setObject(++i, o.strId);
+ st.setObject(++i, o.dateId);
+ st.setObject(++i, o.name);
+
+ st.executeUpdate();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void checkAllDataEquals() throws Exception {
+ compareQueryRes0(ignite(0).cache(accCacheName), "select _key, _val, id, personId, personDateId, personStrId " +
+ "from \"" + accCacheName + "\".Account");
+
+ compareQueryRes0(ignite(0).cache(personCacheName), "select _key, _val, id, strId, dateId, name, orgId, " +
+ "orgDateId, orgStrId, salary from \"" + personCacheName + "\".Person");
+
+ compareQueryRes0(ignite(0).cache(orgCacheName), "select _key, _val, id, strId, dateId, name " +
+ "from \"" + orgCacheName + "\".Organization");
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Statement initializeH2Schema() throws SQLException {
+ Statement st = conn.createStatement();
+
+ st.execute("CREATE SCHEMA \"" + TestCacheType.REPLICATED_1.cacheName + "\"");
+ st.execute("CREATE SCHEMA \"" + TestCacheType.REPLICATED_2.cacheName + "\"");
+ st.execute("CREATE SCHEMA \"" + TestCacheType.REPLICATED_3.cacheName + "\"");
+
+ st.execute("CREATE SCHEMA \"" + TestCacheType.PARTITIONED_b0_1.cacheName + "\"");
+ st.execute("CREATE SCHEMA \"" + TestCacheType.PARTITIONED_b0_2.cacheName + "\"");
+ st.execute("CREATE SCHEMA \"" + TestCacheType.PARTITIONED_b0_3.cacheName + "\"");
+
+ st.execute("CREATE SCHEMA \"" + TestCacheType.PARTITIONED_b1_1.cacheName + "\"");
+ st.execute("CREATE SCHEMA \"" + TestCacheType.PARTITIONED_b1_2.cacheName + "\"");
+ st.execute("CREATE SCHEMA \"" + TestCacheType.PARTITIONED_b1_3.cacheName + "\"");
+
+ return st;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 30 * 60 * 1000;
}
/**
* @throws Exception If failed.
*/
- public void testCrossCacheDistributedJoin() throws Exception {
- Map<T4<Integer, TestCacheType, TestCacheType, TestCacheType>, Throwable> errors = new LinkedHashMap<>();
- List<T4<Integer, TestCacheType, TestCacheType, TestCacheType>> success = new ArrayList<>();
+ public void testCrossCacheDistributedJoin1() throws Exception {
+ final Set<TestCacheType> personCacheTypes = new LinkedHashSet<TestCacheType>() {{
+ add(TestCacheType.REPLICATED_1);
+ }};
- int cfgIdx = 0;
+ final Set<TestCacheType> accCacheTypes = new LinkedHashSet<TestCacheType>() {{
+ add(TestCacheType.REPLICATED_1);
+ add(TestCacheType.PARTITIONED_b0_1);
+ add(TestCacheType.PARTITIONED_b1_1);
+ add(TestCacheType.REPLICATED_2);
+ add(TestCacheType.PARTITIONED_b0_2);
+ add(TestCacheType.PARTITIONED_b1_2);
+ }};
+
+ Set<TestCacheType> orgCacheTypes = new LinkedHashSet<TestCacheType>() {{
+ addAll(accCacheTypes);
+ add(TestCacheType.REPLICATED_3);
+ add(TestCacheType.PARTITIONED_b0_3);
+ add(TestCacheType.PARTITIONED_b1_3);
+ }};
+
+ checkCrossCacheDistributedJoin(accCacheTypes, personCacheTypes, orgCacheTypes);
+ }
+ /**
+ * @throws Exception If failed.
+ */
+ public void testCrossCacheDistributedJoin2() throws Exception {
final Set<TestCacheType> personCacheTypes = new LinkedHashSet<TestCacheType>() {{
+ add(TestCacheType.PARTITIONED_b0_1);
+ }};
+
+ final Set<TestCacheType> accCacheTypes = new LinkedHashSet<TestCacheType>() {{
add(TestCacheType.REPLICATED_1);
add(TestCacheType.PARTITIONED_b0_1);
add(TestCacheType.PARTITIONED_b1_1);
+ add(TestCacheType.REPLICATED_2);
+ add(TestCacheType.PARTITIONED_b0_2);
+ add(TestCacheType.PARTITIONED_b1_2);
+ }};
+
+ Set<TestCacheType> orgCacheTypes = new LinkedHashSet<TestCacheType>() {{
+ addAll(accCacheTypes);
+ add(TestCacheType.REPLICATED_3);
+ add(TestCacheType.PARTITIONED_b0_3);
+ add(TestCacheType.PARTITIONED_b1_3);
+ }};
+
+ checkCrossCacheDistributedJoin(accCacheTypes, personCacheTypes, orgCacheTypes);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testCrossCacheDistributedJoin3() throws Exception {
+ final Set<TestCacheType> personCacheTypes = new LinkedHashSet<TestCacheType>() {{
+ add(TestCacheType.PARTITIONED_b0_1);
}};
final Set<TestCacheType> accCacheTypes = new LinkedHashSet<TestCacheType>() {{
- addAll(personCacheTypes);
+ add(TestCacheType.REPLICATED_1);
+ add(TestCacheType.PARTITIONED_b0_1);
+ add(TestCacheType.PARTITIONED_b1_1);
add(TestCacheType.REPLICATED_2);
add(TestCacheType.PARTITIONED_b0_2);
add(TestCacheType.PARTITIONED_b1_2);
@@ -130,6 +360,23 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
add(TestCacheType.PARTITIONED_b1_3);
}};
+ checkCrossCacheDistributedJoin(accCacheTypes, personCacheTypes, orgCacheTypes);
+ }
+
+ /**
+ * @throws Exception If failed.
+ * @param accCacheTypes Account cache types.
+ * @param personCacheTypes Person cache types.
+ * @param orgCacheTypes Organization cache types.
+ */
+ private void checkCrossCacheDistributedJoin(
+ Set<TestCacheType> accCacheTypes, Set<TestCacheType> personCacheTypes,
+ Set<TestCacheType> orgCacheTypes) throws Exception {
+ Map<T4<Integer, TestCacheType, TestCacheType, TestCacheType>, Throwable> errors = new LinkedHashMap<>();
+ List<T4<Integer, TestCacheType, TestCacheType, TestCacheType>> success = new ArrayList<>();
+
+ int cfgIdx = 0;
+
for (TestCacheType personCacheType : personCacheTypes) {
for (TestCacheType accCacheType : accCacheTypes) {
for (TestCacheType orgCacheType : orgCacheTypes) {
@@ -152,7 +399,7 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
if (!errors.isEmpty()) {
int total = personCacheTypes.size() * accCacheTypes.size() * orgCacheTypes.size();
- SB sb = new SB("Test failed for the following " + errors.size() + " combination(s) ("+total+" total):\n");
+ SB sb = new SB("Test failed for the following " + errors.size() + " combination(s) (" + total + " total):\n");
for (Map.Entry<T4<Integer, TestCacheType, TestCacheType, TestCacheType>, Throwable> e : errors.entrySet()) {
T4<Integer, TestCacheType, TestCacheType, TestCacheType> t = e.getKey();
@@ -168,6 +415,8 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
+ ", orgCache=" + t.get4() + "]").a("\n");
}
+ sb.a("The following data has beed used for test:\n " + dataAsString());
+
fail(sb.toString());
}
}
@@ -211,20 +460,13 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
dataAsStr = null;
data = prepareData();
- final IgniteCache accCache = ignite(0).cache(accCacheType.cacheName);
-
- for (Account account : data.accounts)
- accCache.put(account.id, account);
+ personCacheName = personCacheType.cacheName;
+ accCacheName = accCacheType.cacheName;
+ orgCacheName = orgCacheType.cacheName;
- final IgniteCache personCache = ignite(0).cache(personCacheType.cacheName);
+ initCacheAndDbData();
- for (Person person : data.persons)
- personCache.put(person.id, person);
-
- IgniteCache orgCache = ignite(0).cache(orgCacheType.cacheName);
-
- for (Organization org : data.orgs)
- orgCache.put(org.id, org);
+ checkAllDataEquals();
List<String> cacheNames = new ArrayList<>();
@@ -240,72 +482,47 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
log.info("Use cache: " + cache.getName());
- CacheConfiguration cc = (CacheConfiguration)cache.getConfiguration(CacheConfiguration.class);
-
- if (cc.getCacheMode() == REPLICATED && !ignite(i).configuration().isClientMode()) {
- GridTestUtils.assertThrows(log, new Callable<Object>() {
- @Override public Object call() throws Exception {
- cache.query(new SqlFieldsQuery("select p.name from " +
- "\"" + personCache.getName() + "\".Person p, " +
- "\"" + accCache.getName() + "\".Account a " +
- "where p._key = a.personId").setDistributedJoins(true));
-
- return null;
- }
- }, CacheException.class, "Queries using distributed JOINs have to be run on partitioned cache");
-
- GridTestUtils.assertThrows(log, new Callable<Object>() {
- @Override public Object call() throws Exception {
- cache.query(new SqlQuery(Person.class,
- "from \"" + personCache.getName() + "\".Person , " +
- "\"" + accCache.getName() + "\".Account " +
- "where Person._key = Account.personId")
- .setDistributedJoins(true));
-
- return null;
- }
- }, CacheException.class, "Queries using distributed JOINs have to be run on partitioned cache");
- }
+ if (((IgniteCacheProxy)cache).context().isReplicated() && !ignite(i).configuration().isClientMode())
+ assertProperException(cache);
else {
- if (!cache.getName().equals(orgCacheType.cacheName))
- checkPersonAccountsJoin(cache,
- data.accountsPerPerson,
- accCache.getName(),
- personCache.getName());
-
- if (!cache.getName().equals(accCacheType.cacheName))
- checkOrganizationPersonsJoin(cache,
- data.personsPerOrg,
- orgCacheType.cacheName,
- personCacheType.cacheName);
-
- checkOrganizationPersonAccountJoin(cache,
- data.accountsPerOrg,
- orgCacheType.cacheName,
- personCacheType.cacheName,
- accCacheType.cacheName);
-
- checkUninon(cache,
- data,
- orgCacheType.cacheName,
- personCacheType.cacheName,
- accCacheType.cacheName);
-
- if (!cache.getName().equals(orgCacheType.cacheName))
- checkPersonAccountCrossJoin(cache,
- data,
- personCacheType.cacheName,
- accCacheType.cacheName);
-
- if (!cache.getName().equals(accCacheType.cacheName))
- checkPersonOrganizationGroupBy(cache,
- personCacheType.cacheName,
- orgCacheType.cacheName);
-
- if (!cache.getName().equals(orgCacheType.cacheName))
- checkPersonAccountGroupBy(cache,
- personCacheType.cacheName,
- accCacheType.cacheName);
+ boolean isClientNodeAndCacheIsReplicated = ((IgniteCacheProxy)cache).context().isReplicated()
+ && ignite(i).configuration().isClientMode();
+
+ boolean all3CachesAreReplicated =
+ ((IgniteCacheProxy)ignite(0).cache(accCacheName)).context().isReplicated()
+ && ((IgniteCacheProxy)ignite(0).cache(personCacheName)).context().isReplicated()
+ && ((IgniteCacheProxy)ignite(0).cache(orgCacheName)).context().isReplicated();
+
+ // Queries running on replicated cache should not contain JOINs with partitioned tables.
+ if (!isClientNodeAndCacheIsReplicated || all3CachesAreReplicated) {
+ if (cache.getName().equals(orgCacheType.cacheName))
+ compareQueryRes0(cache, "select name from (select name from \"" + orgCacheName + "\".Organization)", true,
+ new Object[0], Ordering.RANDOM);
+
+ if (!cache.getName().equals(orgCacheType.cacheName))
+ checkPersonAccountsJoin(cache, data.accountsPerPerson);
+
+ if (!cache.getName().equals(accCacheType.cacheName))
+ checkOrganizationPersonsJoin(cache, data.personsPerOrg);
+
+ checkOrganizationPersonAccountJoin(cache, data.accountsPerOrg);
+
+ checkUninon(cache);
+
+ if (!cache.getName().equals(orgCacheType.cacheName))
+ checkPersonAccountCrossJoin(cache);
+
+ if (!cache.getName().equals(accCacheType.cacheName))
+ checkPersonOrganizationGroupBy(cache);
+
+ if (!cache.getName().equals(orgCacheType.cacheName))
+ checkPersonAccountGroupBy(cache);
+
+ if (!cache.getName().equals(accCacheType.cacheName))
+ checkPersonOrganizationJoinInsideSubquery(cache);
+
+ checkJoinInsideSubquery(cache);
+ }
}
}
}
@@ -314,13 +531,52 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
ignite(0).destroyCache(accCacheType.cacheName);
ignite(0).destroyCache(personCacheType.cacheName);
ignite(0).destroyCache(orgCacheType.cacheName);
+
+ Statement st = conn.createStatement();
+
+ st.execute("drop table \"" + accCacheName + "\".Account");
+ st.execute("drop table \"" + personCacheName + "\".Person");
+ st.execute("drop table \"" + orgCacheName + "\".Organization");
+
+ conn.commit();
+
+ st.close();
}
}
/**
- * Organization ids: [0, 9].
- * Person ids: randoms at [10, 9999]
- * Accounts ids: randoms at [10000, 999_999]
+ * @param cache Cache.
+ */
+ private void assertProperException(final IgniteCache cache) {
+ final IgniteCache accCache = ignite(0).cache(accCacheName);
+ final IgniteCache personCache = ignite(0).cache(personCacheName);
+
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ cache.query(new SqlFieldsQuery("select p.name from " +
+ "\"" + personCache.getName() + "\".Person p, " +
+ "\"" + accCache.getName() + "\".Account a " +
+ "where p._key = a.personId").setDistributedJoins(true));
+
+ return null;
+ }
+ }, CacheException.class, "Queries using distributed JOINs have to be run on partitioned cache");
+
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ cache.query(new SqlQuery(Person.class,
+ "from \"" + personCache.getName() + "\".Person , " +
+ "\"" + accCache.getName() + "\".Account " +
+ "where Person._key = Account.personId")
+ .setDistributedJoins(true));
+
+ return null;
+ }
+ }, CacheException.class, "Queries using distributed JOINs have to be run on partitioned cache");
+ }
+
+ /**
+ * Organization ids: [0, 9]. Person ids: randoms at [10, 9999]. Accounts ids: randoms at [10000, 999_999]
*
* @return Data.
*/
@@ -371,7 +627,7 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
while (!accountIds.add(accountId))
accountId = ThreadLocalRandom.current().nextInt(10_000, 1000_000);
- accounts.add(new Account(accountId, personId));
+ accounts.add(new Account(accountId, personId, orgId));
}
accountsPerPerson.put(personId, accountsCnt);
@@ -463,13 +719,8 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
/**
* @param cache Cache.
* @param cnts Organizations per person counts.
- * @param orgCacheName Organization cache name.
- * @param personCacheName Person cache name.
*/
- private void checkOrganizationPersonsJoin(IgniteCache cache,
- Map<Integer, Integer> cnts,
- String orgCacheName,
- String personCacheName) {
+ private void checkOrganizationPersonsJoin(IgniteCache cache, Map<Integer, Integer> cnts) {
SqlFieldsQuery qry = new SqlFieldsQuery("select o.name, p.name " +
"from \"" + orgCacheName + "\".Organization o, \"" + personCacheName + "\".Person p " +
"where p.orgId = o._key and o._key=?");
@@ -487,7 +738,6 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
qry2.setDistributedJoins(true);
}
-
long total = 0;
for (int i = 0; i < cnts.size(); i++) {
@@ -523,13 +773,8 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
/**
* @param cache Cache.
* @param cnts Accounts per person counts.
- * @param accCacheName Account cache name.
- * @param personCacheName Person cache name.
*/
- private void checkPersonAccountsJoin(IgniteCache cache,
- Map<Integer, Integer> cnts,
- String accCacheName,
- String personCacheName) {
+ private void checkPersonAccountsJoin(IgniteCache cache, Map<Integer, Integer> cnts) {
List<Query> qrys = new ArrayList<>();
qrys.add(new SqlFieldsQuery("select p.name from " +
@@ -580,7 +825,6 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
);
}
-
long total = 0;
for (Map.Entry<Integer, Integer> e : cnts.entrySet()) {
@@ -637,12 +881,8 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
/**
* @param cache Cache.
* @param cnts Accounts per organization count
- * @param orgCacheName Organization cache name.
- * @param personCacheName Person cache name.
- * @param accCacheName Account cache name.
*/
- private void checkOrganizationPersonAccountJoin(IgniteCache cache, Map<Integer, Integer> cnts, String orgCacheName,
- String personCacheName, String accCacheName) {
+ private void checkOrganizationPersonAccountJoin(IgniteCache cache, Map<Integer, Integer> cnts) {
List<Query> queries = new ArrayList<>();
queries.add(new SqlFieldsQuery("select o.name, p.name, a._key " +
@@ -717,13 +957,8 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
/**
* @param cache Cache.
- * @param data Data.
- * @param orgCacheName Organization cache name.
- * @param personCacheName Person cache name.
- * @param accCacheName Account cache name.
*/
- private void checkUninon(IgniteCache cache, Data data, String orgCacheName,
- String personCacheName, String accCacheName) {
+ private void checkUninon(IgniteCache cache) {
List<Query> queries = new ArrayList<>();
queries.add(new SqlFieldsQuery(
@@ -764,12 +999,8 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
/**
* @param cache Cache.
- * @param data Data.
- * @param personCacheName Person cache name.
- * @param accCacheName Account cache name.
*/
- private void checkPersonAccountCrossJoin(IgniteCache cache, Data data,
- String personCacheName, String accCacheName) {
+ private void checkPersonAccountCrossJoin(IgniteCache cache) {
SqlFieldsQuery q = new SqlFieldsQuery("select p.name " +
"from \"" + personCacheName + "\".Person p " +
"cross join \"" + accCacheName + "\".Account a");
@@ -783,14 +1014,11 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
/**
* @param cache Cache.
- * @param personCacheName Person cache name.
- * @param orgCacheName Organization cache name.
*/
- private void checkPersonOrganizationGroupBy(IgniteCache cache,
- String personCacheName, String orgCacheName) {
+ private void checkPersonOrganizationGroupBy(IgniteCache cache) {
// Max salary per organization.
SqlFieldsQuery q = new SqlFieldsQuery("select max(p.salary) " +
- "from \"" + personCacheName + "\".Person p join \""+orgCacheName+"\".Organization o " +
+ "from \"" + personCacheName + "\".Person p join \"" + orgCacheName + "\".Organization o " +
"on p.orgId = o.id " +
"group by o.name " +
"having o.id = ?");
@@ -820,14 +1048,11 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
/**
* @param cache Cache.
- * @param personCacheName Person cache name.
- * @param accCacheName Account cache name.
*/
- private void checkPersonAccountGroupBy(IgniteCache cache,
- String personCacheName, String accCacheName) {
+ private void checkPersonAccountGroupBy(IgniteCache cache) {
// Count accounts per person.
SqlFieldsQuery q = new SqlFieldsQuery("select count(a.id) " +
- "from \"" + personCacheName + "\".Person p join \""+accCacheName+"\".Account a " +
+ "from \"" + personCacheName + "\".Person p join \"" + accCacheName + "\".Account a " +
"on p.strId = a.personStrId " +
"group by p.name " +
"having p.id = ?");
@@ -848,7 +1073,44 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
if (cnt > 0) {
assertEquals(errMsg, 1, res.size());
assertEquals(errMsg, 1, res.get(0).size());
- assertEquals(errMsg, (long) cnt, res.get(0).get(0));
+ assertEquals(errMsg, (long)cnt, res.get(0).get(0));
+ }
+ else
+ assertEquals(errMsg, 0, res.size());
+ }
+ }
+
+ /**
+ * @param cache Cache.
+ */
+ private void checkPersonAccountOrganizationGroupBy(IgniteCache cache) {
+ // Max count of accounts at org.
+ SqlFieldsQuery q = new SqlFieldsQuery("select max(count(a.id)) " +
+ "from " +
+ "\"" + personCacheName + "\".Person p " +
+ "\"" + orgCacheName + "\".Organization o " +
+ "\"" + accCacheName + "\".Account a " +
+ "where p.id = a.personId and p.orgStrId = o.strId " +
+ "group by org.id " +
+ "having o.id = ?");
+
+ q.setDistributedJoins(true);
+
+ for (Map.Entry<Integer, Integer> e : data.accountsPerPerson.entrySet()) {
+ Integer personId = e.getKey();
+ Integer cnt = e.getValue();
+
+ q.setArgs(personId);
+
+ List<List<?>> res = cache.query(q).getAll();
+
+ String errMsg = "Expected data [personId=" + personId + ", cnt=" + cnt + ", data=" + dataAsString() + "]";
+
+ // Cnt == 0 means that there are no accounts for the person.
+ if (cnt > 0) {
+ assertEquals(errMsg, 1, res.size());
+ assertEquals(errMsg, 1, res.get(0).size());
+ assertEquals(errMsg, (long)cnt, res.get(0).get(0));
}
else
assertEquals(errMsg, 0, res.size());
@@ -856,6 +1118,43 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
}
/**
+ * @param cache Cache.
+ */
+ private void checkPersonOrganizationJoinInsideSubquery(IgniteCache cache) throws SQLException {
+ // Select persons with max salary at organization.
+ String sql = "select p.id " +
+ "from \"" + personCacheName + "\".Person p, " +
+ "\"" + orgCacheName + "\".Organization o " +
+ "where p.orgId = o.id and o.id = ? and p.salary = (" +
+ " select max(p.salary) from \"" + personCacheName + "\".Person p join " +
+ " \"" + orgCacheName + "\".Organization o on p.orgDateId = o.dateId group by o.id having o.id = ?)";
+
+ for (Organization org : data.orgs) {
+ Integer orgId = org.id;
+
+ compareQueryRes0(cache, sql, true, new Object[] {orgId, orgId}, Ordering.RANDOM);
+ }
+ }
+
+ /**
+ * @param cache Cache.
+ */
+ private void checkJoinInsideSubquery(IgniteCache cache) throws SQLException {
+ // Select persons with count of accounts of person at organization.
+ String sql = "select p.id, count(a.id) " +
+ "from " +
+ "\"" + personCacheName + "\".Person p, " +
+ "\"" + orgCacheName + "\".Organization o, " +
+ "\"" + accCacheName + "\".Account a " +
+ "where p.id = a.personId and p.orgStrId = o.strId " +
+ "group by p.id " +
+ "having o.id = ?";
+
+ for (Organization org : data.orgs)
+ compareQueryRes0(cache, sql, true, new Object[] {org.id}, Ordering.RANDOM);
+ }
+
+ /**
* @return Data as string.
*/
private String dataAsString() {
@@ -894,8 +1193,7 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
PARTITIONED_b1_2(CacheMode.PARTITIONED, 1),
/** */
- PARTITIONED_b1_3(CacheMode.PARTITIONED, 1),
- ;
+ PARTITIONED_b1_3(CacheMode.PARTITIONED, 1);
/** */
final String cacheName;
@@ -962,6 +1260,7 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
this.maxSalaryPerOrg = maxSalaryPerOrg;
}
+ /** {@inheritDoc} */
@Override public String toString() {
return "Data{" +
"orgs=" + orgs +
@@ -995,13 +1294,17 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
@QuerySqlField
private String personStrId;
+ @QuerySqlField
+ private int orgId;
+
/**
* @param id ID.
* @param personId Person ID.
*/
- Account(int id, int personId) {
+ Account(int id, int personId, int orgId) {
this.id = id;
this.personId = personId;
+ this.orgId = orgId;
personDateId = new Date(personId);
personStrId = "personId" + personId;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7f99f2a7/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/AbstractH2CompareQueryTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/AbstractH2CompareQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/AbstractH2CompareQueryTest.java
index b251d63..9d8ca18 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/AbstractH2CompareQueryTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/AbstractH2CompareQueryTest.java
@@ -30,6 +30,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import junit.framework.AssertionFailedError;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
@@ -49,7 +50,7 @@ import org.jetbrains.annotations.Nullable;
/**
* Abstract test framework to compare query results from h2 database instance and mixed ignite caches (replicated and partitioned)
- * which have the same data models and data content.
+ * which have the same data models and data content.
*/
public abstract class AbstractH2CompareQueryTest extends GridCommonAbstractTest {
/** */
@@ -60,7 +61,7 @@ public abstract class AbstractH2CompareQueryTest extends GridCommonAbstractTest
/** Replicated cache. */
protected static IgniteCache rCache;
-
+
/** H2 db connection. */
protected static Connection conn;
@@ -77,14 +78,21 @@ public abstract class AbstractH2CompareQueryTest extends GridCommonAbstractTest
c.setMarshaller(new OptimizedMarshaller(true));
- c.setCacheConfiguration(createCache("part", CacheMode.PARTITIONED),
- createCache("repl", CacheMode.REPLICATED)
- );
+ c.setCacheConfiguration(cacheConfigurations());
return c;
}
/**
+ * @return Cache configurations.
+ */
+ protected CacheConfiguration[] cacheConfigurations() {
+ return new CacheConfiguration[] {
+ createCache("part", CacheMode.PARTITIONED),
+ createCache("repl", CacheMode.REPLICATED)};
+ }
+
+ /**
* Creates new cache configuration.
*
* @param name Cache name.
@@ -117,11 +125,11 @@ public abstract class AbstractH2CompareQueryTest extends GridCommonAbstractTest
Ignite ignite = startGrids(4);
pCache = ignite.cache("part");
-
+
rCache = ignite.cache("repl");
awaitPartitionMapExchange();
-
+
conn = openH2Connection(false);
initializeH2Schema();
@@ -130,17 +138,17 @@ public abstract class AbstractH2CompareQueryTest extends GridCommonAbstractTest
checkAllDataEquals();
}
-
+
/** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
super.afterTestsStopped();
Statement st = conn.createStatement();
-
+
st.execute("DROP ALL OBJECTS");
conn.close();
-
+
stopAllGrids();
}
@@ -175,11 +183,11 @@ public abstract class AbstractH2CompareQueryTest extends GridCommonAbstractTest
* @return Pooled connection.
* @throws SQLException In case of error.
*/
- private Connection openH2Connection(boolean autocommit) throws SQLException {
+ protected Connection openH2Connection(boolean autocommit) throws SQLException {
System.setProperty("h2.serializeJavaObject", "false");
-
+
String dbName = "test";
-
+
Connection conn = DriverManager.getConnection("jdbc:h2:mem:" + dbName + ";DB_CLOSE_DELAY=-1");
conn.setAutoCommit(autocommit);
@@ -193,7 +201,7 @@ public abstract class AbstractH2CompareQueryTest extends GridCommonAbstractTest
* @param sql SQL query.
* @param args SQL arguments.
* then results will compare as ordered queries.
- * @return Result set after SQL query execution.
+ * @return Result set after SQL query execution.
* @throws SQLException If exception.
*/
protected final List<List<?>> compareQueryRes0(String sql, @Nullable Object... args) throws SQLException {
@@ -201,14 +209,14 @@ public abstract class AbstractH2CompareQueryTest extends GridCommonAbstractTest
}
/**
- * Execute given sql query on h2 database and on ignite cache and compare results.
+ * Execute given sql query on h2 database and on ignite cache and compare results.
* Expected that results are not ordered.
*
* @param cache Ignite cache.
* @param sql SQL query.
* @param args SQL arguments.
* then results will compare as ordered queries.
- * @return Result set after SQL query execution.
+ * @return Result set after SQL query execution.
* @throws SQLException If exception.
*/
protected final List<List<?>> compareQueryRes0(IgniteCache cache, String sql, @Nullable Object... args) throws SQLException {
@@ -239,20 +247,46 @@ public abstract class AbstractH2CompareQueryTest extends GridCommonAbstractTest
* then results will compare as ordered queries.
* @return Result set after SQL query execution.
* @throws SQLException If exception.
- */
+ */
+ @SuppressWarnings("unchecked")
+ protected static List<List<?>> compareQueryRes0(IgniteCache cache, String sql, @Nullable Object[] args,
+ Ordering ordering) throws SQLException {
+ return compareQueryRes0(cache, sql, false, args, ordering);
+ }
+
+ /**
+ * Execute given sql query on h2 database and on ignite cache and compare results.
+ *
+ * @param cache Ignite cache.
+ * @param sql SQL query.
+ * @param distrib Distributed SQL Join flag.
+ * @param args SQL arguments.
+ * @param ordering Expected ordering of SQL results. If {@link Ordering#ORDERED}
+ * then results will compare as ordered queries.
+ * @return Result set after SQL query execution.
+ * @throws SQLException If exception.
+ */
@SuppressWarnings("unchecked")
- protected final List<List<?>> compareQueryRes0(IgniteCache cache, String sql, @Nullable Object[] args, Ordering ordering) throws SQLException {
+ protected static List<List<?>> compareQueryRes0(IgniteCache cache, String sql, boolean distrib,
+ @Nullable Object[] args, Ordering ordering) throws SQLException {
if (args == null)
args = new Object[] {null};
-
- info("Sql query:\n" + sql + "\nargs=" + Arrays.toString(args));
+
+ X.printerrln("Sql query:\n" + sql + "\nargs=" + Arrays.toString(args));
List<List<?>> h2Res = executeH2Query(sql, args);
- List<List<?>> cacheRes = cache.query(new SqlFieldsQuery(sql).setArgs(args)).getAll();
+ List<List<?>> cacheRes = cache.query(new SqlFieldsQuery(sql).setArgs(args).setDistributedJoins(distrib)).getAll();
+
+ try {
+ assertRsEquals(h2Res, cacheRes, ordering);
+ }
+ catch (AssertionFailedError e) {
+ X.println("[h2Res=" + h2Res + ", cacheRes=" + cacheRes + "]");
+
+ throw e;
+ }
- assertRsEquals(h2Res, cacheRes, ordering);
-
return h2Res;
}
@@ -264,7 +298,7 @@ public abstract class AbstractH2CompareQueryTest extends GridCommonAbstractTest
* @return Result of SQL query on h2 database.
* @throws SQLException If exception.
*/
- private List<List<?>> executeH2Query(String sql, Object[] args) throws SQLException {
+ private static List<List<?>> executeH2Query(String sql, Object[] args) throws SQLException {
List<List<?>> res = new ArrayList<>();
ResultSet rs = null;
@@ -285,10 +319,10 @@ public abstract class AbstractH2CompareQueryTest extends GridCommonAbstractTest
while (rs.next()) {
List<Object> row = new ArrayList<>(colCnt);
-
+
for (int i = 1; i <= colCnt; i++)
row.add(rs.getObject(i));
-
+
res.add(row);
}
}
@@ -307,9 +341,9 @@ public abstract class AbstractH2CompareQueryTest extends GridCommonAbstractTest
* @param ordering Expected ordering of SQL results. If {@link Ordering#ORDERED}
* then results will compare as ordered queries.
*/
- private void assertRsEquals(List<List<?>> rs1, List<List<?>> rs2, Ordering ordering) {
+ private static void assertRsEquals(List<List<?>> rs1, List<List<?>> rs2, Ordering ordering) {
assertEquals("Rows count has to be equal.", rs1.size(), rs2.size());
-
+
switch (ordering){
case ORDERED:
for (int rowNum = 0; rowNum < rs1.size(); rowNum++) {
@@ -349,9 +383,9 @@ public abstract class AbstractH2CompareQueryTest extends GridCommonAbstractTest
assertEquals(e1.getKey(), e2.getKey());
assertEquals(e1.getValue(), e2.getValue());
}
-
+
break;
- default:
+ default:
throw new IllegalStateException();
}
}
@@ -360,30 +394,30 @@ public abstract class AbstractH2CompareQueryTest extends GridCommonAbstractTest
* @param rs Result set.
* @return Map of unique rows at the result set to number of occuriances at the result set.
*/
- private TreeMap<String, Integer> extractUniqueRowsWithCounts(Iterable<List<?>> rs) {
+ private static TreeMap<String, Integer> extractUniqueRowsWithCounts(Iterable<List<?>> rs) {
TreeMap<String, Integer> res = new TreeMap<>();
for (List<?> row : rs) {
String rowStr = row.toString();
Integer cnt = res.get(rowStr);
-
+
if (cnt == null)
cnt = 0;
-
+
res.put(rowStr, cnt + 1);
}
return res;
}
-
+
/**
* Ordering type.
*/
protected enum Ordering {
/** Random. */
- RANDOM,
+ RANDOM,
/** Ordered. */
ORDERED
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7f99f2a7/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/H2CompareBigQueryTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/H2CompareBigQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/H2CompareBigQueryTest.java
index 255ac44..24e4fe3 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/H2CompareBigQueryTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/H2CompareBigQueryTest.java
@@ -35,11 +35,11 @@ import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.configuration.CacheConfiguration;
/**
- * Executes one big query (and subqueries of the big query) to compare query results from h2 database instance and
+ * Executes one big query (and subqueries of the big query) to compare query results from h2 database instance and
* mixed ignite caches (replicated and partitioned) which have the same data models and data content.
- *
- *
- * <pre>
+ *
+ *
+ * <pre>
*
* -------------------------------------> rootOrderId (virtual) <--------------------------
* | | |
@@ -75,20 +75,20 @@ public class H2CompareBigQueryTest extends AbstractH2CompareQueryTest {
/** Dates count. */
private static final int DATES_CNT = 5;
-
+
/** Full the big query. */
private String bigQry = getBigQry();
/**
* Extracts the big query from file.
- *
+ *
* @return Big query.
*/
private String getBigQry() {
String res = "";
Reader isr = new InputStreamReader(getClass().getResourceAsStream("bigQuery.sql"));
-
+
try(BufferedReader reader = new BufferedReader(isr)) {
for(String line; (line = reader.readLine()) != null; )
if (!line.startsWith("--")) // Skip commented lines.
@@ -96,7 +96,7 @@ public class H2CompareBigQueryTest extends AbstractH2CompareQueryTest {
}
catch (Throwable e) {
e.printStackTrace();
-
+
fail();
}
@@ -142,8 +142,8 @@ public class H2CompareBigQueryTest extends AbstractH2CompareQueryTest {
// Generate 1 - 5 orders for 1 root order.
for (int i = 0; i < rootOrderId % 5; i++) {
int orderId = idGen.incrementAndGet();
-
- CustOrder order = new CustOrder(orderId, rootOrderId, dates.get(orderId % dates.size()) ,
+
+ CustOrder order = new CustOrder(orderId, rootOrderId, dates.get(orderId % dates.size()) ,
orderId % 2 == 0 ? "CUSTOM" : "OTHER", orderId);
add(order);
@@ -151,13 +151,13 @@ public class H2CompareBigQueryTest extends AbstractH2CompareQueryTest {
pCache.put(order.orderId, order);
insertInDb(order);
- }
+ }
}
}};
final Collection<OrderParams> params = new ArrayList<OrderParams>(){{
for (CustOrder o : orders) {
- OrderParams op = new OrderParams(idGen.incrementAndGet(), o.orderId, o.date,
+ OrderParams op = new OrderParams(idGen.incrementAndGet(), o.orderId, o.date,
o.orderId % 2 == 0 ? "Algo 1" : "Algo 2");
add(op);
@@ -167,7 +167,7 @@ public class H2CompareBigQueryTest extends AbstractH2CompareQueryTest {
insertInDb(op);
}
}};
-
+
final Collection<ReplaceOrder> replaces = new ArrayList<ReplaceOrder>(){{
for (CustOrder o : orders) {
if (o.orderId % 7 == 0) {
@@ -186,12 +186,12 @@ public class H2CompareBigQueryTest extends AbstractH2CompareQueryTest {
final Collection<Cancel> cancels = new ArrayList<Cancel>(){{
for (CustOrder o : orders) {
if (o.orderId % 9 == 0) {
- Cancel c = new Cancel(idGen.incrementAndGet(), o.orderId,
+ Cancel c = new Cancel(idGen.incrementAndGet(), o.orderId,
new Date(o.date.getTime() + 12 * 60 * 60 * 1000));// Plus a half of day.
add(c);
- pCache.put(c.key(), c);
+ pCache.put(c.key(), c);
insertInDb(c);
}
@@ -203,7 +203,7 @@ public class H2CompareBigQueryTest extends AbstractH2CompareQueryTest {
int execShares = 10000 + rootOrderId;
int price = 1000 + rootOrderId;
int latsMkt = 3000 + rootOrderId;
-
+
Exec exec = new Exec(rootOrderId, dates.get(rootOrderId % dates.size()), execShares, price, latsMkt);
add(exec);
@@ -232,8 +232,23 @@ public class H2CompareBigQueryTest extends AbstractH2CompareQueryTest {
* @throws Exception If failed.
*/
public void testBigQuery() throws Exception {
- List<List<?>> res = compareQueryRes0(bigQry);
-
+ checkBigQuery(false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testBigQueryDistributed() throws Exception {
+ checkBigQuery(true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ * @param distributedJoins Use distributed joins flag.
+ */
+ public void checkBigQuery(boolean distributedJoins) throws Exception {
+ List<List<?>> res = compareQueryRes0(pCache, bigQry, distributedJoins, new Object[0], Ordering.RANDOM);
+
assertTrue(!res.isEmpty()); // Ensure we set good testing data at database.
}
@@ -295,15 +310,15 @@ public class H2CompareBigQueryTest extends AbstractH2CompareQueryTest {
" price int," +
" lastMkt int" +
" )");
-
+
conn.commit();
-
+
return st;
}
/**
* Insert {@link CustOrder} at h2 database.
- *
+ *
* @param o CustOrder.
*/
private void insertInDb(CustOrder o) throws SQLException {
@@ -311,7 +326,7 @@ public class H2CompareBigQueryTest extends AbstractH2CompareQueryTest {
"insert into \"part\".CustOrder (_key, _val, orderId, rootOrderId, date, alias, archSeq, origOrderId) " +
"values(?, ?, ?, ?, ?, ?, ?, ?)")) {
int i = 0;
-
+
st.setObject(++i, o.orderId);
st.setObject(++i, o);
st.setObject(++i, o.orderId);
@@ -335,7 +350,7 @@ public class H2CompareBigQueryTest extends AbstractH2CompareQueryTest {
"insert into \"part\".ReplaceOrder (_key, _val, id, orderId, rootOrderId, date, alias, archSeq, refOrderId) " +
"values(?, ?, ?, ?, ?, ?, ?, ?, ?)")) {
int i = 0;
-
+
st.setObject(++i, o.key());
st.setObject(++i, o);
st.setObject(++i, o.id);
@@ -359,7 +374,7 @@ public class H2CompareBigQueryTest extends AbstractH2CompareQueryTest {
try(PreparedStatement st = conn.prepareStatement(
"insert into \"part\".OrderParams (_key, _val, id, date, orderId, parentAlgo) values(?, ?, ?, ?, ?, ?)")) {
int i = 0;
-
+
st.setObject(++i, o.key());
st.setObject(++i, o);
st.setObject(++i, o.id);
@@ -380,7 +395,7 @@ public class H2CompareBigQueryTest extends AbstractH2CompareQueryTest {
try(PreparedStatement st = conn.prepareStatement(
"insert into \"part\".Cancel (_key, _val, id, date, refOrderId) values(?, ?, ?, ?, ?)")) {
int i = 0;
-
+
st.setObject(++i, o.key());
st.setObject(++i, o);
st.setObject(++i, o.id);
@@ -401,7 +416,7 @@ public class H2CompareBigQueryTest extends AbstractH2CompareQueryTest {
"insert into \"repl\".Exec (_key, _val, date, rootOrderId, execShares, price, lastMkt) " +
"values(?, ?, ?, ?, ?, ?, ?)")) {
int i = 0;
-
+
st.setObject(++i, o.rootOrderId);
st.setObject(++i, o);
st.setObject(++i, o.date);
@@ -475,7 +490,7 @@ public class H2CompareBigQueryTest extends AbstractH2CompareQueryTest {
/** Primary key. */
@QuerySqlField(index = true)
private int id;
-
+
/** Order id. */
@QuerySqlField(index = true)
private int orderId;
@@ -680,4 +695,4 @@ public class H2CompareBigQueryTest extends AbstractH2CompareQueryTest {
return rootOrderId;
}
}
-}
\ No newline at end of file
+}