You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/03/10 17:29:36 UTC
ignite git commit: ignite-1232: throw proper exception in case of
executing query over REPLICATED cache and data node
Repository: ignite
Updated Branches:
refs/heads/ignite-1232 461b45983 -> 74484d786
ignite-1232: throw a proper exception when a query with distributed joins is executed on a REPLICATED cache from a data node
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/74484d78
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/74484d78
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/74484d78
Branch: refs/heads/ignite-1232
Commit: 74484d786977de142f2f58b7852267b600f4af7c
Parents: 461b459
Author: ashutak <as...@gridgain.com>
Authored: Thu Mar 10 19:30:54 2016 +0300
Committer: ashutak <as...@gridgain.com>
Committed: Thu Mar 10 19:30:54 2016 +0300
----------------------------------------------------------------------
.../processors/cache/IgniteCacheProxy.java | 8 ++
.../cache/IgniteCacheJoinQueryTest.java | 24 +++-
...niteCrossCachesDistributedJoinQueryTest.java | 135 +++++++++++++------
3 files changed, 118 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/74484d78/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 6e8bcbf..0145fa5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -630,6 +630,10 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
opCtxCall != null && opCtxCall.isKeepBinary());
if (qry instanceof SqlQuery) {
+ if (isReplicatedDataNode() && ((SqlQuery)qry).isDistributedJoins())
+ throw new CacheException("Queries using distributed JOINs have to be run on partitioned cache, " +
+ "not on replicated.");
+
final SqlQuery p = (SqlQuery)qry;
if (isReplicatedDataNode() || ctx.isLocal() || qry.isLocal())
@@ -644,6 +648,10 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
}
if (qry instanceof SqlFieldsQuery) {
+ if (isReplicatedDataNode() && ((SqlFieldsQuery)qry).isDistributedJoins())
+ throw new CacheException("Queries using distributed JOINs have to be run on partitioned cache, " +
+ "not on replicated.");
+
SqlFieldsQuery p = (SqlFieldsQuery)qry;
if (isReplicatedDataNode() || ctx.isLocal() || qry.isLocal())
http://git-wip-us.apache.org/repos/asf/ignite/blob/74484d78/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinQueryTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinQueryTest.java
index 541d54c..1dc2bed 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinQueryTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinQueryTest.java
@@ -23,7 +23,9 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
+import javax.cache.CacheException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheKeyConfiguration;
@@ -39,6 +41,7 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
@@ -134,7 +137,7 @@ public class IgniteCacheJoinQueryTest extends GridCommonAbstractTest {
* @param affKey If {@code true} uses key with affinity key field.
* @param includeAffKey If {@code true} includes affinity key field in query fields.
*/
- public void testJoinQuery(CacheMode cacheMode, int backups, boolean affKey, boolean includeAffKey) {
+ public void testJoinQuery(CacheMode cacheMode, int backups, final boolean affKey, boolean includeAffKey) {
CacheConfiguration ccfg = cacheConfiguration(cacheMode, backups, affKey, includeAffKey);
log.info("Test cache [mode=" + cacheMode + ", backups=" + backups + ']');
@@ -142,16 +145,27 @@ public class IgniteCacheJoinQueryTest extends GridCommonAbstractTest {
IgniteCache cache = ignite(0).createCache(ccfg);
try {
- PutData putData = putData(cache, affKey);
+ final PutData putData = putData(cache, affKey);
for (int i = 0; i < NODES; i++) {
log.info("Test node: " + i);
- IgniteCache cache0 = ignite(i).cache(ccfg.getName());
+ final IgniteCache cache0 = ignite(i).cache(ccfg.getName());
- checkPersonAccountsJoin(cache0, putData.personAccounts, affKey);
+ if (cacheMode == REPLICATED && !ignite(i).configuration().isClientMode()) {
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ checkPersonAccountsJoin(cache0, putData.personAccounts, affKey);
- checkOrganizationPersonsJoin(cache0, putData.orgPersons);
+ return null;
+ }
+ }, CacheException.class, "Queries using distributed JOINs have to be run on partitioned cache");
+ }
+ else {
+ checkPersonAccountsJoin(cache0, putData.personAccounts, affKey);
+
+ checkOrganizationPersonsJoin(cache0, putData.orgPersons);
+ }
}
}
finally {
http://git-wip-us.apache.org/repos/asf/ignite/blob/74484d78/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCrossCachesDistributedJoinQueryTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCrossCachesDistributedJoinQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCrossCachesDistributedJoinQueryTest.java
index df5c0fd..d9b4d05 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCrossCachesDistributedJoinQueryTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCrossCachesDistributedJoinQueryTest.java
@@ -28,7 +28,9 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
+import javax.cache.CacheException;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.QueryEntity;
@@ -43,9 +45,11 @@ import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
/**
@@ -66,7 +70,7 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
private Data data;
/** */
- private String dataAsString;
+ private String dataAsStr;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
@@ -204,15 +208,15 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
awaitPartitionMapExchange();
try {
- dataAsString = null;
+ dataAsStr = null;
data = prepareData();
- IgniteCache accCache = ignite(0).cache(accCacheType.cacheName);
+ final IgniteCache accCache = ignite(0).cache(accCacheType.cacheName);
for (Account account : data.accounts)
accCache.put(account.id, account);
- IgniteCache personCache = ignite(0).cache(personCacheType.cacheName);
+ final IgniteCache personCache = ignite(0).cache(personCacheType.cacheName);
for (Person person : data.persons)
personCache.put(person.id, person);
@@ -232,41 +236,77 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
log.info("Test node: " + i);
for (String cacheName : cacheNames) {
- IgniteCache cache = ignite(i).cache(cacheName);
+ final IgniteCache cache = ignite(i).cache(cacheName);
log.info("Use cache: " + cache.getName());
- checkPersonAccountsJoin(cache,
- data.accountsPerPerson,
- accCache.getName(),
- personCache.getName());
-
- checkOrganizationPersonsJoin(cache,
- data.personsPerOrg,
- orgCacheType.cacheName,
- personCacheType.cacheName);
-
- checkOrganizationPersonAccountJoin(cache,
- data.accountsPerOrg,
- orgCacheType.cacheName,
- personCacheType.cacheName,
- accCacheType.cacheName);
-
- checkUninon(cache,
- data,
- orgCacheType.cacheName,
- personCacheType.cacheName,
- accCacheType.cacheName);
-
- checkCrossJoin(cache,
- data,
- personCacheType.cacheName,
- accCacheType.cacheName);
-
- checkGroupBy(cache,
- personCacheType.cacheName,
- accCacheType.cacheName,
- orgCacheType.cacheName);
+ CacheConfiguration cc = (CacheConfiguration)cache.getConfiguration(CacheConfiguration.class);
+
+ if (cc.getCacheMode() == REPLICATED && !ignite(i).configuration().isClientMode()) {
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ cache.query(new SqlFieldsQuery("select p.name from " +
+ "\"" + personCache.getName() + "\".Person p, " +
+ "\"" + accCache.getName() + "\".Account a " +
+ "where p._key = a.personId").setDistributedJoins(true));
+
+ return null;
+ }
+ }, CacheException.class, "Queries using distributed JOINs have to be run on partitioned cache");
+
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ cache.query(new SqlQuery(Person.class,
+ "from \"" + personCache.getName() + "\".Person , " +
+ "\"" + accCache.getName() + "\".Account " +
+ "where Person._key = Account.personId")
+ .setDistributedJoins(true));
+
+ return null;
+ }
+ }, CacheException.class, "Queries using distributed JOINs have to be run on partitioned cache");
+ }
+ else {
+ if (!cache.getName().equals(orgCacheType.cacheName))
+ checkPersonAccountsJoin(cache,
+ data.accountsPerPerson,
+ accCache.getName(),
+ personCache.getName());
+
+ if (!cache.getName().equals(accCacheType.cacheName))
+ checkOrganizationPersonsJoin(cache,
+ data.personsPerOrg,
+ orgCacheType.cacheName,
+ personCacheType.cacheName);
+
+ checkOrganizationPersonAccountJoin(cache,
+ data.accountsPerOrg,
+ orgCacheType.cacheName,
+ personCacheType.cacheName,
+ accCacheType.cacheName);
+
+ checkUninon(cache,
+ data,
+ orgCacheType.cacheName,
+ personCacheType.cacheName,
+ accCacheType.cacheName);
+
+ if (!cache.getName().equals(orgCacheType.cacheName))
+ checkPersonAccountCrossJoin(cache,
+ data,
+ personCacheType.cacheName,
+ accCacheType.cacheName);
+
+ if (!cache.getName().equals(accCacheType.cacheName))
+ checkPersonOrganizationGroupBy(cache,
+ personCacheType.cacheName,
+ orgCacheType.cacheName);
+
+ if (!cache.getName().equals(orgCacheType.cacheName))
+ checkPersonAccountGroupBy(cache,
+ personCacheType.cacheName,
+ accCacheType.cacheName);
+ }
}
}
}
@@ -728,7 +768,7 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
* @param personCacheName Person cache name.
* @param accCacheName Account cache name.
*/
- private void checkCrossJoin(IgniteCache cache, Data data,
+ private void checkPersonAccountCrossJoin(IgniteCache cache, Data data,
String personCacheName, String accCacheName) {
SqlFieldsQuery q = new SqlFieldsQuery("select p.name " +
"from \"" + personCacheName + "\".Person p " +
@@ -744,11 +784,10 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
/**
* @param cache Cache.
* @param personCacheName Person cache name.
- * @param accCacheName Account cache name.
* @param orgCacheName Organization cache name.
*/
- private void checkGroupBy(IgniteCache cache,
- String personCacheName, String accCacheName, String orgCacheName) {
+ private void checkPersonOrganizationGroupBy(IgniteCache cache,
+ String personCacheName, String orgCacheName) {
// Max salary per organization.
SqlFieldsQuery q = new SqlFieldsQuery("select max(p.salary) " +
"from \"" + personCacheName + "\".Person p join \""+orgCacheName+"\".Organization o " +
@@ -777,9 +816,17 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
else
assertEquals(errMsg, 0, res.size());
}
+ }
+ /**
+ * @param cache Cache.
+ * @param personCacheName Person cache name.
+ * @param accCacheName Account cache name.
+ */
+ private void checkPersonAccountGroupBy(IgniteCache cache,
+ String personCacheName, String accCacheName) {
// Count accounts per person.
- q = new SqlFieldsQuery("select count(a.id) " +
+ SqlFieldsQuery q = new SqlFieldsQuery("select count(a.id) " +
"from \"" + personCacheName + "\".Person p join \""+accCacheName+"\".Account a " +
"on p.strId = a.personStrId " +
"group by p.name " +
@@ -812,10 +859,10 @@ public class IgniteCrossCachesDistributedJoinQueryTest extends GridCommonAbstrac
* @return Data as string.
*/
private String dataAsString() {
- if (dataAsString == null)
- dataAsString = data.toString();
+ if (dataAsStr == null)
+ dataAsStr = data.toString();
- return dataAsString;
+ return dataAsStr;
}
/**