You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/03/01 19:46:09 UTC
ignite git commit: ignite-1231: added
IgniteCrossCachesDistributedJoinQueryTest
Repository: ignite
Updated Branches:
refs/heads/ignite-1232 95ef93f60 -> 71286d7c3
ignite-1231: added IgniteCrossCachesDistributedJoinQueryTest
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/71286d7c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/71286d7c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/71286d7c
Branch: refs/heads/ignite-1232
Commit: 71286d7c37ab23169614e061d390e8d2923f9ff7
Parents: 95ef93f
Author: ashutak <as...@gridgain.com>
Authored: Tue Mar 1 21:44:39 2016 +0300
Committer: ashutak <as...@gridgain.com>
Committed: Tue Mar 1 21:44:39 2016 +0300
----------------------------------------------------------------------
...niteCrossCachesDistributedJoinQueryTest.java | 579 +++++++++++++++++++
.../IgniteCacheQuerySelfTestSuite.java | 2 +
2 files changed, 581 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/71286d7c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCrossCachesDistributedJoinQueryTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCrossCachesDistributedJoinQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCrossCachesDistributedJoinQueryTest.java
new file mode 100644
index 0000000..648a4c7
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCrossCachesDistributedJoinQueryTest.java
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.lang.GridAbsPredicateX;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T4;
+import org.apache.ignite.internal.util.typedef.internal.SB;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
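+/**
+ * Tests SQL fields queries with distributed joins enabled across different caches: persons, accounts and
+ * organizations are stored in replicated and partitioned caches in every combination of cache types.
+ */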
+@SuppressWarnings("unchecked")
+public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstractTest {
+ /** IP finder instance set on the discovery SPI of every node configured by this test. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** Total number of nodes started by the test: {@code NODES - 1} nodes plus one started with the client flag set. */
+ private static final int NODES = 5;
+
+ /** Client mode flag copied into the configuration of every node started after the flag is changed. */
+ private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration igniteCfg = super.getConfiguration(gridName);
+
+        // Every node of the test is given the same IP finder instance.
+        TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)igniteCfg.getDiscoverySpi();
+
+        discoSpi.setIpFinder(IP_FINDER);
+
+        // The flag is flipped in beforeTestsStarted() right before the last node is started.
+        igniteCfg.setClientMode(client);
+
+        return igniteCfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        final int srvCnt = NODES - 1;
+
+        // All nodes but the last one are started while the client flag is still unset.
+        startGridsMultiThreaded(srvCnt);
+
+        // The last node (index NODES - 1) is started in client mode.
+        client = true;
+
+        startGrid(srvCnt);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        // Stop every node started in beforeTestsStarted() before delegating to the parent clean-up.
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * Runs {@code checkDistributedCrossCacheJoin} for every combination of person, account and organization
+     * cache types. Failures are collected instead of being rethrown immediately, so that a single report
+     * listing all failed and all passed combinations is produced at the end.
+     *
+     * @throws Exception If failed.
+     */
+    public void testCrossCacheDistributedJoin() throws Exception {
+        // Person caches: one replicated and two partitioned ones (0 and 1 backups).
+        Set<TestCacheType> personTypes = new LinkedHashSet<>();
+
+        personTypes.add(TestCacheType.REPLICATED_1);
+        personTypes.add(TestCacheType.PARTITIONED_b0_1);
+        personTypes.add(TestCacheType.PARTITIONED_b1_1);
+
+        // Account caches: everything allowed for persons plus a second set of caches.
+        Set<TestCacheType> accTypes = new LinkedHashSet<>(personTypes);
+
+        accTypes.add(TestCacheType.REPLICATED_2);
+        accTypes.add(TestCacheType.PARTITIONED_b0_2);
+        accTypes.add(TestCacheType.PARTITIONED_b1_2);
+
+        // Organization caches: everything allowed for accounts plus a third set of caches.
+        Set<TestCacheType> orgTypes = new LinkedHashSet<>(accTypes);
+
+        orgTypes.add(TestCacheType.REPLICATED_3);
+        orgTypes.add(TestCacheType.PARTITIONED_b0_3);
+        orgTypes.add(TestCacheType.PARTITIONED_b1_3);
+
+        Map<T4<Integer, TestCacheType, TestCacheType, TestCacheType>, Throwable> errors = new LinkedHashMap<>();
+        List<T4<Integer, TestCacheType, TestCacheType, TestCacheType>> success = new ArrayList<>();
+
+        int cfgIdx = 0;
+
+        for (TestCacheType personType : personTypes) {
+            for (TestCacheType accType : accTypes) {
+                for (TestCacheType orgType : orgTypes) {
+                    Throwable err = null;
+
+                    try {
+                        checkDistributedCrossCacheJoin(personType, accType, orgType);
+                    }
+                    catch (Throwable e) {
+                        error("Failed to make distributed cross cache select.", e);
+
+                        err = e;
+                    }
+
+                    T4<Integer, TestCacheType, TestCacheType, TestCacheType> combo =
+                        new T4<>(cfgIdx, personType, accType, orgType);
+
+                    if (err == null)
+                        success.add(combo);
+                    else
+                        errors.put(combo, err);
+
+                    cfgIdx++;
+                }
+            }
+        }
+
+        if (errors.isEmpty())
+            return;
+
+        int total = personTypes.size() * accTypes.size() * orgTypes.size();
+
+        StringBuilder report = new StringBuilder("Test failed for the following " + errors.size() +
+            " combination(s) (" + total + " total):\n");
+
+        for (Map.Entry<T4<Integer, TestCacheType, TestCacheType, TestCacheType>, Throwable> failed : errors.entrySet()) {
+            T4<Integer, TestCacheType, TestCacheType, TestCacheType> combo = failed.getKey();
+
+            report.append("[cfgIdx=").append(combo.get1())
+                .append(", personCache=").append(combo.get2())
+                .append(", accCache=").append(combo.get3())
+                .append(", orgCache=").append(combo.get4())
+                .append(", exception=").append(failed.getValue())
+                .append("]\n");
+        }
+
+        report.append("Successfully finished combinations:\n");
+
+        for (T4<Integer, TestCacheType, TestCacheType, TestCacheType> combo : success) {
+            report.append("[cfgIdx=").append(combo.get1())
+                .append(", personCache=").append(combo.get2())
+                .append(", accCache=").append(combo.get3())
+                .append(", orgCache=").append(combo.get4())
+                .append("]\n");
+        }
+
+        fail(report.toString());
+    }
+
+    /**
+     * Creates the three caches, fills them with generated data and runs the join checks from every node
+     * of the topology. The caches are destroyed afterwards and the method waits until no node exposes
+     * them any more, so that the next combination starts from a clean state.
+     *
+     * @param personCacheType Person cache type.
+     * @param accountCacheType Account cache type.
+     * @param orgCacheType Organization cache type.
+     * @throws Exception If failed.
+     */
+    private void checkDistributedCrossCacheJoin(final TestCacheType personCacheType,
+        final TestCacheType accountCacheType,
+        final TestCacheType orgCacheType) throws Exception {
+        info("Checking distributed cross cache join [personCache=" + personCacheType + ", accCache=" + accountCacheType
+            + ", orgCache=" + orgCacheType + "]");
+
+        TestCacheType[] cacheTypes = {personCacheType, accountCacheType, orgCacheType};
+
+        // The same type may appear more than once; getOrCreateCache() is called for each occurrence.
+        for (TestCacheType type : cacheTypes) {
+            CacheConfiguration cc = cacheConfiguration(type.cacheName, type.cacheMode, type.backups);
+
+            ignite(0).getOrCreateCache(cc);
+
+            info("Created cache [name=" + type.cacheName + ", mode=" + type.cacheMode + "]");
+        }
+
+        awaitPartitionMapExchange();
+
+        try {
+            Data data = prepareData();
+
+            IgniteCache accCache = ignite(0).cache(accountCacheType.cacheName);
+
+            for (Account account : data.accounts)
+                accCache.put(account.id, account);
+
+            IgniteCache personCache = ignite(0).cache(personCacheType.cacheName);
+
+            for (Person person : data.persons)
+                personCache.put(person.id, person);
+
+            IgniteCache orgCache = ignite(0).cache(orgCacheType.cacheName);
+
+            for (Organization org : data.orgs)
+                orgCache.put(org.id, org);
+
+            for (int i = 0; i < NODES; i++) {
+                log.info("Test node: " + i);
+
+                // Take the caches from the node under test; always using node 0 would leave the
+                // remaining nodes (including the client) unchecked.
+                Ignite node = ignite(i);
+
+                // As before, the person-account join is issued through the organization cache and
+                // the organization-person join through the account cache.
+                checkPersonAccountsJoin(node.cache(orgCacheType.cacheName), data.accountsCntForPerson,
+                    accountCacheType.cacheName, personCacheType.cacheName);
+
+                checkOrganizationPersonsJoin(node.cache(accountCacheType.cacheName), data.personsCntAtOrg,
+                    orgCacheType.cacheName, personCacheType.cacheName);
+            }
+        }
+        finally {
+            ignite(0).destroyCache(accountCacheType.cacheName);
+            ignite(0).destroyCache(personCacheType.cacheName);
+            ignite(0).destroyCache(orgCacheType.cacheName);
+
+            // Wait until the caches are gone on every node before the next combination recreates them.
+            assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicateX() {
+                @Override public boolean applyx() throws IgniteCheckedException {
+                    for (int i = 0; i < NODES; i++) {
+                        if (grid(i).cache(accountCacheType.cacheName) != null
+                            || grid(i).cache(personCacheType.cacheName) != null
+                            || grid(i).cache(orgCacheType.cacheName) != null)
+                            return false;
+                    }
+
+                    return true;
+                }
+            }, 10_000));
+        }
+    }
+
+    /**
+     * Generates random test data. The id ranges of the three entity kinds do not overlap, so entries of
+     * different kinds never share a key when they are stored in the same cache:
+     * <ul>
+     * <li>Organization ids: [0, ORG_CNT - 1].</li>
+     * <li>Person ids: unique randoms in [10, 9999].</li>
+     * <li>Account ids: unique randoms in [10_000, 999_999].</li>
+     * </ul>
+     *
+     * @return Generated organizations, persons and accounts together with the expected per-organization
+     *      person counts and per-person account counts.
+     */
+    private static Data prepareData() {
+        final int ORG_CNT = 1;
+
+        // Bounds for ThreadLocalRandom.nextInt(origin, bound): lower inclusive, upper exclusive.
+        final int PERSON_ID_FROM = 10;
+        final int PERSON_ID_TO = 10_000;
+        final int ACC_ID_FROM = 10_000;
+        final int ACC_ID_TO = 1_000_000;
+
+        Map<Integer, Integer> personsCntAtOrg = new HashMap<>();
+        Map<Integer, Integer> accountsCntForPerson = new HashMap<>();
+
+        Collection<Organization> orgs = new ArrayList<>();
+        Collection<Person> persons = new ArrayList<>();
+        Collection<Account> accounts = new ArrayList<>();
+
+        for (int id = 0; id < ORG_CNT; id++)
+            orgs.add(new Organization(id, "org-" + id));
+
+        Set<Integer> personIds = new HashSet<>();
+        Set<Integer> accountIds = new HashSet<>();
+
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        for (int orgId = 0; orgId < ORG_CNT; orgId++) {
+            int personsCnt = rnd.nextInt(100);
+
+            for (int p = 0; p < personsCnt; p++) {
+                int personId = rnd.nextInt(PERSON_ID_FROM, PERSON_ID_TO);
+
+                // Redraw until an id not used by another person is found.
+                while (!personIds.add(personId))
+                    personId = rnd.nextInt(PERSON_ID_FROM, PERSON_ID_TO);
+
+                String name = "person-" + personId;
+
+                persons.add(new Person(personId, orgId, name));
+
+                int accountsCnt = rnd.nextInt(10);
+
+                for (int a = 0; a < accountsCnt; a++) {
+                    // The first draw and the redraw use the same range.
+                    int accountId = rnd.nextInt(ACC_ID_FROM, ACC_ID_TO);
+
+                    while (!accountIds.add(accountId))
+                        accountId = rnd.nextInt(ACC_ID_FROM, ACC_ID_TO);
+
+                    accounts.add(new Account(accountId, personId));
+                }
+
+                accountsCntForPerson.put(personId, accountsCnt);
+            }
+
+            personsCntAtOrg.put(orgId, personsCnt);
+        }
+
+        return new Data(orgs, persons, accounts, personsCntAtOrg, accountsCntForPerson);
+    }
+
+    /**
+     * Builds the configuration of a cache that registers query entities for all three value types of the
+     * test (accounts, persons and organizations), each keyed by {@code Integer}.
+     *
+     * @param cacheName Cache name.
+     * @param cacheMode Cache mode.
+     * @param backups Number of backups; applied to partitioned caches only.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration cacheConfiguration(String cacheName, CacheMode cacheMode, int backups) {
+        String intType = Integer.class.getName();
+        String strType = String.class.getName();
+
+        QueryEntity accEntity = new QueryEntity();
+
+        accEntity.setKeyType(intType);
+        accEntity.setValueType(Account.class.getName());
+        accEntity.addQueryField("personId", intType, null);
+
+        QueryEntity personEntity = new QueryEntity();
+
+        personEntity.setKeyType(intType);
+        personEntity.setValueType(Person.class.getName());
+        personEntity.addQueryField("orgId", intType, null);
+        personEntity.addQueryField("id", intType, null);
+        personEntity.addQueryField("name", strType, null);
+
+        QueryEntity orgEntity = new QueryEntity();
+
+        orgEntity.setKeyType(intType);
+        orgEntity.setValueType(Organization.class.getName());
+        orgEntity.addQueryField("id", intType, null);
+        orgEntity.addQueryField("name", strType, null);
+
+        CacheConfiguration cacheCfg = new CacheConfiguration();
+
+        cacheCfg.setName(String.valueOf(cacheName));
+        cacheCfg.setCacheMode(cacheMode);
+
+        // The backups setting is only applied to partitioned caches.
+        if (cacheMode == PARTITIONED)
+            cacheCfg.setBackups(backups);
+
+        cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
+        cacheCfg.setQueryEntities(F.asList(accEntity, personEntity, orgEntity));
+
+        return cacheCfg;
+    }
+
+    /**
+     * Checks the distributed join of organizations with persons: first the number of joined rows for
+     * every organization, then the total number of joined rows.
+     *
+     * @param cache Cache the queries are executed through.
+     * @param cnts Expected number of persons per organization id.
+     * @param orgCacheName Name of the cache holding organizations.
+     * @param personCacheName Name of the cache holding persons.
+     */
+    private void checkOrganizationPersonsJoin(IgniteCache cache, Map<Integer, Integer> cnts, String orgCacheName,
+        String personCacheName) {
+        SqlFieldsQuery qry = new SqlFieldsQuery("select o.name, p.name " +
+            "from \"" + orgCacheName + "\".Organization o, \"" + personCacheName + "\".Person p " +
+            "where p.orgId = o._key and o._key=?");
+
+        qry.setDistributedJoins(true);
+
+        long total = 0;
+
+        // Iterate over the actual organization ids instead of assuming they are exactly 0..size-1.
+        for (Map.Entry<Integer, Integer> e : cnts.entrySet()) {
+            qry.setArgs(e.getKey());
+
+            List<List<Object>> res = cache.query(qry).getAll();
+
+            assertEquals((int)e.getValue(), res.size());
+
+            total += res.size();
+        }
+
+        SqlFieldsQuery qry2 = new SqlFieldsQuery("select count(*) " +
+            "from \"" + orgCacheName + "\".Organization o, \"" + personCacheName + "\".Person p where p.orgId = o._key");
+
+        qry2.setDistributedJoins(true);
+
+        List<List<Object>> res = cache.query(qry2).getAll();
+
+        // A single row with a single column is expected; 'total' is a long so that it is compared
+        // as a boxed Long against the count(*) value.
+        assertEquals(1, res.size());
+        assertEquals(total, res.get(0).get(0));
+    }
+
+    /**
+     * Checks the distributed join of persons with accounts. For every person the number of joined rows is
+     * verified twice: with the join and filter on the person {@code _key} and with the same condition on
+     * the {@code id} field. Afterwards the total number of joined rows is verified for both join forms.
+     *
+     * @param cache Cache the queries are executed through.
+     * @param cnts Expected number of accounts per person id.
+     * @param accCacheName Name of the cache holding accounts.
+     * @param personCacheName Name of the cache holding persons.
+     */
+    private void checkPersonAccountsJoin(IgniteCache cache, Map<Integer, Integer> cnts, String accCacheName,
+        String personCacheName) {
+        SqlFieldsQuery qry1 = new SqlFieldsQuery("select p.name " +
+            "from \"" + personCacheName + "\".Person p, \"" + accCacheName + "\".Account a " +
+            "where p._key = a.personId and p._key=?");
+
+        qry1.setDistributedJoins(true);
+
+        SqlFieldsQuery qry2 = new SqlFieldsQuery("select p.name " +
+            "from \"" + personCacheName + "\".Person p, \"" + accCacheName + "\".Account a " +
+            "where p.id = a.personId and p.id=?");
+
+        qry2.setDistributedJoins(true);
+
+        long total = 0;
+
+        for (Map.Entry<Integer, Integer> e : cnts.entrySet()) {
+            // Join on the cache key.
+            qry1.setArgs(e.getKey());
+
+            List<List<Object>> res = cache.query(qry1).getAll();
+
+            assertEquals((int)e.getValue(), res.size());
+
+            total += res.size();
+
+            // Join on the 'id' field; the same rows are expected.
+            qry2.setArgs(e.getKey());
+
+            res = cache.query(qry2).getAll();
+
+            assertEquals((int)e.getValue(), res.size());
+        }
+
+        SqlFieldsQuery[] qrys = new SqlFieldsQuery[2];
+
+        qrys[0] = new SqlFieldsQuery("select count(*) " +
+            "from \"" + personCacheName + "\".Person p, \"" + accCacheName + "\".Account" + " a " +
+            "where p.id = a.personId");
+
+        qrys[1] = new SqlFieldsQuery("select count(*) " +
+            "from \"" + personCacheName + "\".Person p, \"" + accCacheName + "\".Account" + " a " +
+            "where p._key = a.personId");
+
+        for (SqlFieldsQuery qry : qrys) {
+            qry.setDistributedJoins(true);
+
+            List<List<Object>> res = cache.query(qry).getAll();
+
+            // A single row with a single column is expected; 'total' is a long so that it is compared
+            // as a boxed Long against the count(*) value.
+            assertEquals(1, res.size());
+            assertEquals(total, res.get(0).get(0));
+        }
+    }
+
+    /**
+     * Cache flavours used by the test. The constant name doubles as the cache name. For partitioned caches
+     * the {@code b0}/{@code b1} part of the name matches the configured number of backups; the trailing
+     * number only distinguishes constants that share the same mode and number of backups.
+     */
+    private enum TestCacheType {
+        /** First replicated cache. */
+        REPLICATED_1(CacheMode.REPLICATED, 0),
+
+        /** Second replicated cache. */
+        REPLICATED_2(CacheMode.REPLICATED, 0),
+
+        /** Third replicated cache. */
+        REPLICATED_3(CacheMode.REPLICATED, 0),
+
+        /** First partitioned cache without backups. */
+        PARTITIONED_b0_1(CacheMode.PARTITIONED, 0),
+
+        /** Second partitioned cache without backups. */
+        PARTITIONED_b0_2(CacheMode.PARTITIONED, 0),
+
+        /** Third partitioned cache without backups. */
+        PARTITIONED_b0_3(CacheMode.PARTITIONED, 0),
+
+        /** First partitioned cache with one backup. */
+        PARTITIONED_b1_1(CacheMode.PARTITIONED, 1),
+
+        /** Second partitioned cache with one backup. */
+        PARTITIONED_b1_2(CacheMode.PARTITIONED, 1),
+
+        /** Third partitioned cache with one backup. */
+        PARTITIONED_b1_3(CacheMode.PARTITIONED, 1);
+
+        /** Cache name, equal to the constant name. */
+        final String cacheName;
+
+        /** Cache mode. */
+        final CacheMode cacheMode;
+
+        /** Number of backups. */
+        final int backups;
+
+        /**
+         * @param mode Cache mode.
+         * @param backups Backups.
+         */
+        TestCacheType(CacheMode mode, int backups) {
+            this.cacheName = name();
+            this.cacheMode = mode;
+            this.backups = backups;
+        }
+    }
+
+    /**
+     * Holder for the generated entities and for the counts the join queries are expected to return.
+     */
+    private static class Data {
+        /** Organizations. */
+        final Collection<Organization> orgs;
+
+        /** Persons. */
+        final Collection<Person> persons;
+
+        /** Accounts. */
+        final Collection<Account> accounts;
+
+        /** Number of persons keyed by organization id. */
+        final Map<Integer, Integer> personsCntAtOrg;
+
+        /** Number of accounts keyed by person id. */
+        final Map<Integer, Integer> accountsCntForPerson;
+
+        /**
+         * @param orgs Organizations.
+         * @param persons Persons.
+         * @param accounts Accounts.
+         * @param personsCntAtOrg Number of persons keyed by organization id.
+         * @param accountsCntForPerson Number of accounts keyed by person id.
+         */
+        Data(Collection<Organization> orgs, Collection<Person> persons, Collection<Account> accounts,
+            Map<Integer, Integer> personsCntAtOrg, Map<Integer, Integer> accountsCntForPerson) {
+            this.personsCntAtOrg = personsCntAtOrg;
+            this.accountsCntForPerson = accountsCntForPerson;
+
+            this.orgs = orgs;
+            this.persons = persons;
+            this.accounts = accounts;
+        }
+    }
+
+    /**
+     * Account value stored in the account cache; linked to a person through {@code personId}.
+     */
+    private static class Account implements Serializable {
+        /** Account id; the test uses it as the cache key. */
+        private int id;
+
+        /** Id of the person the account belongs to. */
+        @QuerySqlField
+        private int personId;
+
+        /**
+         * @param id Account id.
+         * @param personId Id of the person the account belongs to.
+         */
+        Account(int id, int personId) {
+            this.personId = personId;
+            this.id = id;
+        }
+    }
+
+    /**
+     * Person value stored in the person cache; linked to an organization through {@code orgId}.
+     */
+    private static class Person implements Serializable {
+        /** Person id; the test uses it as the cache key. Not annotated here, it is registered as a query field in cacheConfiguration(). */
+        int id;
+
+        /** Id of the organization the person belongs to. */
+        @QuerySqlField
+        int orgId;
+
+        /** Person name. */
+        @QuerySqlField
+        String name;
+
+        /**
+         * @param id Person id.
+         * @param orgId Id of the organization the person belongs to.
+         * @param name Person name.
+         */
+        public Person(int id, int orgId, String name) {
+            this.name = name;
+            this.orgId = orgId;
+            this.id = id;
+        }
+    }
+
+    /**
+     * Organization value stored in the organization cache.
+     */
+    private static class Organization implements Serializable {
+        /** Organization id; the test uses it as the cache key. */
+        @QuerySqlField
+        int id;
+
+        /** Organization name. */
+        @QuerySqlField
+        String name;
+
+        /**
+         * @param id ID.
+         * @param name Name.
+         */
+        Organization(int id, String name) {
+            this.name = name;
+            this.id = id;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/71286d7c/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 2133055..fe3f7c9 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -51,6 +51,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheQueryMultiThreaded
import org.apache.ignite.internal.processors.cache.IgniteCacheQueryOffheapEvictsMultiThreadedSelfTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheQueryOffheapMultiThreadedSelfTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheSqlQueryMultiThreadedSelfTest;
+import org.apache.ignite.internal.processors.cache.IgniteCrossCachesDistributedJoinQueryTest;
import org.apache.ignite.internal.processors.cache.SqlFieldsQuerySelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheAtomicFieldsQuerySelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheAtomicNearEnabledFieldsQuerySelfTest;
@@ -249,6 +250,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
suite.addTestSuite(IgniteCacheNoClassQuerySelfTest.class);
suite.addTestSuite(IgniteCacheJoinQueryTest.class);
+ suite.addTestSuite(IgniteCrossCachesDistributedJoinQueryTest.class);
// Other.
suite.addTestSuite(CacheQueryNewClientSelfTest.class);