You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/03/10 17:29:36 UTC

ignite git commit: ignite-1232: throw proper exception in case of executing query over REPLICATED cache and data node

Repository: ignite
Updated Branches:
  refs/heads/ignite-1232 461b45983 -> 74484d786


ignite-1232: throw proper exception in case of executing query over REPLICATED cache and data node


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/74484d78
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/74484d78
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/74484d78

Branch: refs/heads/ignite-1232
Commit: 74484d786977de142f2f58b7852267b600f4af7c
Parents: 461b459
Author: ashutak <as...@gridgain.com>
Authored: Thu Mar 10 19:30:54 2016 +0300
Committer: ashutak <as...@gridgain.com>
Committed: Thu Mar 10 19:30:54 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/IgniteCacheProxy.java      |   8 ++
 .../cache/IgniteCacheJoinQueryTest.java         |  24 +++-
 ...niteCrossCachesDistributedJoinQueryTest.java | 135 +++++++++++++------
 3 files changed, 118 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/74484d78/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 6e8bcbf..0145fa5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -630,6 +630,10 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
                     opCtxCall != null && opCtxCall.isKeepBinary());
 
             if (qry instanceof SqlQuery) {
+                if (isReplicatedDataNode() && ((SqlQuery)qry).isDistributedJoins())
+                    throw new CacheException("Queries using distributed JOINs have to be run on partitioned cache, " +
+                        "not on replicated.");
+
                 final SqlQuery p = (SqlQuery)qry;
 
                 if (isReplicatedDataNode() || ctx.isLocal() || qry.isLocal())
@@ -644,6 +648,10 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
             }
 
             if (qry instanceof SqlFieldsQuery) {
+                if (isReplicatedDataNode() && ((SqlFieldsQuery)qry).isDistributedJoins())
+                    throw new CacheException("Queries using distributed JOINs have to be run on partitioned cache, " +
+                        "not on replicated.");
+
                 SqlFieldsQuery p = (SqlFieldsQuery)qry;
 
                 if (isReplicatedDataNode() || ctx.isLocal() || qry.isLocal())

http://git-wip-us.apache.org/repos/asf/ignite/blob/74484d78/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinQueryTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinQueryTest.java
index 541d54c..1dc2bed 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinQueryTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinQueryTest.java
@@ -23,7 +23,9 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ThreadLocalRandom;
+import javax.cache.CacheException;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheKeyConfiguration;
@@ -39,6 +41,7 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
@@ -134,7 +137,7 @@ public class IgniteCacheJoinQueryTest extends GridCommonAbstractTest {
      * @param affKey If {@code true} uses key with affinity key field.
      * @param includeAffKey If {@code true} includes affinity key field in query fields.
      */
-    public void testJoinQuery(CacheMode cacheMode, int backups, boolean affKey, boolean includeAffKey) {
+    public void testJoinQuery(CacheMode cacheMode, int backups, final boolean affKey, boolean includeAffKey) {
         CacheConfiguration ccfg = cacheConfiguration(cacheMode, backups, affKey, includeAffKey);
 
         log.info("Test cache [mode=" + cacheMode + ", backups=" + backups + ']');
@@ -142,16 +145,27 @@ public class IgniteCacheJoinQueryTest extends GridCommonAbstractTest {
         IgniteCache cache = ignite(0).createCache(ccfg);
 
         try {
-            PutData putData = putData(cache, affKey);
+            final PutData putData = putData(cache, affKey);
 
             for (int i = 0; i < NODES; i++) {
                 log.info("Test node: " + i);
 
-                IgniteCache cache0 = ignite(i).cache(ccfg.getName());
+                final IgniteCache cache0 = ignite(i).cache(ccfg.getName());
 
-                checkPersonAccountsJoin(cache0, putData.personAccounts, affKey);
+                if (cacheMode == REPLICATED && !ignite(i).configuration().isClientMode()) {
+                    GridTestUtils.assertThrows(log, new Callable<Object>() {
+                        @Override public Object call() throws Exception {
+                            checkPersonAccountsJoin(cache0, putData.personAccounts, affKey);
 
-                checkOrganizationPersonsJoin(cache0, putData.orgPersons);
+                            return null;
+                        }
+                    }, CacheException.class, "Queries using distributed JOINs have to be run on partitioned cache");
+                }
+                else {
+                    checkPersonAccountsJoin(cache0, putData.personAccounts, affKey);
+
+                    checkOrganizationPersonsJoin(cache0, putData.orgPersons);
+                }
             }
         }
         finally {

http://git-wip-us.apache.org/repos/asf/ignite/blob/74484d78/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCrossCachesDistributedJoinQueryTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCrossCachesDistributedJoinQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCrossCachesDistributedJoinQueryTest.java
index df5c0fd..d9b4d05 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCrossCachesDistributedJoinQueryTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCrossCachesDistributedJoinQueryTest.java
@@ -28,7 +28,9 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ThreadLocalRandom;
+import javax.cache.CacheException;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.QueryEntity;
@@ -43,9 +45,11 @@ import org.apache.ignite.internal.util.typedef.internal.SB;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 
 /**
@@ -66,7 +70,7 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
     private Data data;
 
     /** */
-    private String dataAsString;
+    private String dataAsStr;
 
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
@@ -204,15 +208,15 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
         awaitPartitionMapExchange();
 
         try {
-            dataAsString = null;
+            dataAsStr = null;
             data = prepareData();
 
-            IgniteCache accCache = ignite(0).cache(accCacheType.cacheName);
+            final IgniteCache accCache = ignite(0).cache(accCacheType.cacheName);
 
             for (Account account : data.accounts)
                 accCache.put(account.id, account);
 
-            IgniteCache personCache = ignite(0).cache(personCacheType.cacheName);
+            final IgniteCache personCache = ignite(0).cache(personCacheType.cacheName);
 
             for (Person person : data.persons)
                 personCache.put(person.id, person);
@@ -232,41 +236,77 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
                 log.info("Test node: " + i);
 
                 for (String cacheName : cacheNames) {
-                    IgniteCache cache = ignite(i).cache(cacheName);
+                    final IgniteCache cache = ignite(i).cache(cacheName);
 
                     log.info("Use cache: " + cache.getName());
 
-                    checkPersonAccountsJoin(cache,
-                        data.accountsPerPerson,
-                        accCache.getName(),
-                        personCache.getName());
-
-                    checkOrganizationPersonsJoin(cache,
-                        data.personsPerOrg,
-                        orgCacheType.cacheName,
-                        personCacheType.cacheName);
-
-                    checkOrganizationPersonAccountJoin(cache,
-                        data.accountsPerOrg,
-                        orgCacheType.cacheName,
-                        personCacheType.cacheName,
-                        accCacheType.cacheName);
-
-                    checkUninon(cache,
-                        data,
-                        orgCacheType.cacheName,
-                        personCacheType.cacheName,
-                        accCacheType.cacheName);
-
-                    checkCrossJoin(cache,
-                        data,
-                        personCacheType.cacheName,
-                        accCacheType.cacheName);
-
-                    checkGroupBy(cache,
-                        personCacheType.cacheName,
-                        accCacheType.cacheName,
-                        orgCacheType.cacheName);
+                    CacheConfiguration cc = (CacheConfiguration)cache.getConfiguration(CacheConfiguration.class);
+
+                    if (cc.getCacheMode() == REPLICATED && !ignite(i).configuration().isClientMode()) {
+                        GridTestUtils.assertThrows(log, new Callable<Object>() {
+                            @Override public Object call() throws Exception {
+                                cache.query(new SqlFieldsQuery("select p.name from " +
+                                    "\"" + personCache.getName() + "\".Person p, " +
+                                    "\"" + accCache.getName() + "\".Account a " +
+                                    "where p._key = a.personId").setDistributedJoins(true));
+
+                                return null;
+                            }
+                        }, CacheException.class, "Queries using distributed JOINs have to be run on partitioned cache");
+
+                        GridTestUtils.assertThrows(log, new Callable<Object>() {
+                            @Override public Object call() throws Exception {
+                                cache.query(new SqlQuery(Person.class,
+                                    "from \"" + personCache.getName() + "\".Person , " +
+                                        "\"" + accCache.getName() + "\".Account  " +
+                                        "where Person._key = Account.personId")
+                                    .setDistributedJoins(true));
+
+                                return null;
+                            }
+                        }, CacheException.class, "Queries using distributed JOINs have to be run on partitioned cache");
+                    }
+                    else {
+                        if (!cache.getName().equals(orgCacheType.cacheName))
+                            checkPersonAccountsJoin(cache,
+                                data.accountsPerPerson,
+                                accCache.getName(),
+                                personCache.getName());
+
+                        if (!cache.getName().equals(accCacheType.cacheName))
+                            checkOrganizationPersonsJoin(cache,
+                                data.personsPerOrg,
+                                orgCacheType.cacheName,
+                                personCacheType.cacheName);
+
+                        checkOrganizationPersonAccountJoin(cache,
+                            data.accountsPerOrg,
+                            orgCacheType.cacheName,
+                            personCacheType.cacheName,
+                            accCacheType.cacheName);
+
+                        checkUninon(cache,
+                            data,
+                            orgCacheType.cacheName,
+                            personCacheType.cacheName,
+                            accCacheType.cacheName);
+
+                        if (!cache.getName().equals(orgCacheType.cacheName))
+                            checkPersonAccountCrossJoin(cache,
+                                data,
+                                personCacheType.cacheName,
+                                accCacheType.cacheName);
+
+                        if (!cache.getName().equals(accCacheType.cacheName))
+                            checkPersonOrganizationGroupBy(cache,
+                                personCacheType.cacheName,
+                                orgCacheType.cacheName);
+
+                        if (!cache.getName().equals(orgCacheType.cacheName))
+                            checkPersonAccountGroupBy(cache,
+                                personCacheType.cacheName,
+                                accCacheType.cacheName);
+                    }
                 }
             }
         }
@@ -728,7 +768,7 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
      * @param personCacheName Person cache name.
      * @param accCacheName Account cache name.
      */
-    private void checkCrossJoin(IgniteCache cache, Data data,
+    private void checkPersonAccountCrossJoin(IgniteCache cache, Data data,
         String personCacheName, String accCacheName) {
         SqlFieldsQuery q = new SqlFieldsQuery("select p.name " +
             "from \"" + personCacheName + "\".Person p " +
@@ -744,11 +784,10 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
     /**
      * @param cache Cache.
      * @param personCacheName Person cache name.
-     * @param accCacheName Account cache name.
      * @param orgCacheName Organization cache name.
      */
-    private void checkGroupBy(IgniteCache cache,
-        String personCacheName, String accCacheName, String orgCacheName) {
+    private void checkPersonOrganizationGroupBy(IgniteCache cache,
+        String personCacheName, String orgCacheName) {
         // Max salary per organization.
         SqlFieldsQuery q = new SqlFieldsQuery("select max(p.salary) " +
             "from \"" + personCacheName + "\".Person p join \""+orgCacheName+"\".Organization o " +
@@ -777,9 +816,17 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
             else
                 assertEquals(errMsg, 0, res.size());
         }
+    }
 
+    /**
+     * @param cache Cache.
+     * @param personCacheName Person cache name.
+     * @param accCacheName Account cache name.
+     */
+    private void checkPersonAccountGroupBy(IgniteCache cache,
+        String personCacheName, String accCacheName) {
         // Count accounts per person.
-        q = new SqlFieldsQuery("select count(a.id) " +
+        SqlFieldsQuery q = new SqlFieldsQuery("select count(a.id) " +
             "from \"" + personCacheName + "\".Person p join \""+accCacheName+"\".Account a " +
             "on p.strId = a.personStrId " +
             "group by p.name " +
@@ -812,10 +859,10 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
      * @return Data as string.
      */
     private String dataAsString() {
-        if (dataAsString == null)
-            dataAsString = data.toString();
+        if (dataAsStr == null)
+            dataAsStr = data.toString();
 
-        return dataAsString;
+        return dataAsStr;
     }
 
     /**