You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/03/01 19:46:09 UTC

ignite git commit: ignite-1231: added IgniteCrossCachesDistributedJoinQueryTest

Repository: ignite
Updated Branches:
  refs/heads/ignite-1232 95ef93f60 -> 71286d7c3


ignite-1231: added IgniteCrossCachesDistributedJoinQueryTest


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/71286d7c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/71286d7c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/71286d7c

Branch: refs/heads/ignite-1232
Commit: 71286d7c37ab23169614e061d390e8d2923f9ff7
Parents: 95ef93f
Author: ashutak <as...@gridgain.com>
Authored: Tue Mar 1 21:44:39 2016 +0300
Committer: ashutak <as...@gridgain.com>
Committed: Tue Mar 1 21:44:39 2016 +0300

----------------------------------------------------------------------
 ...niteCrossCachesDistributedJoinQueryTest.java | 579 +++++++++++++++++++
 .../IgniteCacheQuerySelfTestSuite.java          |   2 +
 2 files changed, 581 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/71286d7c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCrossCachesDistributedJoinQueryTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCrossCachesDistributedJoinQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCrossCachesDistributedJoinQueryTest.java
new file mode 100644
index 0000000..648a4c7
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCrossCachesDistributedJoinQueryTest.java
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.lang.GridAbsPredicateX;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T4;
+import org.apache.ignite.internal.util.typedef.internal.SB;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+@SuppressWarnings("unchecked")
+public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final int NODES = 5;
+
+    /** */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(NODES - 1);
+
+        client = true;
+
+        startGrid(NODES - 1);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCrossCacheDistributedJoin() throws Exception {
+        Map<T4<Integer, TestCacheType, TestCacheType, TestCacheType>, Throwable> errors = new LinkedHashMap<>();
+        List<T4<Integer, TestCacheType, TestCacheType, TestCacheType>> success = new ArrayList<>();
+
+        int cfgIdx = 0;
+
+        final Set<TestCacheType> personCacheTypes = new LinkedHashSet<TestCacheType>() {{
+            add(TestCacheType.REPLICATED_1);
+            add(TestCacheType.PARTITIONED_b0_1);
+            add(TestCacheType.PARTITIONED_b1_1);
+        }};
+
+        final Set<TestCacheType> accCacheTypes = new LinkedHashSet<TestCacheType>() {{
+            addAll(personCacheTypes);
+            add(TestCacheType.REPLICATED_2);
+            add(TestCacheType.PARTITIONED_b0_2);
+            add(TestCacheType.PARTITIONED_b1_2);
+        }};
+
+        Set<TestCacheType> orgCacheTypes = new LinkedHashSet<TestCacheType>() {{
+            addAll(accCacheTypes);
+            add(TestCacheType.REPLICATED_3);
+            add(TestCacheType.PARTITIONED_b0_3);
+            add(TestCacheType.PARTITIONED_b1_3);
+        }};
+
+        for (TestCacheType personCacheType : personCacheTypes) {
+            for (TestCacheType accCacheType : accCacheTypes) {
+                for (TestCacheType orgCacheType : orgCacheTypes) {
+                    try {
+                        checkDistributedCrossCacheJoin(personCacheType, accCacheType, orgCacheType);
+
+                        success.add(new T4<>(cfgIdx, personCacheType, accCacheType, orgCacheType));
+                    }
+                    catch (Throwable e) {
+                        error("Failed to make distributed cross cache select.", e);
+
+                        errors.put(new T4<>(cfgIdx, personCacheType, accCacheType, orgCacheType), e);
+                    }
+
+                    cfgIdx++;
+                }
+            }
+        }
+
+        if (!errors.isEmpty()) {
+            int total = personCacheTypes.size() * accCacheTypes.size() * orgCacheTypes.size();
+
+            SB sb = new SB("Test failed for the following " + errors.size() + " combination(s) ("+total+" total):\n");
+
+            for (Map.Entry<T4<Integer, TestCacheType, TestCacheType, TestCacheType>, Throwable> e : errors.entrySet()) {
+                T4<Integer, TestCacheType, TestCacheType, TestCacheType> t = e.getKey();
+
+                sb.a("[cfgIdx=" + t.get1() + ", personCache=" + t.get2() + ", accCache=" + t.get3()
+                    + ", orgCache=" + t.get4() + ", exception=" + e.getValue() + "]").a("\n");
+            }
+
+            sb.a("Successfully finished combinations:\n");
+
+            for (T4<Integer, TestCacheType, TestCacheType, TestCacheType> t : success) {
+                sb.a("[cfgIdx=" + t.get1() + ", personCache=" + t.get2() + ", accCache=" + t.get3()
+                    + ", orgCache=" + t.get4() + "]").a("\n");
+            }
+
+            fail(sb.toString());
+        }
+    }
+
+    /**
+     * @param personCacheType Person cache type.
+     * @param accountCacheType Account cache type.
+     * @param orgCacheType Organization cache type.
+     */
+    private void checkDistributedCrossCacheJoin(final TestCacheType personCacheType,
+        final TestCacheType accountCacheType,
+        final TestCacheType orgCacheType) throws Exception {
+        info("Checking distributed cross cache join [personCache=" + personCacheType + ", accCache=" + accountCacheType
+            + ", orgCache=" + orgCacheType + "]");
+
+        Collection<TestCacheType> cacheTypes = new ArrayList<TestCacheType>() {{
+            add(personCacheType);
+            add(accountCacheType);
+            add(orgCacheType);
+        }};
+
+        for (TestCacheType type : cacheTypes) {
+            CacheConfiguration cc = cacheConfiguration(type.cacheName, type.cacheMode, type.backups);
+
+            ignite(0).getOrCreateCache(cc);
+
+            info("Created cache [name=" + type.cacheName + ", mode=" + type.cacheMode + "]");
+        }
+
+        awaitPartitionMapExchange();
+
+        try {
+            Data data = prepareData();
+
+            IgniteCache accCache = ignite(0).cache(accountCacheType.cacheName);
+
+            for (Account account : data.accounts)
+                accCache.put(account.id, account);
+
+            IgniteCache personCache = ignite(0).cache(personCacheType.cacheName);
+
+            for (Person person : data.persons)
+                personCache.put(person.id, person);
+
+            IgniteCache orgCache = ignite(0).cache(orgCacheType.cacheName);
+
+            for (Organization org : data.orgs)
+                orgCache.put(org.id, org);
+
+            for (int i = 0; i < NODES; i++) {
+                log.info("Test node: " + i);
+
+                checkPersonAccountsJoin(orgCache, data.accountsCntForPerson, accCache.getName(), personCache.getName());
+
+                checkOrganizationPersonsJoin(accCache, data.personsCntAtOrg,
+                    orgCacheType.cacheName, personCacheType.cacheName);
+            }
+        }
+        finally {
+            ignite(0).destroyCache(accountCacheType.cacheName);
+            ignite(0).destroyCache(personCacheType.cacheName);
+            ignite(0).destroyCache(orgCacheType.cacheName);
+
+            assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicateX() {
+                @Override public boolean applyx() throws IgniteCheckedException {
+                    for (int i = 0; i < NODES; i++) {
+                        if (grid(i).cache(accountCacheType.cacheName) != null
+                            || grid(i).cache(personCacheType.cacheName) != null
+                            || grid(i).cache(orgCacheType.cacheName) != null)
+                            return false;
+                    }
+
+                    return true;
+                }
+            }, 10_000));
+        }
+    }
+
+    /**
+     * Organization ids: [0, 9].
+     * Person ids: randoms at [10, 9999]
+     * Accounts ids: randoms at [10000, 999_999]
+     *
+     * @return Data.
+     */
+    private static Data prepareData() {
+        Map<Integer, Integer> personsCntAtOrg = new HashMap<>();
+        Map<Integer, Integer> accountsCntForPerson = new HashMap<>();
+
+        Collection<Organization> orgs = new ArrayList<>();
+        Collection<Person> persons = new ArrayList<>();
+        Collection<Account> accounts = new ArrayList<>();
+
+        final int ORG_CNT = 1;
+
+        for (int id = 0; id < ORG_CNT; id++)
+            orgs.add(new Organization(id, "org-" + id));
+
+        Set<Integer> personIds = new HashSet<>();
+        Set<Integer> accountIds = new HashSet<>();
+
+        for (int orgId = 0; orgId < ORG_CNT; orgId++) {
+            int personsCnt = ThreadLocalRandom.current().nextInt(100);
+
+            for (int p = 0; p < personsCnt; p++) {
+                int personId = ThreadLocalRandom.current().nextInt(10, 10_000);
+
+                while (!personIds.add(personId))
+                    personId = ThreadLocalRandom.current().nextInt(10, 10_000);
+
+                String name = "person-" + personId;
+
+                persons.add(new Person(personId, orgId, name));
+
+                int accountsCnt = ThreadLocalRandom.current().nextInt(10);
+
+                for (int a = 0; a < accountsCnt; a++) {
+                    int accountId = ThreadLocalRandom.current().nextInt(10_000, 1000_00);
+
+                    while (!accountIds.add(accountId))
+                        accountId = ThreadLocalRandom.current().nextInt(10_000, 1000_000);
+
+                    accounts.add(new Account(accountId, personId));
+                }
+
+                accountsCntForPerson.put(personId, accountsCnt);
+            }
+
+            personsCntAtOrg.put(orgId, personsCnt);
+        }
+
+        return new Data(orgs, persons, accounts, personsCntAtOrg, accountsCntForPerson);
+
+    }
+
+    /**
+     * @param cacheMode Cache mode.
+     * @param backups Number of backups.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration cacheConfiguration(String cacheName, CacheMode cacheMode, int backups) {
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setName(String.valueOf(cacheName));
+
+        ccfg.setCacheMode(cacheMode);
+
+        if (cacheMode == PARTITIONED)
+            ccfg.setBackups(backups);
+
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+        QueryEntity account = new QueryEntity();
+        account.setKeyType(Integer.class.getName());
+        account.setValueType(Account.class.getName());
+        account.addQueryField("personId", Integer.class.getName(), null);
+
+        QueryEntity person = new QueryEntity();
+        person.setKeyType(Integer.class.getName());
+        person.setValueType(Person.class.getName());
+        person.addQueryField("orgId", Integer.class.getName(), null);
+        person.addQueryField("id", Integer.class.getName(), null);
+        person.addQueryField("name", String.class.getName(), null);
+
+        QueryEntity org = new QueryEntity();
+        org.setKeyType(Integer.class.getName());
+        org.setValueType(Organization.class.getName());
+        org.addQueryField("id", Integer.class.getName(), null);
+        org.addQueryField("name", String.class.getName(), null);
+
+        ccfg.setQueryEntities(F.asList(account, person, org));
+
+        return ccfg;
+    }
+
+    /**
+     * @param cache Cache.
+     * @param cnts Organizations per person counts.
+     */
+    private void checkOrganizationPersonsJoin(IgniteCache cache, Map<Integer, Integer> cnts, String orgCacheName,
+        String personCacheName) {
+        SqlFieldsQuery qry = new SqlFieldsQuery("select o.name, p.name " +
+            "from \"" + orgCacheName + "\".Organization o, \"" + personCacheName + "\".Person p " +
+            "where p.orgId = o._key and o._key=?");
+
+        qry.setDistributedJoins(true);
+
+        long total = 0;
+
+        for (int i = 0; i < cnts.size(); i++) {
+            qry.setArgs(i);
+
+            List<List<Object>> res = cache.query(qry).getAll();
+
+            assertEquals((int)cnts.get(i), res.size());
+
+            total += res.size();
+        }
+
+        SqlFieldsQuery qry2 = new SqlFieldsQuery("select count(*) " +
+            "from \"" + orgCacheName + "\".Organization o, \"" + personCacheName + "\".Person p where p.orgId = o._key");
+
+        qry2.setDistributedJoins(true);
+
+        List<List<Object>> res = cache.query(qry2).getAll();
+
+        assertEquals(1, res.size());
+        assertEquals(total, res.get(0).get(0));
+    }
+
+    /**
+     * @param cache Cache.
+     * @param cnts Accounts per person counts.
+     */
+    private void checkPersonAccountsJoin(IgniteCache cache, Map<Integer, Integer> cnts, String accCacheName,
+        String personCacheName) {
+        SqlFieldsQuery qry1 = new SqlFieldsQuery("select p.name " +
+            "from \"" + personCacheName + "\".Person p, \"" + accCacheName + "\".Account a " +
+            "where p._key = a.personId and p._key=?");
+
+        qry1.setDistributedJoins(true);
+
+        SqlFieldsQuery qry2 = new SqlFieldsQuery("select p.name " +
+            "from \"" + personCacheName + "\".Person p, \"" + accCacheName + "\".Account a " +
+            "where p.id = a.personId and p.id=?");
+
+        qry2.setDistributedJoins(true);
+
+        Ignite ignite = (Ignite)cache.unwrap(Ignite.class);
+
+        long total = 0;
+
+        for (Map.Entry<Integer, Integer> e : cnts.entrySet()) {
+            qry2.setArgs(e.getKey());
+
+            List<List<Object>> res = cache.query(qry2).getAll();
+
+            assertEquals((int)e.getValue(), res.size());
+
+            total += res.size();
+
+            qry2.setArgs(e.getKey());
+
+            res = cache.query(qry2).getAll();
+
+            assertEquals((int)e.getValue(), res.size());
+        }
+
+        SqlFieldsQuery[] qrys = new SqlFieldsQuery[2];
+
+        qrys[0] = new SqlFieldsQuery("select count(*) " +
+            "from \"" + personCacheName + "\".Person p, \"" + accCacheName + "\".Account" + " a " +
+            "where p.id = a.personId");
+
+        qrys[1] = new SqlFieldsQuery("select count(*) " +
+            "from \"" + personCacheName + "\".Person p, \"" + accCacheName + "\".Account" + " a " +
+            "where p._key = a.personId");
+
+        for (SqlFieldsQuery qry : qrys) {
+            qry.setDistributedJoins(true);
+
+            List<List<Object>> res = cache.query(qry).getAll();
+
+            assertEquals(1, res.size());
+            assertEquals(total, res.get(0).get(0));
+        }
+    }
+
+    /**
+     *
+     */
+    private enum TestCacheType {
+        /** */
+        REPLICATED_1(CacheMode.REPLICATED, 0),
+
+        /** */
+        REPLICATED_2(CacheMode.REPLICATED, 0),
+
+        /** */
+        REPLICATED_3(CacheMode.REPLICATED, 0),
+
+        /** */
+        PARTITIONED_b0_1(CacheMode.PARTITIONED, 0),
+
+        /** */
+        PARTITIONED_b0_2(CacheMode.PARTITIONED, 0),
+
+        /** */
+        PARTITIONED_b0_3(CacheMode.PARTITIONED, 0),
+
+        /** */
+        PARTITIONED_b1_1(CacheMode.PARTITIONED, 1),
+
+        /** */
+        PARTITIONED_b1_2(CacheMode.PARTITIONED, 1),
+
+        /** */
+        PARTITIONED_b1_3(CacheMode.PARTITIONED, 1),
+        ;
+
+        /** */
+        final String cacheName;
+
+        /** */
+        final CacheMode cacheMode;
+
+        /** */
+        final int backups;
+
+        /**
+         * @param mode Cache mode.
+         * @param backups Backups.
+         */
+        TestCacheType(CacheMode mode, int backups) {
+            cacheName = name();
+            cacheMode = mode;
+            this.backups = backups;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class Data {
+        /** */
+        final Collection<Organization> orgs;
+
+        /** */
+        final Collection<Person> persons;
+
+        /** */
+        final Collection<Account> accounts;
+
+        /** */
+        final Map<Integer, Integer> personsCntAtOrg;
+
+        /** */
+        final Map<Integer, Integer> accountsCntForPerson;
+
+        /**
+         * @param orgs Organizations.
+         * @param persons Persons.
+         * @param accounts Accounts.
+         * @param personsCntAtOrg Count of persons at organization.
+         * @param accountsCntForPerson Count of accounts which have a person.
+         */
+        Data(Collection<Organization> orgs, Collection<Person> persons, Collection<Account> accounts,
+            Map<Integer, Integer> personsCntAtOrg, Map<Integer, Integer> accountsCntForPerson) {
+            this.orgs = orgs;
+            this.persons = persons;
+            this.accounts = accounts;
+            this.personsCntAtOrg = personsCntAtOrg;
+            this.accountsCntForPerson = accountsCntForPerson;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class Account implements Serializable {
+        /** */
+        private int id;
+
+        /** */
+        @QuerySqlField
+        private int personId;
+
+        Account(int id, int personId) {
+            this.id = id;
+            this.personId = personId;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class Person implements Serializable {
+        /** */
+        int id;
+
+        /** */
+        @QuerySqlField
+        int orgId;
+
+        /** */
+        @QuerySqlField
+        String name;
+
+        public Person(int id, int orgId, String name) {
+            this.id = id;
+            this.orgId = orgId;
+            this.name = name;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class Organization implements Serializable {
+        /** */
+        @QuerySqlField
+        int id;
+
+        /** */
+        @QuerySqlField
+        String name;
+
+        /**
+         * @param id ID.
+         * @param name Name.
+         */
+        Organization(int id, String name) {
+            this.id = id;
+            this.name = name;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/71286d7c/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 2133055..fe3f7c9 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -51,6 +51,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheQueryMultiThreaded
 import org.apache.ignite.internal.processors.cache.IgniteCacheQueryOffheapEvictsMultiThreadedSelfTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheQueryOffheapMultiThreadedSelfTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheSqlQueryMultiThreadedSelfTest;
+import org.apache.ignite.internal.processors.cache.IgniteCrossCachesDistributedJoinQueryTest;
 import org.apache.ignite.internal.processors.cache.SqlFieldsQuerySelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheAtomicFieldsQuerySelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheAtomicNearEnabledFieldsQuerySelfTest;
@@ -249,6 +250,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         suite.addTestSuite(IgniteCacheNoClassQuerySelfTest.class);
 
         suite.addTestSuite(IgniteCacheJoinQueryTest.class);
+        suite.addTestSuite(IgniteCrossCachesDistributedJoinQueryTest.class);
 
         // Other.
         suite.addTestSuite(CacheQueryNewClientSelfTest.class);