You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/02/19 10:46:46 UTC
ignite git commit: ignite-1232 Avoid error on cache start if affinity
key is not part of table
Repository: ignite
Updated Branches:
refs/heads/ignite-1232 080d50391 -> 7fab7051a
ignite-1232 Avoid error on cache start if affinity key is not part of table
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7fab7051
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7fab7051
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7fab7051
Branch: refs/heads/ignite-1232
Commit: 7fab7051a6cabe8b8d3b4a09776232444d7f2882
Parents: 080d503
Author: sboikov <sb...@gridgain.com>
Authored: Fri Feb 19 12:46:39 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Feb 19 12:46:39 2016 +0300
----------------------------------------------------------------------
.../processors/query/h2/IgniteH2Indexing.java | 13 +
.../processors/query/h2/opt/GridH2Table.java | 34 ++-
.../cache/IgniteCacheJoinQueryTest.java | 303 +++++++++++++++++--
.../IgniteCacheQuerySelfTestSuite.java | 3 +
4 files changed, 325 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/7fab7051/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index cb41250..1215141 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.query.h2;
+import java.util.Objects;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
@@ -1130,6 +1131,18 @@ public class IgniteH2Indexing implements GridQueryIndexing {
twoStepQry.spaces(spaces);
}
+ if (twoStepQry.distributedJoins()) {
+ for (String table : twoStepQry.tables()) {
+ GridH2Table tbl = dataTable(table);
+
+ Objects.requireNonNull(tbl, table);
+
+ if (!tbl.affinityColumnExists())
+ throw new CacheException("Failed to run distributed join query, " +
+ "affinity key is not included in table: " + table);
+ }
+ }
+
meta = meta(stmt.getMetaData());
}
catch (IgniteCheckedException e) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/7fab7051/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
index f31f091..b4a056c 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
@@ -96,6 +96,9 @@ public class GridH2Table extends TableBase {
/** */
private final boolean snapshotEnabled;
+ /** {@code False} if the configured affinity key field is not a column of this table. */
+ private final boolean affinityColExists;
+
/**
* Creates table.
*
@@ -113,17 +116,33 @@ public class GridH2Table extends TableBase {
this.desc = desc;
this.spaceName = spaceName;
+ boolean affinityColExists0 = true;
+
if (desc != null && desc.context() != null) {
String affKey = desc.type().affinityKey();
- int affKeyColId = affKey == null ? KEY_COL : getColumn(desc.context().config().isSqlEscapeAll() ?
- affKey : affKey.toUpperCase()).getColumnId();
+ int affKeyColId = -1;
+
+ if (affKey != null) {
+ String colName = desc.context().config().isSqlEscapeAll() ? affKey : affKey.toUpperCase();
+
+ if (doesColumnExist(colName))
+ affKeyColId = getColumn(colName).getColumnId();
+ else
+ affinityColExists0 = false;
+ }
+ else
+ affKeyColId = KEY_COL;
- affKeyCol = indexColumn(affKeyColId, SortOrder.ASCENDING);
+ if (affinityColExists0) {
+ affKeyCol = indexColumn(affKeyColId, SortOrder.ASCENDING);
- assert affKeyCol != null;
+ assert affKeyCol != null;
+ }
}
+ affinityColExists = affinityColExists0;
+
// Indexes must be created in the end when everything is ready.
idxs = idxsFactory.createIndexes(this);
@@ -139,6 +158,13 @@ public class GridH2Table extends TableBase {
}
/**
+ * @return {@code True} if the affinity key column exists in this table.
+ */
+ public boolean affinityColumnExists() {
+ return affinityColExists;
+ }
+
+ /**
* @return {@code true} If this is a partitioned table.
*/
public boolean isPartitioned() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/7fab7051/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinQueryTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinQueryTest.java
index 5a86da9..6daa2e1 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinQueryTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinQueryTest.java
@@ -17,27 +17,47 @@
package org.apache.ignite.internal.processors.cache;
-import org.apache.ignite.Ignite;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.cache.CacheException;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheKeyConfiguration;
+import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
/**
- * TODO IGNITE-1232.
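+ * Tests SQL join queries (including distributed joins) between Person and Organization, with and without an affinity key field.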
*/
+@SuppressWarnings("unchecked")
public class IgniteCacheJoinQueryTest extends GridCommonAbstractTest {
/** */
private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
/** */
- private static final int NODES = 1;
+ private static final int NODES = 4;
+
+ /** Client mode flag applied to the configuration of nodes started after it is set. */
+ private boolean client;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
@@ -47,12 +67,12 @@ public class IgniteCacheJoinQueryTest extends GridCommonAbstractTest {
CacheKeyConfiguration keyCfg = new CacheKeyConfiguration();
- keyCfg.setTypeName(TestKey1.class.getName());
+ keyCfg.setTypeName(PersonKeyWithAffinity.class.getName());
keyCfg.setAffinityKeyFieldName("affKey");
cfg.setCacheKeyConfiguration(keyCfg);
- cfg.setMarshaller(null);
+ cfg.setClientMode(client);
return cfg;
}
@@ -61,7 +81,11 @@ public class IgniteCacheJoinQueryTest extends GridCommonAbstractTest {
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
- startGridsMultiThreaded(NODES);
+ startGridsMultiThreaded(NODES - 1);
+
+ client = true;
+
+ startGrid(NODES - 1);
}
/** {@inheritDoc} */
@@ -74,44 +98,204 @@ public class IgniteCacheJoinQueryTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
- public void testAffinityKey() throws Exception {
- CacheConfiguration ccfg = new CacheConfiguration();
+ public void testAffinityKeyNotQueryField() throws Exception {
+ CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 1, true, false);
+
+ IgniteCache cache = ignite(0).createCache(ccfg);
+
+ try {
+ putData(cache, true);
+
+ for (int i = 0; i < NODES; i++) {
+ final IgniteCache cache0 = ignite(i).cache(ccfg.getName());
+
+ final String QRY = "select o.name, p.name " +
+ "from Organization o, Person p " +
+ "where p.orgId = o._key";
+
+ SqlFieldsQuery qry = new SqlFieldsQuery(QRY);
+
+ cache0.query(qry).getAll();
+
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ SqlFieldsQuery qry = new SqlFieldsQuery(QRY);
+
+ qry.setDistributedJoins(true);
+
+ cache0.query(qry).getAll();
+
+ return null;
+ }
+ }, CacheException.class, null);
+
+ cache0.query(qry).getAll();
+ }
+ }
+ finally {
+ ignite(0).destroyCache(ccfg.getName());
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testJoinQuery() throws Exception {
+ testJoinQuery(PARTITIONED, 0, false);
+
+ testJoinQuery(PARTITIONED, 1, false);
+
+ testJoinQuery(REPLICATED, 0, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testJoinQueryWithAffinityKey() throws Exception {
+ testJoinQuery(PARTITIONED, 0, true);
- QueryEntity qryEntity = new QueryEntity();
- qryEntity.setKeyType(TestKey1.class.getName());
- qryEntity.setValueType(Integer.class.getName());
+ testJoinQuery(PARTITIONED, 1, true);
- ccfg.setQueryEntities(F.asList(qryEntity));
+ testJoinQuery(REPLICATED, 0, true);
+ }
- Ignite ignite = ignite(0);
+ /**
+ * @param cacheMode Cache mode.
+ * @param backups Number of backups.
+ * @param affKey If {@code true}, uses a key with an affinity key field.
+ */
+ public void testJoinQuery(CacheMode cacheMode, int backups, boolean affKey) {
+ CacheConfiguration ccfg = cacheConfiguration(cacheMode, backups, affKey, affKey);
- IgniteCache cache = ignite.createCache(ccfg);
+ IgniteCache cache = ignite(0).createCache(ccfg);
try {
+ Map<Integer, Integer> cnts = putData(cache, affKey);
+ for (int i = 0; i < NODES; i++)
+ checkJoin(ignite(i).cache(ccfg.getName()), cnts);
}
finally {
- ignite.destroyCache(ccfg.getName());
+ ignite(0).destroyCache(ccfg.getName());
+ }
+ }
+
+ /**
+ * @param cache Cache.
+ * @param personCnt Count of persons per organization.
+ */
+ private void checkJoin(IgniteCache cache, Map<Integer, Integer> personCnt) {
+ SqlFieldsQuery qry = new SqlFieldsQuery("select o.name, p.name " +
+ "from Organization o, Person p " +
+ "where p.orgId = o._key and o._key=?");
+
+ qry.setDistributedJoins(true);
+
+ for (int i = 0; i < personCnt.size(); i++) {
+ qry.setArgs(i);
+
+ List<List<Object>> res = cache.query(qry).getAll();
+
+ assertEquals((int)personCnt.get(i), res.size());
}
}
/**
+ * @param cacheMode Cache mode.
+ * @param backups Number of backups.
+ * @param affKey If {@code true}, uses a key with an affinity key field.
+ * @param includeAffKey If {@code true} includes affinity key field in query fields.
+ * @return Cache configuration.
+ */
+ private CacheConfiguration cacheConfiguration(CacheMode cacheMode,
+ int backups,
+ boolean affKey,
+ boolean includeAffKey) {
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setCacheMode(cacheMode);
+
+ if (cacheMode == PARTITIONED)
+ ccfg.setBackups(backups);
+
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+ QueryEntity person = new QueryEntity();
+ person.setKeyType(affKey ? PersonKeyWithAffinity.class.getName() : PersonKey.class.getName());
+ person.setValueType(Person.class.getName());
+ person.addQueryField("orgId", Integer.class.getName(), null);
+ person.addQueryField("id", Integer.class.getName(), null);
+ person.addQueryField("name", String.class.getName(), null);
+
+ if (includeAffKey)
+ person.addQueryField("affKey", Integer.class.getName(), null);
+
+ QueryEntity org = new QueryEntity();
+ org.setKeyType(Integer.class.getName());
+ org.setValueType(Organization.class.getName());
+ org.addQueryField("name", String.class.getName(), null);
+
+ ccfg.setQueryEntities(F.asList(person, org));
+
+ return ccfg;
+ }
+
+ /**
+ * @param cache Cache.
+ * @param affKey If {@code true}, uses a key with an affinity key field.
+ * @return Count of persons per organization.
+ */
+ private Map<Integer, Integer> putData(IgniteCache cache, boolean affKey) {
+ Map<Integer, Integer> personCnt = new HashMap<>();
+
+ final int ORG_CNT = 10;
+
+ for (int i = 0; i < ORG_CNT; i++)
+ cache.put(i, new Organization("org-" + i));
+
+ Set<Integer> ids = new HashSet<>();
+
+ for (int i = 0; i < ORG_CNT; i++) {
+ int cnt = ThreadLocalRandom.current().nextInt(100);
+
+ for (int j = 0; j < cnt; j++) {
+ int personId = ThreadLocalRandom.current().nextInt();
+
+ while (!ids.add(personId))
+ personId = ThreadLocalRandom.current().nextInt();
+
+ Object key = affKey ? new PersonKeyWithAffinity(personId) : new PersonKey(personId);
+
+ String name = "person-" + personId;
+
+ cache.put(key, new Person(i, name));
+
+ assertEquals(name, ((Person)cache.get(key)).name);
+ }
+
+ personCnt.put(i, cnt);
+ }
+
+ return personCnt;
+ }
+
+ /**
*
*/
- public static class TestKey1 {
+ public static class PersonKeyWithAffinity {
/** */
- private int key;
+ private int id;
/** */
private int affKey;
/**
- * @param key Key.
+ * @param id Key.
*/
- public TestKey1(int key) {
- this.key = key;
+ public PersonKeyWithAffinity(int id) {
+ this.id = id;
- affKey = key + 1;
+ affKey = id + 1;
}
/** {@inheritDoc} */
@@ -122,14 +306,85 @@ public class IgniteCacheJoinQueryTest extends GridCommonAbstractTest {
if (o == null || getClass() != o.getClass())
return false;
- TestKey1 testKey1 = (TestKey1)o;
+ PersonKeyWithAffinity other = (PersonKeyWithAffinity)o;
- return key == testKey1.key;
+ return id == other.id;
}
/** {@inheritDoc} */
@Override public int hashCode() {
- return key;
+ return id;
+ }
+ }
+
+ /**
+ * Person key with only an {@code id} field (no affinity key field).
+ */
+ public static class PersonKey {
+ /** */
+ private int id;
+
+ /**
+ * @param id Key.
+ */
+ public PersonKey(int id) {
+ this.id = id;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ PersonKey other = (PersonKey)o;
+
+ return id == other.id;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return id;
+ }
+ }
+
+ /**
+ * Person value: organization ID and name, both SQL-queryable fields.
+ */
+ private static class Person implements Serializable {
+ /** */
+ @QuerySqlField
+ int orgId;
+
+ /** */
+ @QuerySqlField
+ String name;
+
+ /**
+ * @param orgId Organization ID.
+ * @param name Name.
+ */
+ public Person(int orgId, String name) {
+ this.orgId = orgId;
+ this.name = name;
+ }
+ }
+
+ /**
+ * Organization value with a single SQL-queryable {@code name} field.
+ */
+ private static class Organization implements Serializable {
+ /** */
+ @QuerySqlField
+ String name;
+
+ /**
+ * @param name Name.
+ */
+ public Organization(String name) {
+ this.name = name;
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7fab7051/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 6ed4004..685afc3 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.processors.cache.IgniteBinaryWrappedObjectFiel
import org.apache.ignite.internal.processors.cache.IgniteCacheCollocatedQuerySelfTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheDuplicateEntityConfigurationSelfTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheFieldsQueryNoDataSelfTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheJoinQueryTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheLargeResultSelfTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheNoClassQuerySelfTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapEvictQueryTest;
@@ -246,6 +247,8 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
suite.addTestSuite(IgniteCacheP2pUnmarshallingQueryErrorTest.class);
suite.addTestSuite(IgniteCacheNoClassQuerySelfTest.class);
+ suite.addTestSuite(IgniteCacheJoinQueryTest.class);
+
return suite;
}
}